python: mock doesn't work inside celery task [duplicate]

  • Last Update :
  • Techknowledgy :

I need to change:

@mock.patch('django.core.mail.mail_managers')

with

@mock.patch('path.to.tasks.mail_managers')

To illustrate, this could be like:

# views.py
from path.to.tasks
import my_task

def my_view(requtest):
   # do stuff
   my_task.delay('foo', 'bar')
return HttpResponse('whatever')

# test_my_task.py
from views
import my_view
from path.to.tasks
import my_task

class MyTest(TestCase):
   @mock.patch('path.to.tasks.my_task')
def test_my_task_is_called(self, mocked_task):
   client = Client()
client.get('/')
my_task.assert_called_with('foo', 'bar')

def test_my_task_works(self):
   my_task('foo', 'bar') # note I don 't use .delay(...), .apply_async(...), etc
assert 'my task did what I expected it to do'

Suggestion : 2

Don’t show irrelevant/duplicated/”internal” Task attrs in UI (#22812),Make REST API patch user endpoint work the same way as the UI (#18757),Add release date for when an endpoint/field is added in the REST API (#19203),Run airflow webserver to start the new UI. This will bring up a log in page, enter the recently created admin username and password.

from airflow.utils.log.timezone_aware
import TimezoneAware

# before
class YourCustomFormatter(logging.Formatter):
   ...

   # after
class YourCustomFormatter(TimezoneAware):
   ...

   AIRFLOW_FORMATTER = LOGGING_CONFIG["formatters"]["airflow"]
AIRFLOW_FORMATTER["class"] = "somewhere.your.custom_config.YourCustomFormatter"
# or use TimezoneAware class directly.If you don 't have custom Formatter.
AIRFLOW_FORMATTER["class"] = "airflow.utils.log.timezone_aware.TimezoneAware"
@dag.task(params = {
   "a": {
      1,
      2,
      3
   },
   "b": pendulum.now()
})
def datetime_param(value):
   print(value)

datetime_param("{{ params.a }} | {{ params.b }}")
> raise exc.NoSuchModuleError(
   "Can't load plugin: %s:%s" % (self.group, name)
)
E sqlalchemy.exc.NoSuchModuleError: Can 't load plugin: sqlalchemy.dialects:postgres
INSERT INTO log_template(id, filename, elasticsearch_id, created_at) VALUES(0, '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log', '{dag_id}-{task_id}-{execution_date}-{try_number}', NOW());
airflow connections
export -2 > & 1 > /dev/null | grep 'non-JSON'
Param(None, type = ["null", "string"])

Suggestion : 3

Mocking Celery `self.request` attribute for bound tasks when called directly,Another option is to have the task be a anycodings_celery wrapper method, such as:,Was able to get something working by anycodings_celery using setattr on the task method, not anycodings_celery sure if there is a better/other ways to anycodings_celery do this:,And then you can test some_method anycodings_celery directly, passing in whatever ID you anycodings_celery like.

helpers.py:

from task
import some_task

def some_helper():
   some_task.delay(123)

in task.py:

@app.task(queue = "abc", bind = True)
def some_task(self, some_number: int):
   print(self.id) # how to mock this attribute access ?

Simple test case:

from django.test.testcases
import TestCase
from helpers
import some_helper

class SomeTest(TestCase):

   def test_some_helper(self):
   some_helper()

I also tried:

class MockResult(dict):
   def __getattr__(self, x):
   return self[x]

      ...
      def test_some_task(self):
      cls = MockResult({
         "id": "asdf"
      })
bound_some_task = some_task.__get__(cls, MockResult)
bound_some_task(123)

Given a celery task that looks like:

@my_celery_app.task(bind = True)
def my_task(self):
   if self.request.retries == 1:
   my_method_to_invoke()
# Do work
for first retry
elif self.request.retries == 2:
   # Do work
for second retry
# do work
for
main task

In the unit test the following can be anycodings_celery done

@patch("path.to.task.my_method_to_invoke")
@patch("celery.app.task.Task.request")
def my_test_method(self, mock_task_request, mock_my_method_to_invoke):

   # Set the value of retries directly
mock_task_request.retries = 1

# Call the task and assert the inside method was
# called
my_task()

mock_my_method_to_invoke.assert_called_once()

Was able to get something working by anycodings_celery using setattr on the task method, not anycodings_celery sure if there is a better/other ways to anycodings_celery do this:

from django.test.testcases
import TestCase
from helpers
import some_helper

class SomeTest(TestCase):

   def test_some_helper(self):

   from task
import some_task
setattr(some_task, 'id', 'hello-id')

some_helper()

In addition to this it is possible to anycodings_celery mock the request.id or "task id" like anycodings_celery so:

@patch("task.some_task.request.id", return_value = "hello-id")
def test_some_helper(...): ....

What helped me was to create a mock anycodings_celery class mimicking the task

class CeleryTaskHelper:
   ""
"Mock task id"
""

class Request:
   @property
def id(self):
   return ''.join(random.choices(string.ascii_uppercase + string.digits, k = 20))

@property
def request(self):
   return self.Request()

def revoke(self):
   return

and then

@patch('apps.orders.tasks.activate_order.apply_async', return_value = CeleryTaskHelper())

One options is to call the task anycodings_celery synchronously using

task = some_task.s(<args>).apply()

Another option is to have the task be a anycodings_celery wrapper method, such as:

@app.task
def some_task(self, ...args):
   some_method(self.request.id, ...args)