celery/redis same task being executed multiple times in parallel

This is a known issue with Celery and Redis. In one of my projects I assigned a unique identifier in the cache for each task, and at the beginning of the task checked whether that key already exists. You can write a context manager for this, something like:

import os
from contextlib import contextmanager
from datetime import datetime, timedelta

from django.core.cache import cache

@contextmanager
def task_lock(lock_id, oid, lock_expire_seconds=600, unlock_after_finish=False):
    """Be sure that the task runs only once.

    :param lock_id: unique id of the task
    :param oid: unique id of the current job (needed for debugging only)
    :param lock_expire_seconds: the lock will expire after this many seconds
    :param unlock_after_finish: if True, release the lock when the current
        task finishes, allowing the next task to run
    """
    timeout_at = datetime.utcnow() + timedelta(seconds=lock_expire_seconds)
    oid = "{}-{}".format(os.environ.get("HOSTNAME", ""), oid)
    # cache.add fails if the key already exists, which is what makes
    # it usable as an atomic lock
    status = cache.add(lock_id, oid, lock_expire_seconds)
    try:
        yield status
    finally:
        # cache.delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if unlock_after_finish and datetime.utcnow() < timeout_at:
            # don't release the lock if we exceeded the timeout,
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            cache.delete(lock_id)

And then in your task code you can do

def some_task():
    with task_lock("task-lock", current_time, lock_expire_seconds=10) as acquired:
        if acquired:
            # do something
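The locking idea can be sketched without a broker or Django at all. The `FakeCache` below is a hypothetical in-memory stand-in for the cache backend (not Django's real cache), showing why `add()` works as an atomic lock: it only succeeds when the key is absent, so a second worker picking up the same task sees `False` and skips.

```python
import threading
import time

class FakeCache:
    """Hypothetical in-memory stand-in mimicking the cache.add /
    cache.delete semantics that task_lock relies on."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def add(self, key, value, timeout):
        # Return True only if the key is absent (or expired),
        # mirroring the atomic behaviour of cache.add().
        with self._lock:
            now = time.monotonic()
            current = self._data.get(key)
            if current is not None and current[1] > now:
                return False
            self._data[key] = (value, now + timeout)
            return True

    def delete(self, key):
        with self._lock:
            self._data.pop(key, None)

cache = FakeCache()

def run_task(task_id, job_id):
    """A task body guarded by the lock pattern above."""
    acquired = cache.add(task_id, job_id, 10)
    if not acquired:
        return "skipped"       # a duplicate delivery bails out here
    try:
        return "executed"      # only one concurrent copy gets this far
    finally:
        cache.delete(task_id)
```

With a real Redis-backed cache the same guarantee holds across worker processes and machines, because `add` maps to an atomic SETNX-style operation on the server.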

Suggestion : 2

I switched my Celery broker to SQLAlchemy to avoid this issue. It solved the multiple-execution bug for ETA tasks.

I installed SQLAlchemy with:

pip install SQLAlchemy

and updated settings.py:

BROKER_URL='sqla+mysql://<mysql user>:<mysql password>@localhost/<mysql db_name>'
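Other databases work through the same `sqla+` URL prefix. A sketch of the alternatives (the credentials and database names here are placeholders, not values from the original answer):

```python
# settings.py — pick one broker URL; user/password/dbname are placeholders
BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'                    # local SQLite file, handy for development
# BROKER_URL = 'sqla+postgresql://user:password@localhost/dbname'
# BROKER_URL = 'sqla+mysql://user:password@localhost/dbname'
```

Note that the SQLAlchemy broker transport is considered experimental by the Celery project, so it trades the duplicate-delivery bug for reduced feature support (e.g. no remote control commands).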

Suggestion : 3

By default multiprocessing is used to perform concurrent execution of tasks, but you can also use Eventlet. The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of CPUs available on the machine.

With the --max-tasks-per-child option you can configure the maximum number of tasks a worker can execute before it's replaced by a new process, and with --max-memory-per-child the maximum amount of resident memory a worker can use before it's replaced by a new process. If you use celery multi, you will want to create one pidfile per worker instance, so use the %n format to expand the current node name:

$ celery -A proj worker -l INFO
$ celery worker --help
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h
$ pkill -9 -f 'celery worker'
$ ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
$ celery multi start 1 -A proj -l INFO -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid
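The two worker-recycling options mentioned above can also be set in the configuration file instead of on the command line. A sketch using the lowercase setting names from Celery 4+ (the numbers are illustrative, tune them for your workload):

```python
# celeryconfig.py — illustrative values, not recommendations
worker_max_tasks_per_child = 100       # recycle a worker process after 100 tasks
worker_max_memory_per_child = 200000   # recycle after ~200 MB resident memory (value is in KiB)
```

Recycling workers this way limits the impact of memory leaks in task code, but it does not by itself fix duplicate delivery from the broker.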

Suggestion : 4

Posted on November 25, 2021

$ touch docker-compose.yml requirements.txt
$ touch tasks.py
# create & activate the virtualenv
$ python -m venv env
$ source env/bin/activate
$ pip install celery==5.0.5 redis
version: "3"
services:
  rabbitmq:
    image: rabbitmq:latest
    environment:
      - RABBITMQ_DEFAULT_USER=guest
  redis:
    image: redis:latest
from celery import Celery
from time import sleep

broker_url = "amqp://localhost"
redis_url = "redis://localhost"
app = Celery('tasks', broker=broker_url, backend=redis_url)

@app.task
def say_hello(name: str):
    return f"Hello {name}"
$ docker-compose up -d
$ docker ps