Celery
Task queue, runs on its own server
Worker jobs run in the background
All task data is serialized/deserialized; the default serializer is JSON (see the sketch below)
Needs a message broker (SQS, RabbitMQ) and a result backend (Redis, Postgres/MongoDB)
v5.2 supports Python up to 3.10
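A minimal standalone sketch of overriding the serializer settings ('json' is already the default, so this only shows where the knobs live):
from celery import Celery

app = Celery('module_name')
app.conf.update(
    task_serializer='json',      # 'json' is already the default
    result_serializer='json',
    accept_content=['json'],     # content types the worker will accept
)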
Setup
pip install celery
pip install "celery[sqs]" #to use with sqs
Need to choose a message broker (see Deployment/Task Queues)
File Structure
module_name/
    __init__.py
    celery.py
    tasks.py
SQS
Make sure to turn debug mode off in Django so the broker URL (with credentials) isn't leaked, and use amqp
SQS doesn't yet support worker remote control commands or events, so celery events, celerymon, and the Django Admin monitor won't work
from kombu.utils.url import safequote

# safequote URL-encodes characters like "/" and "+" in the credentials
aws_access_key = safequote("ABCDEFGHIJKLMNOPQRST")
aws_secret_key = safequote("ZYXK7NiynG/TogH8Nj+P9nlE73sq3")

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
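A sketch (region assumed) of handing that URL to the app; the SQS region goes in the transport options:
from celery import Celery

app = Celery('module_name', broker=broker_url)  # broker_url from above
app.conf.broker_transport_options = {'region': 'us-west-2'}  # assumed region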
Predefined queues: use these when the worker isn't allowed to create/delete queues
broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
        }
    }
}
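With predefined queues the worker shouldn't try to create queues itself; a sketch (reusing app and the add task from Basics) of disabling creation and routing to the queue:
app.conf.task_create_missing_queues = False  # don't try to create queues

# route a task to the predefined queue by name
add.apply_async(args=(4, 4), queue='my-q')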
Basics
backend URL can be an external DB URL of the form
db+postgresql://user:password@host/db_name
module_name/celery.py
from celery import Celery

# current module name, broker URL, result backend, task modules to import
app = Celery('module_name', broker=some_broker_url, backend='rpc://',
             include=['module_name.tasks'])

# Optional configuration, see the app user guide
app.conf.update(result_expires=3600)

if __name__ == '__main__':
    app.start()
module_name/tasks.py
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def xsum(numbers):
    return sum(numbers)
Then run the worker from the directory above:
celery -A module_name worker -l INFO
in prod, run it as a background process (daemonized); see the sketch below
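One option is celery multi, which starts/stops detached workers (systemd/init scripts are the fully supported daemonization route):
celery multi start w1 -A module_name -l INFO
celery multi restart w1 -A module_name -l INFO
celery multi stop w1 -A module_name -l INFO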
Then, from a Python shell in a separate terminal (inside the same folder), you can schedule tasks that return a result:
>>> from module_name.tasks import add
>>> result = add.delay(4, 4)  # watch the worker terminal for background logs
>>> result.ready()  # returns True/False
Storage
Results can be stored in Redis, an SQLAlchemy-supported DB, Mongo, RPC (transient messages sent back), etc.; set via the backend option
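For example, a Redis backend (assuming a local Redis on the default port):
from celery import Celery

app = Celery(
    'module_name',
    broker=some_broker_url,               # broker as before
    backend='redis://localhost:6379/0',   # assumed local Redis, db 0
)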
SQLAlchemy
poetry add sqlalchemy psycopg2-binary
Just add the SQLAlchemy connection string as the backend and it will store task info in two new tables (celery_taskmeta and celery_tasksetmeta)
db+postgresql://postgres:XXXXXXXXXXXXXXX@teachingassistant.aaaaaaaaa.us-west-2.rds.amazonaws.com:5432 (note the db+ prefix)
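A sketch of wiring it in (credentials/host elided as above, db name assumed):
app = Celery(
    'module_name',
    broker=some_broker_url,
    backend='db+postgresql://user:password@host:5432/db_name',  # placeholder values
)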
Advanced
Tasks can be scheduled with a start time, state-update listeners, and task chaining/linking
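A sketch of scheduling and chaining, reusing the add task from Basics (times/values assumed; state-update listeners not shown):
from datetime import datetime, timedelta, timezone
from celery import chain
from module_name.tasks import add

# schedule with a start time
add.apply_async((2, 2), eta=datetime.now(timezone.utc) + timedelta(minutes=10))

# chain/link tasks: each result is passed as the first argument of the next
res = chain(add.s(2, 2), add.s(4))()   # computes (2 + 2) + 4
print(res.get(timeout=10))             # -> 8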
result.get(timeout=1)  # waits for the result, making the call synchronous
add(4, 5)              # calling the task directly runs it in the current process