Getting an error when writing a test for a Celery task

Suggestion : 1

You can also register the marker by adding it to your pytest.ini file:

[pytest]
markers =
    celery(**overrides): override celery configuration for a test case
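
Once the marker is registered, pytest stops warning about an unknown `celery` mark and you can use it to override Celery settings for a single test; the override applies to tests that use the plugin's celery_app / celery_worker fixtures (the same pattern appears in Suggestion : 3 below). A minimal sketch:

@pytest.mark.celery(result_backend='redis://')
def test_with_overridden_backend(celery_app, celery_worker):
    # celery_app is created with result_backend='redis://' for this test only.
    ...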

Suggestion : 2

I had already searched for this error on Bing and found a question, Pytest with Celery tasks works but shows "ERROR in consumer: Received unregistered task of type X", whose answer looks a little weird. I eventually found a solution written another way, and it works for me. The original problem: when trying to run the example from Testing with Celery, I got an exception with the following project layout and test code:

tests
├── celery
│   ├── __init__.py
│   └── test_celery.py
└── conftest.py

# conftest.py
import pytest

pytest_plugins = (
    'celery.contrib.pytest',
)


@pytest.fixture(scope='session')
def celery_config():
    print('>>>>>>>>>>>>>>>>call conftest celery_config')
    return {
        'broker_url': 'redis://localhost:6379/3',
        'result_backend': 'redis://localhost:6379/3',
        'timezone': 'Asia/Shanghai',
        'accept_content': ('json', 'pickle'),
    }

# tests/celery/test_celery.py -- the example from the Celery docs that raised the error
def test_create_task(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    assert mul.delay(4, 4).get(timeout=10) == 16

# The working variant: register the task once on the session-scoped app
# and use the matching session-scoped worker.
@pytest.fixture(scope='session')
def task_wrapper(celery_session_app):
    @celery_session_app.task
    def mul(x, y):
        return x * y

    return mul


def test_create_task(task_wrapper, celery_session_worker):
    assert task_wrapper.delay(4, 4).get(timeout=10) == 16
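
If your tasks live in a real module rather than being defined inside the test, another way to avoid the "Received unregistered task" error is to override the plugin's celery_includes fixture so the embedded worker imports those modules at startup. A minimal sketch, assuming a hypothetical tests.celery.tasks module containing the task definitions:

# conftest.py -- sketch; `tests.celery.tasks` is a hypothetical module
# containing the project's real task definitions.
import pytest


@pytest.fixture(scope='session')
def celery_includes():
    # Modules the embedded test worker imports at startup, so the
    # tasks defined in them are registered before any test sends work.
    return ['tests.celery.tasks']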

Suggestion : 3

Celery makes a pytest plugin available that adds fixtures you can use in your integration (or unit) test suites. The celery_session_worker fixture starts a worker that lives throughout the testing session (it won't be started and stopped for every test). By default the worker fixture will wait up to 10 seconds for the worker to complete outstanding tasks and will raise an exception if the time limit is exceeded; the timeout can be customized by setting the shutdown_timeout key in the dictionary returned by the celery_worker_parameters() fixture (see the sketch at the end of this suggestion). Alternatively, you could write plain unit tests for a task using mocking, like in this example:

from decimal import Decimal

from django.db import OperationalError

from .models import Product


@app.task(bind=True)
def send_order(self, product_pk, quantity, price):
    price = Decimal(price)  # json serializes this to string.

    # models are passed by id, not serialized.
    product = Product.objects.get(pk=product_pk)

    try:
        product.order(quantity, price)
    except OperationalError as exc:
        raise self.retry(exc=exc)

from decimal import Decimal

from pytest import raises

from celery.exceptions import Retry

from django.db import OperationalError

# for python 2: use mock.patch from `pip install mock`.
from unittest.mock import patch

from proj.models import Product
from proj.tasks import send_order


class test_send_order:

    @patch('proj.tasks.Product.order')  # < patching Product in module above
    def test_success(self, product_order):
        product = Product.objects.create(
            name='Foo',
        )
        send_order(product.pk, 3, Decimal(30.3))
        product_order.assert_called_with(3, Decimal(30.3))

    @patch('proj.tasks.Product.order')
    @patch('proj.tasks.send_order.retry')
    def test_failure(self, send_order_retry, product_order):
        product = Product.objects.create(
            name='Foo',
        )

        # Set a side effect on the patched methods
        # so that they raise the errors we want.
        send_order_retry.side_effect = Retry()
        product_order.side_effect = OperationalError()

        with raises(Retry):
            send_order(product.pk, 3, Decimal(30.6))

@pytest.mark.celery(result_backend='redis://')
def test_something():
    ...


@pytest.mark.celery(result_backend='redis://')
class test_something:

    def test_one(self):
        ...

    def test_two(self):
        ...

def test_create_task(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    assert mul.delay(4, 4).get(timeout=10) == 16

# Put this in your conftest.py
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'redis://'
    }


def test_add(celery_worker):
    mytask.delay()


# If you wish to override some setting in one test case
# only - you can use the ``celery`` mark:
@pytest.mark.celery(result_backend='rpc')
def test_other(celery_worker):
    ...
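
The worker shutdown timeout mentioned earlier can be raised through the celery_worker_parameters() fixture. A minimal sketch, assuming it is placed alongside celery_config in conftest.py (shutdown_timeout is in seconds):

@pytest.fixture(scope='session')
def celery_worker_parameters():
    # Passed through to the embedded worker; give outstanding tasks
    # up to 30 seconds (instead of the default 10) to finish on shutdown.
    return {
        'shutdown_timeout': 30,
    }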

Suggestion : 4

Automatically Retrying Failed Celery Tasks. In this article, we'll look at how to automatically retry failed Celery tasks, using both the retry method and decorator arguments.

@shared_task
def task_process_notification():
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

@shared_task(bind=True)
def task_process_notification(self):
    try:
        if not random.choice([0, 1]):
            # mimic random error
            raise Exception()

        requests.post('https://httpbin.org/delay/5')
    except Exception as e:
        logger.error('exception raised, it will be retried after 5 seconds')
        raise self.retry(exc=e, countdown=5)

>>> from polls.tasks import task_process_notification
>>> task_process_notification.delay()

The worker log shows the retries:

Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] retry: Retry in 5s: Exception()
Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] retry: Retry in 5s: Exception()
Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] succeeded in 3.3638455480104312s: None

@shared_task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 7, 'countdown': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')
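
Coming back to the original question of testing, a retrying task like the try/except version above can be unit-tested with the same patching pattern shown in Suggestion : 3. A minimal sketch, assuming that version of task_process_notification lives in a hypothetical polls.tasks module which imports random at module level:

from unittest.mock import patch

from celery.exceptions import Retry
from pytest import raises

from polls.tasks import task_process_notification


@patch('polls.tasks.task_process_notification.retry')
@patch('polls.tasks.random.choice')
def test_task_retries_on_error(mock_choice, mock_retry):
    # Force the "mimic random error" branch so the task raises.
    mock_choice.return_value = 0
    # Mirror Celery's behaviour: calling retry raises a Retry exception.
    mock_retry.side_effect = Retry()

    with raises(Retry):
        task_process_notification()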