# Celery

***

**Background**

Celery is a distributed task queue that lets you run work asynchronously across one or more worker processes. It is often used to improve the performance and scalability of applications by offloading long-running or resource-intensive tasks to separate workers.

**1. Simple Task Scheduling**

```python
from celery import Celery

# A message broker is required; Redis on localhost is assumed here, and a
# result backend is configured so task results can be retrieved later.
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

add.delay(10, 20)
```

This schedules the `add` task for asynchronous execution by a worker. The `delay` method returns an `AsyncResult` that you can use to check the task's status or retrieve its result.

**2. Periodic Tasks**

```python
from celery.schedules import crontab

@app.task
def send_email():
    ...  # send an email

app.conf.beat_schedule = {
    'send-email': {
        'task': 'tasks.send_email',
        'schedule': crontab(hour=10, minute=0),
    }
}
```

This registers a periodic task: Celery Beat will dispatch `send_email` every day at 10:00 AM. Periodic tasks only fire while a beat scheduler process is running.
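Assuming the tasks live in a module named `tasks` (as the `'tasks.send_email'` entry above implies), the worker and the beat scheduler are typically started as two separate processes:

```shell
# Terminal 1: start a worker to execute tasks
celery -A tasks worker --loglevel=info

# Terminal 2: start the beat scheduler that dispatches periodic tasks
celery -A tasks beat --loglevel=info
```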

**3. Chaining Tasks**

```python
from celery import chain

@app.task
def task1():
    return 'task1'

@app.task
def task2(x):
    return x + 'task2'

# Build signatures with .s() and link them; task2 receives task1's return value
chain(task1.s(), task2.s()).delay()
```

This chains `task1` and `task2` together: `task2` is executed with the return value of `task1` as its first argument. The `.s()` calls create *signatures*, which bundle a task with its arguments without invoking it.

**4. Retrying Tasks**

```python
# retry_backoff and retry_jitter only affect automatic retries,
# so autoretry_for is required for them to take effect
@app.task(autoretry_for=(Exception,),
          retry_backoff=True,
          retry_jitter=True,
          max_retries=5)
def task():
    raise Exception('Error occurred')

task.delay()
```

This configures the task to be retried automatically whenever it raises one of the exceptions listed in `autoretry_for`, up to `max_retries` times. `retry_backoff=True` makes the delay between retries grow exponentially, and `retry_jitter=True` randomizes each delay so many failed tasks do not retry in lockstep.

**5. Limiting Concurrency**

```python
@app.task(rate_limit='5/m', soft_time_limit=300)
def task():
    ...  # run a long-running task

task.delay()
```

This limits the task to at most five new executions per minute on each worker (`rate_limit='5/m'`) and raises a `SoftTimeLimitExceeded` exception inside the task if it runs longer than 5 minutes (`soft_time_limit=300`). Note that `rate_limit` throttles how often a task *starts*; how many tasks a worker runs in parallel is controlled by the worker's concurrency setting.
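Actual parallelism is configured per worker rather than per task; the size of the worker's process (or thread) pool is set on the command line:

```shell
# Run at most 4 tasks concurrently in this worker's pool
celery -A tasks worker --concurrency=4
```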

**6. Task Groups**

```python
from celery import group

tasks = group(add.s(x, y) for x in range(10) for y in range(10))
result = tasks.delay()

for task in result:
    print(task.get())
```

This creates a group of task signatures that run in parallel. The returned `GroupResult` is iterable, so you can wait on each task individually as above, or call `result.get()` once to collect every result as a single list (a result backend must be configured).

**7. Task Chords**

```python
from celery import chord

# The chord callback must itself be a task; the built-in sum() cannot be used directly
@app.task
def tsum(numbers):
    return sum(numbers)

add_tasks = [add.s(x, y) for x in range(10) for y in range(10)]
result = chord(add_tasks)(tsum.s())

print(result.get())
```

This creates a chord: once every task in the `add_tasks` header completes, the `tsum` callback runs with the list of their results. The `result` object retrieves the chord's final value, which is the sum of all the `add` results. Chords require a result backend.

**8. Queues and Routing**

```python
app = Celery('tasks',
             broker='amqp://guest:guest@localhost:5672/',
             backend='redis://localhost:6379')

@app.task(queue='high-priority')
def urgent_task():
    ...  # run a high-priority task

@app.task(queue='low-priority')
def background_task():
    ...  # run a low-priority task
```

This routes tasks to two different queues, `high-priority` and `low-priority`, via each task's `queue` option. Workers only receive tasks from the queues they are configured to consume.

**9. Custom Task Classes**

```python
from celery import Task

class MyTask(Task):
    name = 'tasks.my_task'  # class-based tasks must declare a name

    def run(self, x, y):
        return x + y

# Class-based tasks are not auto-registered; register an instance with the app
my_task = app.register_task(MyTask())
my_task.delay(10, 20)
```

This creates a custom task class that inherits from `celery.Task` and overrides the `run` method to define the task's behavior. Unlike decorated functions, class-based tasks must declare a `name` and be registered with the app before they can be called.

**10. Custom Serializer**

```python
import json

from kombu.serialization import register

def my_dumps(obj):
    return json.dumps(obj)  # custom encoding logic goes here

def my_loads(data):
    return json.loads(data)  # custom decoding logic goes here

# Register the serializer with Kombu, then tell Celery to use it by name
register('myjson', my_dumps, my_loads,
         content_type='application/x-myjson', content_encoding='utf-8')

app = Celery('tasks')
app.conf.task_serializer = 'myjson'
```

This registers a custom serializer with Kombu, Celery's messaging library, and configures Celery to use it for task messages. You can use this to customize how task arguments and results are encoded for storage and transport.
