This is a known issue with Celery and the Redis broker: ETA/countdown tasks that outlive the broker's visibility timeout can be redelivered and executed more than once. In one of my projects I used to assign a unique identifier in the cache for each task, and then at the beginning of the task check whether the key already exists. You can write a context manager for this (the example below assumes Django's cache framework). Something like this:
from contextlib import contextmanager
from datetime import datetime, timedelta
import os

# assumes Django's cache framework; any cache with an atomic add() works
from django.core.cache import cache


@contextmanager
def task_lock(lock_id, oid, lock_expire_seconds=600, unlock_after_finish=False):
    """
    Be sure that the task runs only once.

    :param lock_id: unique id of the task
    :param oid: unique id of the current job (needed for debugging only)
    :param lock_expire_seconds: the task will be unlocked after this many seconds
    :param unlock_after_finish: bool, allow the next task to run after the current one finishes
    """
    timeout_at = datetime.utcnow() + timedelta(seconds=lock_expire_seconds)
    oid = "{}-{}".format(os.environ.get("HOSTNAME", ""), oid)
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, lock_expire_seconds)
    try:
        yield status
    finally:
        # cache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if unlock_after_finish and status and datetime.utcnow() < timeout_at:
            # only release the lock if we own it and haven't exceeded the
            # timeout, to lessen the chance of releasing an expired lock
            # owned by someone else.
            cache.delete(lock_id)
And then in your task code you can do:
def some_task():
    with task_lock("task-lock", current_time, lock_expire_seconds=10) as acquired:
        if acquired:
            ...  # do something
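For illustration, here is a minimal sketch of the lock semantics outside of a task (assuming Django's cache backend is configured): the first caller wins the cache.add, and a second caller within the expiry window gets False and should skip the work.
with task_lock("demo-lock", oid="job-1", lock_expire_seconds=30) as acquired:
    print(acquired)  # True -- this caller created the cache key and owns the lock

with task_lock("demo-lock", oid="job-2", lock_expire_seconds=30) as acquired:
    print(acquired)  # False -- the key still exists, so the duplicate run is skipped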
I switched my Celery broker to SQLAlchemy to avoid this issue. It solved the bug where ETA tasks were executed multiple times in parallel.
I installed SQLAlchemy:
pip install SQLAlchemy
and updated settings.py:
BROKER_URL='sqla+mysql://<mysql user>:<mysql password>@localhost/<mysql db_name>'
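If MySQL is not available locally, the same sqla+ URL scheme works with other SQLAlchemy-supported databases; a hedged sketch using SQLite (the database file name is just an example):
# settings.py -- illustrative only; any SQLAlchemy-supported database
# can be used via the sqla+ prefix
BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'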
By default, multiprocessing is used to perform concurrent execution of tasks, but you can also use Eventlet. The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of CPUs available on the machine. With the --max-tasks-per-child option you can configure the maximum number of tasks a worker can execute before it is replaced by a new process, and with --max-memory-per-child the maximum amount of resident memory a worker may use before it is replaced. If you use celery multi, you want to create one pidfile per worker instance, so use the %n format to expand the current node name:
$ celery -A proj worker -l INFO
$ celery worker --help
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h
$ pkill -9 -f 'celery worker'
$ ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
$ celery multi start 1 -A proj -l INFO -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid
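These limits can also be set in the Celery configuration instead of on the command line. A hedged sketch using the lowercase setting names introduced in Celery 4 (the values are illustrative, not recommendations):
# celeryconfig.py -- illustrative values only
worker_concurrency = 10               # same effect as --concurrency=10
worker_max_tasks_per_child = 100      # recycle a worker process after 100 tasks
worker_max_memory_per_child = 200000  # in KiB; recycle when resident memory exceeds this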
$ touch docker-compose.yml requirements.txt
$ touch tasks.py
# create & activate the virtualenv
$ python -m venv env
$ source env/bin/activate
pip install celery==5.0.5 redis
version: "3"
services:
  rabbitmq:
    image: rabbitmq:latest
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    ports:
      - "5672:5672"
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
from celery import Celery
from time import sleep

broker_url = "amqp://localhost"
redis_url = "redis://localhost"

app = Celery('tasks', broker=broker_url, backend=redis_url)


@app.task
def say_hello(name: str):
    sleep(5)
    return f"Hello {name}"
$ docker-compose up -d
$ docker ps
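With RabbitMQ and Redis up, you can start a worker with celery -A tasks worker --loglevel=INFO and call the task from a Python shell; a short sketch assuming the tasks.py above:
>>> from tasks import say_hello
>>> result = say_hello.delay("World")
>>> result.get(timeout=10)  # blocks until the worker returns, roughly 5 s because of sleep(5)
'Hello World'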