How can you catch a custom exception from a Celery worker, or stop it being prefixed with `celery.backends.base`?

import celery
from celery import shared_task


class NonTransientProcessingError(Exception):
    pass


class CeleryTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if isinstance(exc, NonTransientProcessingError):
            # deal with NonTransientProcessingError
            pass

    def run(self, *args, **kwargs):
        pass


@shared_task(base=CeleryTask)
def add(x, y):
    raise NonTransientProcessingError
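
As a minimal caller-side sketch (assuming the definitions above and a configured result backend), the failure can also be observed from the client. Depending on the result serializer, the exception the client sees may be the original class or a re-created one under celery.backends.base, which is exactly what the question is about:

result = add.delay(2, 2)
try:
    result.get(timeout=10)
except Exception as exc:
    # With on_failure handling the error on the worker, the caller can still
    # inspect the failure; the concrete class of `exc` depends on the serializer.
    print(type(exc), exc)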

I had the same issue. I wanted to catch the exception raised in the Celery task, but the result I received was an instance of celery.backends.base.CustomException. The solution takes the following form:

import celery
import time

from celery.result import AsyncResult
from celery.states import FAILURE


def reraise_celery_exception(info):
    exec("raise {class_name}('{message}')".format(
        class_name=info.__class__.__name__, message=info.__str__()))


class CustomException(Exception):
    pass


@celery.task(name="test")
def test():
    time.sleep(10)
    raise CustomException("Exception is raised!")


def run_test():
    task = test.delay()
    return task.id


def get_result(id):
    task = AsyncResult(id)
    if task.state == FAILURE:
        reraise_celery_exception(task.info)
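
A hypothetical usage sketch of the helpers above (assuming a running worker and a result backend that stores failure info): submit the task, wait past its 10-second sleep, and let get_result re-raise the original exception class on the caller side:

task_id = run_test()
time.sleep(12)           # wait past the task's 10-second sleep
try:
    get_result(task_id)  # re-raises CustomException if the task failed
except CustomException as exc:
    print('Caught the original class:', exc)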

Suggestion : 2

The CELERY_RESULT_BACKEND setting selects the backend used to store task results (tombstones). It is disabled by default. One option is rpc, which sends results back as AMQP messages (see the RPC backend settings). The old AMQP result backend creates one queue per task; if you want to send results back as messages, consider the RPC backend instead, or, if the results need to be persistent, use a result backend designed for that purpose (e.g. Redis or a database). A separate cache setting controls the total number of results to cache before older results are evicted: the default is 100, 0 or None means no limit, and a value of -1 disables the cache.
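
As a small sketch of the two options described above (the cache-size option is assumed here to be the old-style CELERY_MAX_CACHED_RESULTS setting):

# Send results back as AMQP messages via the RPC backend.
CELERY_RESULT_BACKEND = 'rpc://'
# Cache size described above: 0/None means no limit, -1 disables the cache.
CELERY_MAX_CACHED_RESULTS = 100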

# Broker settings.
BROKER_URL = 'amqp://guest:guest@localhost:5672//'

# List of modules to import when celery starts.
CELERY_IMPORTS = ('myapp.tasks',)

# Using the database to store task state and results.
CELERY_RESULT_BACKEND = 'db+sqlite:///results.db'

CELERY_ANNOTATIONS = {
    'tasks.add': {
        'rate_limit': '10/s'
    }
}
CELERY_ANNOTATIONS = {
    '*': {
        'rate_limit': '10/s'
    }
}


def my_on_failure(self, exc, task_id, args, kwargs, einfo):
    print('Oh no! Task failed: {0!r}'.format(exc))


CELERY_ANNOTATIONS = {
    '*': {
        'on_failure': my_on_failure
    }
}


class MyAnnotate(object):

    def annotate(self, task):
        if task.name.startswith('tasks.'):
            return {
                'rate_limit': '10/s'
            }


CELERY_ANNOTATIONS = (MyAnnotate(), {
    …})

CELERY_RESULT_BACKEND = 'db+scheme://user:password@host:port/dbname'
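
A hypothetical sketch of how such old-style (uppercase) settings are typically loaded, assuming they live in a module named celeryconfig.py:

from celery import Celery

app = Celery('myapp')
# config_from_object accepts the name of a module holding the settings above.
app.config_from_object('celeryconfig')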

Suggestion : 3

Long-running jobs are jobs that are expensive in resources, where users need to wait while they compute their results, e.g., complex workflow execution (DAG workflows), graph generation, Map-Reduce-like tasks, and serving of media content (video, audio).

Now imagine that we have many such tasks, but each of those tasks accepts a locale argument. In this case, wouldn't it be better to solve it on a higher level of abstraction? Here, we see just how to do that.

In one of my projects, I was developing an app that provides the end user with an Extract, Transform, Load (ETL)-like tool that was able to ingest and then filter a huge amount of hierarchical data. The back-end was split into two modules.

Our team made the choice to use Celery as an orchestration back-end for background jobs and long-running tasks. We use it extensively for a variety of use cases, of which only a few are mentioned in this post. We ingest and analyze gigabytes of data every day, but this is only the beginning of horizontal scaling techniques.

The project layout was generated by Cookiecutter Django; however, I kept only a few dependencies that, in my opinion, facilitate the development and preparation of these use cases. I also removed unnecessary modules and applications for this post to reduce noise and make the code easier to understand.

- celery_uncovered/
  - celery_uncovered/__init__.py
  - celery_uncovered/{toyex,tricks,advex}
  - celery_uncovered/celery.py
  - config/settings/{base,local,test}.py
  - config/urls.py
  - manage.py

File: celery_uncovered/celery.py:

from __future__ import absolute_import

import os

from celery import Celery, signals

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')

app = Celery('celery_uncovered')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
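
The signals import is not used in the snippet above; as a hedged sketch of what it is typically for, a handler can be connected to one of Celery's worker signals (task_failure is a real signal, while the handler below is hypothetical):

@signals.task_failure.connect
def log_task_failure(sender=None, exception=None, **kwargs):
    # Called whenever a task raises; `sender` is the task, `exception` the
    # error instance. The remaining signal arguments are ignored here.
    print('Task {0!r} failed: {1!r}'.format(sender, exception))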

File: celery_uncovered/__init__.py:

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa

__all__ = ['celery_app']

__version__ = '0.0.1'
__version_info__ = tuple([
    int(num) if num.isdigit() else num
    for num in __version__.replace('-', '.', 1).split('.')
])
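
For illustration, with __version__ = '0.0.1' the comprehension above evaluates to a plain tuple of integers; a pre-release suffix keeps its textual part:

assert __version_info__ == (0, 0, 1)
# '0.0.1-alpha' would yield (0, 0, 1, 'alpha'), since the first '-' is replaced by '.'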

File: celery_uncovered/toyex/local.py

# Imports assumed by this excerpt (a Django project with requests installed).
import requests
from celery import shared_task
from django.conf import settings


@shared_task
def fetch_hot_repos(since, per_page, page):
    payload = {
        'sort': 'stars',
        'order': 'desc',
        'q': 'created:>={date}'.format(date=since),
        'per_page': per_page,
        'page': page,
        'access_token': settings.GITHUB_OAUTH
    }
    headers = {'Accept': 'application/vnd.github.v3+json'}
    connect_timeout, read_timeout = 5.0, 30.0
    r = requests.get(
        'https://api.github.com/search/repositories',
        params=payload,
        headers=headers,
        timeout=(connect_timeout, read_timeout))
    items = r.json()[u'items']
    return items
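
A hypothetical usage sketch (assuming a running worker, a broker, and a GITHUB_OAUTH value in the Django settings); the argument order follows the task signature above:

result = fetch_hot_repos.delay('2018-01-01', 100, 1)  # since, per_page, page
repos = result.get(timeout=60)
print('{0} repositories fetched'.format(len(repos)))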