# Celery

* Task queue that runs on its own server
* Worker jobs run in the background
* All data is serialized/deserialized; the default serializer is JSON
* Needs a message broker (SQS, RabbitMQ) and a result backend (Redis, Postgres/MongoDB)
* v5.2 supports Python up to 3.10
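
Since everything passes through a serializer, arguments and return values should be JSON-friendly. A minimal sketch of what a JSON round trip does, using only the standard library `json` module rather than Celery itself (Celery's own serializer may handle a few extra types); the `payload` dict and `Point` class are made up for illustration:

```python
import json

# Hypothetical task arguments, as they might be passed to a task.
payload = {"ids": (1, 2, 3), "name": "report"}

# Simulate the encode/decode trip between the caller and the worker.
round_tripped = json.loads(json.dumps(payload))

# Tuples come back as lists because JSON has no tuple type.
print(round_tripped["ids"])


class Point:
    """Hypothetical custom object with no JSON representation."""

    def __init__(self, x, y):
        self.x = x
        self.y = y


# Plain json refuses arbitrary objects, so pass ids or dicts instead.
try:
    json.dumps({"point": Point(1, 2)})
    custom_object_ok = True
except TypeError:
    custom_object_ok = False
print(custom_object_ok)
```

In practice this usually means passing primitive values (ids, strings, dicts) to tasks and re-loading richer objects inside the task.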

## Setup

```bash
pip install celery
pip install "celery[sqs]"  # to use with SQS
```

Choose a message broker (see Deployment/Task Queues).

#### File Structure

* module\_name/
  * \_\_init\_\_.py
  * celery.py
  * tasks.py

### SQS

* Make sure to turn debug mode off in Django to not leak url and use amqp
* SQS doesn't yet support worker remote control commands or events, so it cannot be used with `celery events`, celerymon, or the Django Admin monitor

```python
from kombu.utils.url import safequote

aws_access_key = safequote("ABCDEFGHIJKLMNOPQRST")
aws_secret_key = safequote("ZYXK7NiynG/TogH8Nj+P9nlE73sq3")

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
```
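
Hardcoding keys as above is fine for a quick test, but they are usually read from the environment. A minimal sketch, assuming the standard library `urllib.parse.quote` with `safe=""` as a stand-in for `safequote`; the helper name `build_sqs_broker_url` is hypothetical, and the fallback values are the placeholder keys from above:

```python
import os
from urllib.parse import quote


def build_sqs_broker_url(access_key: str, secret_key: str) -> str:
    """Return an sqs:// broker URL with both credentials percent-encoded."""
    # safe="" also encodes "/" and "+", which often appear in secret keys.
    return "sqs://{}:{}@".format(
        quote(access_key, safe=""), quote(secret_key, safe="")
    )


# Read credentials from the environment; fall back to the placeholders above.
access_key = os.environ.get("AWS_ACCESS_KEY_ID", "ABCDEFGHIJKLMNOPQRST")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "ZYXK7NiynG/TogH8Nj+P9nlE73sq3")

broker_url = build_sqs_broker_url(access_key, secret_key)
```

Encoding matters because an unescaped `/` or `+` in the secret key would otherwise break URL parsing.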

#### Predefined Queues (if Celery cannot create/delete queues)

```python
broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
        }
    }
}
```

## Basics

* The backend URL can be an external DB URL of the form `db+postgresql://user:password@host/db_name`

module\_name/celery.py

```python
from celery import Celery

app = Celery(
    'module_name',                  # name of the current module
    broker=some_broker_url,         # broker URL
    backend='rpc://',               # result storage backend
    include=["module_name.tasks"],  # names of the task modules
)

# Optional configuration, see the app user guide
app.conf.update(result_expires=3600)

if __name__ == '__main__':
    app.start()
```

module\_name/tasks.py

```python
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def xsum(numbers):
    return sum(numbers)
```

Then run the worker from the folder above `module_name/`:

```bash
celery -A module_name worker -l INFO
```

*run as background process using* [*daemon*](https://docs.celeryq.dev/en/stable/userguide/daemonizing.html#daemonizing) *in prod*

Then, in a separate terminal, from a Python shell or script in that same folder, you can [schedule](https://docs.celeryq.dev/en/stable/userguide/calling.html#guide-calling) tasks that return a [result](https://docs.celeryq.dev/en/stable/reference/celery.result.html#module-celery.result):

```python
>>> from module_name.tasks import add
>>> result = add.delay(4, 4)  # look at the worker terminal for background logs
>>> result.ready()  # returns True/False
```

## Storage

[Results can be stored](https://docs.celeryq.dev/en/stable/userguide/tasks.html#task-result-backends) in Redis, a SQLAlchemy DB, MongoDB, RPC (transient messages sent back), etc.; set via the `backend` option

#### SQLAlchemy

```
poetry add sqlalchemy psycopg2-binary
```

* Just add the SQLAlchemy connection string as the backend and it will store task info in two new tables
* e.g. `db+postgresql://postgres:XXXXXXXXXXXXXXX@teachingassistant.aaaaaaaaa.us-west-2.rds.amazonaws.com:5432`; note the `db+` prefix
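
Passwords with characters such as `@` or `/` need percent-encoding before they go into the connection string. A minimal sketch of assembling the backend URL with the standard library; `build_result_backend_url` and all the values passed to it are hypothetical:

```python
from urllib.parse import quote


def build_result_backend_url(user, password, host, db_name, port=5432):
    """Return a db+postgresql:// URL usable as the Celery result backend."""
    # Percent-encode the credentials so "@", "/" and ":" cannot break parsing.
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"db+postgresql://{user_enc}:{password_enc}@{host}:{port}/{db_name}"


# Made-up values for illustration only.
backend_url = build_result_backend_url(
    "postgres", "p@ss/word", "db.example.com", "celery_results"
)
print(backend_url)
```

The resulting string would be passed as the `backend` argument when creating the `Celery` app.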

## Advanced

* Tasks can be [scheduled](https://docs.celeryq.dev/en/stable/userguide/calling.html#guide-calling) with a start time, state update listeners, and task chaining/linking
* `result.get(timeout=1)` waits for the result, making the call synchronous
* `add(4, 5)` will just run it in the current process
* Can do complex workflows with primitives [group](https://docs.celeryq.dev/en/stable/userguide/canvas.html#canvas-group), [chain](https://docs.celeryq.dev/en/stable/userguide/canvas.html#canvas-chain), [chord](https://docs.celeryq.dev/en/stable/userguide/canvas.html#canvas-chord), [map](https://docs.celeryq.dev/en/stable/userguide/canvas.html#canvas-map), [starmap](https://docs.celeryq.dev/en/stable/userguide/canvas.html#canvas-map), [chunks](https://docs.celeryq.dev/en/stable/userguide/canvas.html#canvas-chunks)


