Task priority in celery with redis

I would like to implement a distributed job execution system with Celery. Since RabbitMQ doesn't support priorities and I badly need this feature, I turned to Celery + Redis.

In my situation, the tasks are closely tied to hardware. For example, task A can only run on Worker 1, since only Worker 1's PC has the necessary hardware. I set the concurrency of each worker to 1 so that a worker runs only one task at a time. Each task takes about 2 minutes.

To implement priorities, I first tried passing a priority argument to apply_async(), for example apply_async(priority=0) and apply_async(priority=9). In this test I launched a single worker with concurrency 1 and kicked off 10 tasks one by one with different priorities. I expected the tasks submitted with apply_async(priority=0) to run first, but unfortunately they simply ran in the order they were submitted.

Then I tried a workaround. I cloned each task, so that each one has a task_high and a task_low variant, decorated with @celery.task(priority=0) and @celery.task(priority=1) respectively. I then repeated the test above. This time the result was better: when the submission order was "HH-LLLL-HHHH", the actual execution order was "HH-L-H-H-L-H-L-L-H". I suppose Redis did some scheduling and balancing work here.

But this still doesn't meet my expectation. I hope to get an order like "HHHHHH-LLLL", because for some tasks I have only one machine with the necessary hardware, and I want high-priority tasks to run as soon as possible.

I've searched the Internet for other workarounds, for example using two queues, one for high-priority tasks and one for low-priority tasks, with 2 machines serving the former and 1 machine the latter. But since my hardware is quite limited, this doesn't work for me.

Could you please give some suggestions?



Solution 1:[1]

The Celery Redis transport does honor the priority field, but Redis itself has no notion of priorities.

The priority support is implemented by creating n lists for each queue and using that order in the BRPOP command. I say n here because even though there are 10 (0-9) priority levels, these are consolidated into 4 levels by default to save resources. This means that a queue named celery will really be split into 4 queues:

['celery0', 'celery3', 'celery6', 'celery9']

If you want more priority levels you can set the priority_steps transport option:

BROKER_TRANSPORT_OPTIONS = {
    'priority_steps': list(range(10)),
}

That said, note that this will never be as good as priorities implemented at the server level, and may be approximate at best. But it may still be good enough for your application.
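
To make the list-splitting idea concrete, here is a minimal pure-Python sketch of it. It is a toy model, not the transport's actual code: the names bucket_for and SteppedQueue are hypothetical, the round-down rule is an assumption about how the 10 levels are consolidated, and the list names simply follow the celery0/celery3/celery6/celery9 pattern quoted above.

```python
from bisect import bisect
from collections import deque

DEFAULT_STEPS = [0, 3, 6, 9]  # the four default levels named in the answer


def bucket_for(priority, steps=DEFAULT_STEPS):
    """Round a 0-9 priority down to the nearest configured step.

    Assumes `steps` is sorted and contains 0 (hypothetical helper).
    """
    priority = max(0, min(priority, 9))  # clamp to the 0-9 range
    return steps[bisect(steps, priority) - 1]


class SteppedQueue:
    """Toy stand-in for one queue that is split into one list per step."""

    def __init__(self, name, steps=DEFAULT_STEPS):
        self.name = name
        self.steps = sorted(steps)
        self.lists = {f"{name}{step}": deque() for step in self.steps}

    def put(self, task, priority):
        # Append the task to the list that matches its consolidated priority
        key = f"{self.name}{bucket_for(priority, self.steps)}"
        self.lists[key].append(task)
        return key

    def pop(self):
        # Check the lists from the lowest step number (highest priority) up,
        # which mimics passing the list names to a blocking pop in that order
        for step in self.steps:
            pending = self.lists[f"{self.name}{step}"]
            if pending:
                return pending.popleft()
        return None


queue = SteppedQueue("celery")
for task, priority in [("low-a", 1), ("high-a", 0), ("later", 5), ("high-b", 0)]:
    queue.put(task, priority)

drained = [queue.pop() for _ in range(4)]
print(drained)  # ['low-a', 'high-a', 'high-b', 'later']
```

In this model, priorities 0 and 1 land in the same list and are therefore served first-in, first-out, so "low-a" runs before "high-a". Under the same assumption, passing list(range(10)) as the steps gives each priority its own list.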

Source.

Solution 2:[2]

The Celery docs cover this under "Redis message priorities" (redis-message-priorities); you can customize the number of priority levels. Taking 10 levels as an example:

  1. Set the priority_steps transport option:
app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
    'queue_order_strategy': 'priority',
}
  2. Start the Celery worker in the normal way:
celery -A tasks worker --loglevel=info
  3. Call tasks, with 0 being the highest priority and 9 the lowest:
custom_priority = 5
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'}, priority=custom_priority)
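
As a rough check of what this configuration aims for, the sketch below replays the "HH-LLLL-HHHH" submission order from the question through a strict priority queue built on heapq. It is only a model: drain_order is a hypothetical helper, the priorities 0 and 9 for H and L are chosen for illustration, and it assumes all ten tasks are already queued before the worker starts pulling, ignoring anything the worker has already reserved.

```python
import heapq
from itertools import count


def drain_order(submissions):
    """Return labels in the order a strict priority queue would release them.

    `submissions` is an iterable of (label, priority) pairs, 0 = highest.
    """
    sequence = count()  # tie-breaker that keeps equal priorities first-in, first-out
    heap = []
    for label, priority in submissions:
        heapq.heappush(heap, (priority, next(sequence), label))
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


HIGH, LOW = 0, 9  # illustrative values; any two priorities in distinct steps would do

# Submission order from the question: HH-LLLL-HHHH
submitted = [("H", HIGH)] * 2 + [("L", LOW)] * 4 + [("H", HIGH)] * 4
print("".join(drain_order(submitted)))  # HHHHHHLLLL
```

A real worker may deviate from this ideal order, since priorities on Redis are approximate at best, as Solution 1 notes.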

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 pdoherty926
Solution 2 ideastudios