SoFunction
Updated on 2024-11-15

Use Celery Once to prevent Celery from performing the same task over and over again.

When I was using Celery, I realized that sometimes Celery would execute the same task twice. I encountered a situation where the same task was executed separately in different workers, and the time difference was only a few milliseconds. I thought I had a problem with the processing logic, but then I realized that other people also had similar problems, and basically those who had problems were using Redis as a broker, and I didn't want to replace Redis, so I had to add distributed locks when the task was executed.

However, after searching through the Celery issue, I found that some people have implemented distributed locking using Redis, and then some people have used Celery Once. I took a look at Celery Once, and found that it was a good fit for what I'm doing right now, so I went ahead and used it.

Celery Once also utilizes Redis locking, Celery Once implements the QueueOnce class on top of the Task class, which provides task de-weighting functionality, so when we use it, we need to set QueueOnce as the base of the method we implement ourselves.

@task(base=QueueOnce, once={'graceful': True})

The once parameter indicates how to handle the duplicate method when it is encountered. graceful is False by default, so Celery will throw an AlreadyQueued exception, and is set to True manually to handle it silently.

Alternatively, if you want to set the key for the task manually, you can specify the keys parameter

@(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

In a nutshell, there are several steps

Step 1: Installation

pip install -U celery_once

Step 2: Add configuration

from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')
 = {
  'backend': 'celery_once.',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

Step 3: Modify the delay method

(10)
# Revise to
result = example.apply_async(args=(10))

Step 4: Modify the task parameters

@(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

Reference Links /cameronmaske/celery-once

To this point this article on the use of Celery Once to prevent Celery repeated execution of the same task of the article is introduced to this, more related to Celery repeated execution of the same task content please search for my previous articles or continue to browse the following related articles I hope you will support me in the future more!