How can I get results/failures for celery tasks from a redis backend when I don't store their task-ID?

In my web application I start background jobs with celery without storing their IDs. Some of the tasks are periodic and some are triggered by user interaction. The celery tasks just do their thing and eventually the user will see the updated data in their browser. When a task has recently failed, I want to notify all logged-in admin users about it (since they are usually the ones who triggered the recent failure), so they at least know something's up.

The relevant celery methods I found either require a valid task ID (e.g. celery.result.AsyncResult) or only have information about active tasks, not about finished/failed tasks (e.g. celery.app.control.Inspect).

I am using a flask-frontend, a redis-backend for celery and also a regular DB for persistent data.

How would I collect information about recently finished or failed celery tasks in this scenario?

What I have tried:

# I set up celery with
my_celery_project = Celery(__name__,
                backend='redis://localhost:1234/0',
                broker='redis://localhost:1234/0')

# later in the view I want to collect status information:

i = my_celery_project.control.inspect()

i.active() # this one exists, but I don't care about it
i.failed() # this is what I want, but it doesn't exist
i.results() # this also doesn't exist


# getting the result directly also doesn't work, since it requires an id, which I don't have
res = AsyncResult(id_i_dont_have, app=my_celery_project)

It should be possible, since I can get the results manually from redis with redis-cli --scan and then do my_task.AsyncResult('id_from_redis').status to check the result. Something similar to Flower could also work, but I think that wouldn't fit well with the stateless nature of a web application.


This is not a duplicate of these questions, since they don't assume a redis backend. Also, they are 4+ years out of date:

This is not a duplicate of these questions, since my redis backend is in fact working:

This is not a duplicate of this question, since it asks exactly the opposite of mine. They care about old results, while I explicitly care only about recent results: How to read celery results from redis result backend



Solution 1:[1]

In the end, my solution was to fetch the IDs directly from the backend and then convert them to AsyncResult objects via my celery instance:


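from typing import List
from celery.result import AsyncResult

def get_task_results() -> List[AsyncResult]:
    task_results: List[AsyncResult] = []
    for key in my_celery_project.backend.client.scan_iter("celery-task-meta-*"):
        # redis returns keys as bytes unless the client decodes responses
        key = key.decode() if isinstance(key, bytes) else key
        task_results.append(my_celery_project.AsyncResult(key.split("celery-task-meta-", 1)[1]))
    return task_results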

Then I used async_result.ready() to filter out the ones I'm interested in.

On a side note: I now also call async_result.forget() to clean up old tasks, which I didn't do before.
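
The filtering step described above could look roughly like the following. This is only a sketch: task_id_from_key and split_finished are hypothetical helper names, and the code relies only on the failed() and successful() methods that AsyncResult provides, so results that are still pending or were revoked are skipped.

```python
from typing import Iterable, List, Tuple, Union

# Key prefix taken from the scan pattern used above.
KEY_PREFIX = "celery-task-meta-"


def task_id_from_key(key: Union[bytes, str]) -> str:
    """Extract the task ID from a result-backend key (bytes or str)."""
    if isinstance(key, bytes):
        key = key.decode("utf-8")
    return key.split(KEY_PREFIX, 1)[1]


def split_finished(results: Iterable) -> Tuple[List, List]:
    """Return (failed, succeeded); pending or revoked results are skipped.

    Works with any object exposing failed() and successful(),
    which AsyncResult does.
    """
    failed, succeeded = [], []
    for result in results:
        if result.failed():
            failed.append(result)
        elif result.successful():
            succeeded.append(result)
    return failed, succeeded
```

With the list returned by the loop above, failed, succeeded = split_finished(task_results) gives the failures to show to the admins; calling forget() on each result after reporting it keeps the same failure from being reported twice.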

Solution 2:[2]

You should use signals, like this:

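import logging

import redis
from celery import signals

# module-level logger used by the handlers below
loggert = logging.getLogger(__name__)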

@signals.task_failure.connect
def exception_handle(sender, task_id, exception, **kwargs):
    if isinstance(exception, redis.exceptions.LockError):
        loggert.warning(f"{sender.__qualname__}[{task_id}] can't get lock")
        return
    loggert.exception(f"{sender.__qualname__}[{task_id}] args={kwargs['args']} kwargs={kwargs['kwargs']} Exception:\n")

@signals.after_setup_logger.connect
def celery_log(logger, **kwargs):
    check_console(logger, **kwargs)


@signals.after_setup_task_logger.connect
def task_log(logger, **kwargs):
    # todo: add your logger handler here...
    check_console(logger, **kwargs)


@signals.worker_ready.connect
def clean_lock(**kwargs):
    loggert.info('worker_ready')


@signals.worker_init.connect
def hook_prefork(sender, **kwargs):
    ...

def check_console(logger, format, **kwargs):
    if not list(filter(lambda x: type(x) is logging.StreamHandler, logger.handlers)):
        console = logging.StreamHandler()
        console.setFormatter(logging.Formatter(format))
        console.setLevel(logging.INFO)
        logger.addHandler(console)
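
To get from the signal handler to the web view, the failure has to be stored somewhere both processes can reach. One possible approach, sketched here under assumptions, is to push a small JSON record onto a capped redis list from inside the task_failure handler and read it back in the view. The key name, the list size, and the helper names record_failure and recent_failures are all made up for illustration; client is assumed to be a redis-py client, of which only lpush, ltrim and lrange are used.

```python
import json
import time

FAILURE_LIST_KEY = "recent-task-failures"  # hypothetical key name
MAX_FAILURES = 50  # assumed cap on stored entries


def record_failure(client, task_name, task_id, exception, now=None):
    """Push one failure record onto a capped list (newest first)."""
    entry = json.dumps({
        "task": task_name,
        "task_id": task_id,
        "error": repr(exception),
        "time": time.time() if now is None else now,
    })
    client.lpush(FAILURE_LIST_KEY, entry)
    # Keep only the newest MAX_FAILURES entries.
    client.ltrim(FAILURE_LIST_KEY, 0, MAX_FAILURES - 1)


def recent_failures(client, max_age_seconds, now=None):
    """Return stored failure records that are at most max_age_seconds old."""
    now = time.time() if now is None else now
    entries = [json.loads(raw) for raw in client.lrange(FAILURE_LIST_KEY, 0, -1)]
    return [e for e in entries if now - e["time"] <= max_age_seconds]
```

Inside exception_handle this would be called as record_failure(client, sender.name, task_id, exception), and the flask view would call recent_failures(client, 600) to list failures from the last ten minutes. No task IDs need to be stored by the code that starts the tasks.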

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 wotanii
Solution 2