I need to change:
@mock.patch('django.core.mail.mail_managers')
with
@mock.patch('path.to.tasks.mail_managers')
To illustrate, this could be like:
# views.py from path.to.tasks import my_task def my_view(requtest): # do stuff my_task.delay('foo', 'bar') return HttpResponse('whatever') # test_my_task.py from views import my_view from path.to.tasks import my_task class MyTest(TestCase): @mock.patch('path.to.tasks.my_task') def test_my_task_is_called(self, mocked_task): client = Client() client.get('/') my_task.assert_called_with('foo', 'bar') def test_my_task_works(self): my_task('foo', 'bar') # note I don 't use .delay(...), .apply_async(...), etc assert 'my task did what I expected it to do'
Don’t show irrelevant/duplicated/”internal” Task attrs in UI (#22812),Make REST API patch user endpoint work the same way as the UI (#18757),Add release date for when an endpoint/field is added in the REST API (#19203),Run airflow webserver to start the new UI. This will bring up a log in page, enter the recently created admin username and password.
from airflow.utils.log.timezone_aware import TimezoneAware # before class YourCustomFormatter(logging.Formatter): ... # after class YourCustomFormatter(TimezoneAware): ... AIRFLOW_FORMATTER = LOGGING_CONFIG["formatters"]["airflow"] AIRFLOW_FORMATTER["class"] = "somewhere.your.custom_config.YourCustomFormatter" # or use TimezoneAware class directly.If you don 't have custom Formatter. AIRFLOW_FORMATTER["class"] = "airflow.utils.log.timezone_aware.TimezoneAware"
@dag.task(params = {
"a": {
1,
2,
3
},
"b": pendulum.now()
})
def datetime_param(value):
print(value)
datetime_param("{{ params.a }} | {{ params.b }}")
> raise exc.NoSuchModuleError(
"Can't load plugin: %s:%s" % (self.group, name)
)
E sqlalchemy.exc.NoSuchModuleError: Can 't load plugin: sqlalchemy.dialects:postgres
INSERT INTO log_template(id, filename, elasticsearch_id, created_at) VALUES(0, '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log', '{dag_id}-{task_id}-{execution_date}-{try_number}', NOW());
airflow connections
export -2 > & 1 > /dev/null | grep 'non-JSON'
Param(None, type = ["null", "string"])
Mocking Celery `self.request` attribute for bound tasks when called directly,Another option is to have the task be a anycodings_celery wrapper method, such as:,Was able to get something working by anycodings_celery using setattr on the task method, not anycodings_celery sure if there is a better/other ways to anycodings_celery do this:,And then you can test some_method anycodings_celery directly, passing in whatever ID you anycodings_celery like.
helpers.py:
from task
import some_task
def some_helper():
some_task.delay(123)
in task.py:
@app.task(queue = "abc", bind = True)
def some_task(self, some_number: int):
print(self.id) # how to mock this attribute access ?
Simple test case:
from django.test.testcases
import TestCase
from helpers
import some_helper
class SomeTest(TestCase):
def test_some_helper(self):
some_helper()
I also tried:
class MockResult(dict):
def __getattr__(self, x):
return self[x]
...
def test_some_task(self):
cls = MockResult({
"id": "asdf"
})
bound_some_task = some_task.__get__(cls, MockResult)
bound_some_task(123)
Given a celery task that looks like:
@my_celery_app.task(bind = True)
def my_task(self):
if self.request.retries == 1:
my_method_to_invoke()
# Do work
for first retry
elif self.request.retries == 2:
# Do work
for second retry
# do work
for
main task
In the unit test the following can be anycodings_celery done
@patch("path.to.task.my_method_to_invoke")
@patch("celery.app.task.Task.request")
def my_test_method(self, mock_task_request, mock_my_method_to_invoke):
# Set the value of retries directly
mock_task_request.retries = 1
# Call the task and assert the inside method was
# called
my_task()
mock_my_method_to_invoke.assert_called_once()
Was able to get something working by anycodings_celery using setattr on the task method, not anycodings_celery sure if there is a better/other ways to anycodings_celery do this:
from django.test.testcases
import TestCase
from helpers
import some_helper
class SomeTest(TestCase):
def test_some_helper(self):
from task
import some_task
setattr(some_task, 'id', 'hello-id')
some_helper()
In addition to this it is possible to anycodings_celery mock the request.id or "task id" like anycodings_celery so:
@patch("task.some_task.request.id", return_value = "hello-id")
def test_some_helper(...): ....
What helped me was to create a mock anycodings_celery class mimicking the task
class CeleryTaskHelper:
""
"Mock task id"
""
class Request:
@property
def id(self):
return ''.join(random.choices(string.ascii_uppercase + string.digits, k = 20))
@property
def request(self):
return self.Request()
def revoke(self):
return
and then
@patch('apps.orders.tasks.activate_order.apply_async', return_value = CeleryTaskHelper())
One options is to call the task anycodings_celery synchronously using
task = some_task.s(<args>).apply()
Another option is to have the task be a anycodings_celery wrapper method, such as:
@app.task
def some_task(self, ...args):
some_method(self.request.id, ...args)