Celery

  • Task queue, runs on its own server

  • Worker jobs run in the background

  • All task data is serialized/deserialized; the default serializer is JSON

  • Needs a message broker (SQS, RabbitMQ) and a result backend (Redis, Postgres/MongoDB)

  • v5.2 supports a maximum of Python 3.10

Setup

pip install celery
pip install "celery[sqs]" #to use with sqs

Need to choose a message broker (see Deployment/Task Queues)
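
For reference, broker URLs look like the following (hosts and credentials are placeholders):

broker_url = 'pyamqp://guest@localhost//'  # RabbitMQ
broker_url = 'redis://localhost:6379/0'    # Redis
broker_url = 'sqs://'                      # SQS; credentials read from the environment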

File Structure

  • module_name/

    • __init__.py

    • celery.py

    • tasks.py

SQS

  • Make sure to turn debug mode off in Django so the broker URL (which embeds credentials) isn’t leaked in error pages; this applies when using amqp too

  • SQS doesn’t yet support worker remote control commands or events, so celery events, celerymon, and the Django Admin monitor won’t work

from kombu.utils.url import safequote

# AWS keys can contain URL-unsafe characters ('/', '+'), so quote them first
aws_access_key = safequote("ABCDEFGHIJKLMNOPQRST")
aws_secret_key = safequote("ZYXK7NiynG/TogH8Nj+P9nlE73sq3")

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)

Predefined Queues (if the worker can’t or shouldn’t create/delete queues itself)

broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
        }
    }
}

Basics

  • backend url can be an external DB URL in the form db+postgresql://user:password@host/db_name

module_name/celery.py

from celery import Celery

app = Celery(
    'module_name',                  # name of the current module
    broker=some_broker_url,         # broker URL
    backend='rpc://',               # backend for result storage
    include=['module_name.tasks'],  # task modules to import
)

# Optional configuration, see the app user guide
app.conf.update(result_expires=3600)

if __name__ == '__main__':
    app.start()

module_name/tasks.py

from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def xsum(numbers):
    return sum(numbers)

Then run the worker from the directory above module_name/:

celery -A module_name worker -l INFO

In production, run the worker as a daemonized background process
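
For example, celery multi can manage detached workers (a minimal sketch; production setups typically wrap this in systemd/init scripts per the daemonization docs):

celery multi start w1 -A module_name -l INFO
celery multi restart w1 -A module_name -l INFO
celery multi stopwait w1 -A module_name -l INFO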

Then, in a separate terminal or Python shell inside the same folder, you can schedule tasks that return a result:

>>> from module_name.tasks import add
>>> result = add.delay(4, 4)  # check the worker terminal for the background logs
>>> result.ready()  # returns True/False

Storage

Results can be stored in Redis, an SQLAlchemy-supported DB, MongoDB, RPC (transient messages sent back), etc.; set via the backend option
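
For example, switching to a Redis result backend is just a config change (the URL is a placeholder):

app.conf.result_backend = 'redis://localhost:6379/0'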

SQLAlchemy

poetry add sqlalchemy psycopg2-binary

  • Just add an SQLAlchemy connection string as the backend and it will store task info in two new tables

  • e.g. db+postgresql://postgres:XXXXXXXXXXXXXXX@teachingassistant.aaaaaaaaa.us-west-2.rds.amazonaws.com:5432 (note the db+ prefix)
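
A minimal sketch of wiring this in (host and credentials are placeholders; Celery creates the celery_taskmeta and celery_tasksetmeta tables on first use):

from celery import Celery

app = Celery(
    'module_name',
    broker='sqs://',  # placeholder; credentials read from the environment
    backend='db+postgresql://user:password@host:5432/db_name',  # note the db+ prefix
)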

Advanced

  • Tasks can be scheduled with a start time (countdown/eta), state-update listeners, and task chaining/linking

  • result.get(timeout=1) waits for the result, making the call synchronous

  • add(4, 5) just runs the task in the current process (no worker involved)

  • Can do complex workflows with the primitives group, chain, chord, map, starmap, and chunks; see the sketch after this list
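
A short sketch of these, reusing the add/xsum tasks from above (results require a configured backend):

from celery import chain, chord, group
from module_name.tasks import add, xsum

# run a task 10 seconds from now
add.apply_async((4, 4), countdown=10)

# chain: feed each result into the next task -> add(add(2, 2), 4)
chain(add.s(2, 2), add.s(4))().get()  # 8

# group: run tasks in parallel, collect a list of results
group(add.s(i, i) for i in range(10))().get()  # [0, 2, 4, ..., 18]

# chord: run a group, then pass the list of results to a callback
chord((add.s(i, i) for i in range(10)), xsum.s())().get()  # 90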
