If a task is pending or currently executing, you can see its arguments. The easiest way is to use Celery's inspect API:
from celery.task.control import inspect

i = inspect()
active_tasks = i.active()
reserved_tasks = i.reserved()
scheduled_tasks = i.scheduled()
Each call returns a mapping of worker name to a list of task dictionaries. You can iterate over them and, by matching on the task ID, get all the task details, like this:
{
    'acknowledged': True,
    'args': '(1000,)',
    'delivery_info': {
        'exchange': '',
        'priority': 0,
        'redelivered': None,
        'routing_key': 'celery'
    },
    'hostname': 'celery@pavilion',
    'id': '30d41ba2-3e71-49ce-8e7d-830ba1152256',
    'kwargs': '{}',
    'name': 't.wait',
    'time_start': 1007.945882783,
    'type': 't.wait',
    'worker_pid': 10560
}
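To find a specific task by its ID, here is a minimal sketch of that iteration (task_id is assumed to hold the ID you are looking for; active() returns None when no workers reply):

for worker, tasks in (active_tasks or {}).items():
    for task in tasks:
        if task['id'] == task_id:
            print(task['name'], task['args'], task['kwargs'])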
Results

How do I get the result of a task if I have the ID that points there? You can construct an AsyncResult from the task ID and query it, as sketched below. One caveat: if you keep results in MySQL, its default REPEATABLE-READ transaction isolation can leave results looking stuck in PENDING, so it is usually recommended to lower the isolation level in my.cnf:

[mysqld]
transaction-isolation = READ-COMMITTED
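A minimal sketch of looking a result up by ID, assuming task_id holds an ID you saved earlier:

from celery.result import AsyncResult

result = AsyncResult(task_id)  # look the task up by its ID
if result.ready():             # True once the task has finished
    print(result.get())        # returns the value, or re-raises the task's exception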
I've discarded messages, but there are still messages left in the queue? You can check how many messages remain with rabbitmqctl:

$ rabbitmqctl list_queues -p <myvhost> name messages consumers
Listing queues ...
celery 2891 2

This shows that there are 2,891 messages waiting to be processed in the task queue, and two consumers processing them.
To stop all running worker processes (celeryd is the old worker daemon name), kill them by PID, resorting to -9 only if they refuse to shut down:

ps auxww | grep celeryd | awk '{print $2}' | xargs kill
ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9
You can run a periodic task manually, just like any other task:

>>> from myapp.tasks import MyPeriodicTask
>>> MyPeriodicTask.delay()
To discard all waiting tasks, use discard_all; the return value is the number of messages deleted:

>>> from celery.task.control import discard_all
>>> discard_all()
1753
A task is a class that can be created out of any callable. It performs dual roles, in that it defines both what happens when a task is called (sends a message) and what happens when a worker receives that message. Any keyword argument passed to the task decorator will be set as an attribute of the resulting task class; there is a list of built-in attributes. A best practice is to use the module name as a namespace, so names won't collide if there's already a task with that name defined in another module. When you call retry, it sends a new message using the same task ID, and it takes care to make sure the message is delivered to the same queue as the originating task.
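A hedged sketch of retry, assuming the app instance used throughout this article; tasks.send_report, generate, and TransientError are hypothetical names used only for illustration:

class TransientError(Exception):
    # hypothetical error signalling a temporary, retry-worthy failure
    pass

def generate(report_id):
    # hypothetical helper that may raise TransientError
    pass

@app.task(bind = True, name = 'tasks.send_report', max_retries = 3)
def send_report(self, report_id):
    try:
        generate(report_id)
    except TransientError as exc:
        # retry() sends a new message with the same task ID and makes
        # sure it is delivered to the same queue as the originating task.
        raise self.retry(exc = exc, countdown = 60)

Note how the task name follows the module-as-namespace convention described above.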
connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout = (connect_timeout, read_timeout))
from .models import User

@app.task
def create_user(username, password):
    User.objects.create(username = username, password = password)
@app.task(serializer = 'json')
def create_user(username, password):
    User.objects.create(username = username, password = password)
from celery import shared_task

@shared_task
def add(x, y):
    return x + y
When combining the task decorator with other decorators, it must be applied last, which in Python means it must come first in the list:

@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(bind = True)
def add(self, x, y):
    logger.info(self.request.id)
In this article, I will try to give you a good understanding of which scenarios could be covered by Celery. Not only will you see interesting examples, but you will also learn how to apply Celery to real-world tasks such as background mailing, report generation, logging, and error reporting. I will share my own way of testing tasks beyond emulation and explain a few tricks that go beyond the official documentation and took me hours of research to discover myself. If you have no previous experience with Celery, I encourage you to first try it out by following the official tutorial. The first case we will cover is report generation and export: you will learn how to define a task that produces a CSV report and schedule it at regular intervals with celerybeat. Once that is in place, repeat the steps of launching Celery, starting up the shell, and testing execution of this task under different scenarios. Fixtures are located under the celery_uncovered/tricks/fixtures/locales/ directory.
The project layout was generated by Cookiecutter Django; however, I kept only a few dependencies that, in my opinion, facilitate the development and preparation of these use cases. I also removed modules and applications unnecessary for this post, to reduce noise and make the code easier to understand.
- celery_uncovered/
- celery_uncovered/__init__.py
- celery_uncovered/{toyex,tricks,advex}
- celery_uncovered/celery.py
- config/settings/{base,local,test}.py
- config/urls.py
- manage.py
File: celery_uncovered/celery.py:

from __future__ import absolute_import
import os
from celery import Celery, signals

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')

app = Celery('celery_uncovered')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace = 'CELERY')
app.autodiscover_tasks()
File: celery_uncovered/__init__.py:

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa

__all__ = ['celery_app']

__version__ = '0.0.1'
__version_info__ = tuple([int(num) if num.isdigit() else num
                          for num in __version__.replace('-', '.', 1).split('.')])
File: celery_uncovered/toyex/local.py:

import requests
from celery import shared_task
from django.conf import settings  # GITHUB_OAUTH lives in the Django settings

@shared_task
def fetch_hot_repos(since, per_page, page):
    payload = {
        'sort': 'stars',
        'order': 'desc',
        'q': 'created:>={date}'.format(date = since),
        'per_page': per_page,
        'page': page,
        'access_token': settings.GITHUB_OAUTH
    }
    headers = {
        'Accept': 'application/vnd.github.v3+json'
    }
    connect_timeout, read_timeout = 5.0, 30.0
    r = requests.get(
        'https://api.github.com/search/repositories',
        params = payload,
        headers = headers,
        timeout = (connect_timeout, read_timeout))
    items = r.json()[u'items']
    return items
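A hedged usage sketch, assuming a worker is running and GITHUB_OAUTH is set; the one-week window is only illustrative:

from datetime import date, timedelta

since = (date.today() - timedelta(days = 7)).isoformat()  # e.g. '2024-01-01'
result = fetch_hot_repos.delay(since, 100, 1)  # queue page 1, 100 repos per page
repos = result.get(timeout = 60)               # block until the worker returns
print(len(repos))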
Ideally, task functions should be idempotent: the function won't cause unintended effects even if called multiple times with the same arguments. Since the worker cannot detect whether your tasks are idempotent, the default behavior is to acknowledge the message in advance, just before it's executed, so that a task invocation that has already started is never executed again. The alternative, late acknowledgement (acks_late), acknowledges only after execution. Note: this means the task may be executed multiple times should the worker crash in the middle of execution, so make sure your tasks are idempotent. Separately, Celery will verify the arguments passed when you call the task, just like Python does when calling a normal function. Finally, if the worker and the client import a task's module under different names, the task will end up named differently on each side; this is why the module-name-as-namespace practice mentioned earlier matters.
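A minimal sketch of that trade-off, assuming the app instance from earlier; mark_processed and its in-memory PROCESSED store are hypothetical, chosen only to show an idempotent task body:

PROCESSED = set()  # hypothetical store; a real task would use a database

@app.task(acks_late = True)
def mark_processed(order_id):
    # With acks_late the message is acknowledged only *after* execution,
    # so a crash mid-task means the message is redelivered and the task
    # runs again. The body is idempotent: re-adding the same id to a set
    # has no extra effect, so the redelivery is harmless.
    PROCESSED.add(order_id)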