'How can I get results/failures for celery tasks from a redis backend when I don't store their their task-ID?
In my web-application I start backgrounds jobs with celery without storing their id. Some of the task are periodic and some are triggered by user-interaction. The celery-tasks just do their thing and eventually the user will see the updated data in their browser. When a task has recently failed, I want to notify all logged-in admin-users about it (since they are usually the ones who triggered the recent failure). So they at least know something's up.
The relevant celery-methods I found, either require a valid task-id (e.g. celery.result.AsyncResult
) or they only have infos about active tasks, but not about finished/failed tasks (e.g. celery.app.control.Inspect
).
I am using a flask-frontend, a redis-backend for celery and also a regular DB for persistent data.
How would I collect information about recently finished or failed celery tasks in this scenario?
What I have tried:
# I setup celery with
my_celery_project = Celery(__name__,
backend='redis://localhost:1234/0',
broker='redis://localhost:1234/0')
# later in the view I want to collect status information:
i = my_celery_project.control.inspect()
i.active() # this one exists, but I don't care about it
i.failed() # this is what I want, but it doesn't exist
i.results() # this also doesn't exist
# getting the result directly also doesn't work, since they require an id, which i don't have
res = AsyncResult(id_i_don_have,app=app)
It should be possible, since I can get the results manually from redis with redis-cli --scan
and then do my_task.AsyncResult('id_from_redis').status
to check the result. Something similar to flower could also work, but that would't work so well with the state-less nature of a web-application, I think.
this is not a duplicate of these questions, since they don't assume a redis-backend. Also they are 4+ years out-of-date:
- Celery: list all tasks, scheduled, active *and* finished
- Python celery: Retrieve tasks arguments if there's an exception
- How can I get a list of succeeded Celery task ids?
this is not a duplicate of these questions, since my redis-backend is in fact working:
this is not a duplicate of this questions, since it is exactly the opposite to my questions. They care about old results, while I care explicitly only about recent results: How to read celery results from redis result backend
Solution 1:[1]
in the end my solution was to fetch the IDs directly form the backend and then convert them to Object via my celery-instance:
task_results: List[AsyncResult] = []
for key in my_celery_project.backend.client.scan_iter("celery-task-meta-*"):
task_id = str(key).split("celery-task-meta-", 1)[1].replace("'", "")
task_results.append(self.celery.AsyncResult(task_id))
return task_results
then I used async_result.ready()
to filter out the ones I'm interested on.
on a side note: Now I also call async_result.forget()
to cleanup old tasks, which I didn't do before.
Solution 2:[2]
you should use signal ,like this:
from celery import signals
@signals.task_failure.connect
def exception_handle(sender, task_id, exception, **kwargs):
if isinstance(exception, redis.exceptions.LockError):
loggert.warning(f"{sender.__qualname__}[{task_id}] can't get lock")
return
loggert.exception(f"{sender.__qualname__}[{task_id}] args={kwargs['args']} kwargs={kwargs['kwargs']} Exception:\n")
@signals.after_setup_logger.connect
def celery_log(logger, **kwargs):
check_console(logger, **kwargs)
@signals.after_setup_task_logger.connect
def task_log(logger, **kwargs):
# todo: add your loggre handle herre...
check_console(logger, **kwargs)
@signals.worker_ready.connect
def clean_lock(**kwargs):
loggert.info('worker_ready')
@signals.worker_init.connect
def hook_prefork(sender, **kwargs):
...
def check_console(logger, format, **kwargs):
if not list(filter(lambda x: type(x) is logging.StreamHandler, logger.handlers)):
console = logging.StreamHandler()
console.setFormatter(logging.Formatter(format))
console.setLevel(logging.INFO)
logger.addHandler(console)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | wotanii |
Solution 2 |