Retrying in Celery


In Django projects, we run our long-running tasks with Celery, and sometimes unfinished tasks require a retry mechanism.

For a long-running task, a user may have to wait at least two minutes, which is not a reasonable response time. Instead, the task can be sent to a task queue like Celery, and the user can be redirected to a dashboard that indicates the progress with a loading animation. The animation can be changed to a check mark once the task is finished.

Generally, we expect a specific output from a worker: in the end, it must finish the task and return the result. So a code block like this is not reliable:

import requests

@celery_app.task()
def scanner(data: dict, *args, **kwargs):
    url = data["url"]
    headers = data["headers"]
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        response = response.json()
        if response.get("success"):
            celery_logger.info(f"Found: {url}")
            return response.get("data")
    except requests.exceptions.RequestException as e:
        celery_logger.error(e)

This code block will

  • Run only once
  • Return a result only on success
  • Not retry the request if the response reports a failure or an exception is raised

What we should do here is use the power of Celery and reshape this code into a structure that fits our needs.

The only thing needed to make this function retry automatically is to bind it to Celery. Binding ensures that the function is proxied as a class method, so we can call methods Celery provides, such as retry. Our code block becomes:

import requests
from datetime import datetime

@celery_app.task(bind=True, soft_time_limit=600)
def scanner(self, data: dict, *args, **kwargs):
    url = data["url"]
    headers = data["headers"]
    response = requests.get(url, headers=headers)
    if response.status_code < 400:
        MyModel.objects.create(data=response.json())
    else:
        celery_logger.info(
            f"Tried {self.request.retries + 1} times - Status: {response.status_code} - "
            f"{datetime.now().strftime('%I:%M:%S %p')}"
        )
        # self.retry raises a Retry exception, so no loop is needed:
        # the task is re-queued and runs again from the top.
        raise self.retry(countdown=15)

Oh, wait, I can’t try-catch this!

If you read this section of the documentation, you will learn that the retry mechanism only works for catchable errors. Consider this code block (as an example):

@app.task
def process_upload(filename, tmpfile):
    copy_file_to_destination(filename, tmpfile)

If there is a crash while the file is being copied to the destination, the state will be incomplete and you probably can't catch the error, so the retry mechanism won't handle it. For this there's another mechanism: late acknowledgment (acks_late).

Before going into detail, we need to understand when messages are removed from the queue. When a message in the queue is received by a worker, the broker (Redis, AMQP, etc.) waits for it to be acknowledged before marking the message as processed. The worker takes the message, processes it, and at the end sends an "ack" to the broker; in the successful scenario, the broker then removes the message from the queue. If a message is still unacknowledged, the broker won't resend it to another worker until the current worker has shut down properly.

The acks_late setting determines when the worker acknowledges the task.

  • When set to true, the message is acked after the worker finishes executing the task.
  • When set to false, the message is acked before the worker starts executing the task.

So it's a mechanism that ensures tasks are executed to completion. If your tasks are idempotent, it's best practice to set acks_late to true.

Moreover, even if acks_late is enabled, the worker will still acknowledge messages when it abruptly exits or receives a KILL signal during execution. To override this behaviour, the reject_on_worker_lost setting can be enabled so that such messages are re-queued and executed again.

acks_late is false by default.

Extras

There are tons of other parameters that support the retry mechanism; you can find them in the documentation here.

Moreover, if there's a time requirement for a task, say it should finish its job within ten minutes, you can enforce time limits on it like:

@celery_app.task(bind=True, time_limit=600, soft_time_limit=540)
def scanner(self, data: dict, *args, **kwargs):
    ...

time_limit is a hard time limit: when it's hit, the process running the task is killed directly.

soft_time_limit raises a SoftTimeLimitExceeded exception that you can catch. Think of it as the last warning before the hard time limit hits. You can clean up and end the task manually on a soft time limit hit if you prefer.

Resources
5 tips for writing production-ready Celery tasks
Rusty Celery – Best Practices
Celery Best Practices
https://docs.celeryq.dev/