mirror of
https://github.com/aljazceru/Tutorial-Codebase-Knowledge.git
synced 2025-12-19 07:24:20 +01:00
252 lines
14 KiB
Markdown
---
layout: default
title: "Task"
parent: "Celery"
nav_order: 3
---

# Chapter 3: Task - The Job Description

In [Chapter 1: The Celery App](01_celery_app.md), we set up our Celery headquarters, and in [Chapter 2: Configuration](02_configuration.md), we learned how to give it instructions. Now, we need to define the *actual work* we want Celery to do. This is where **Tasks** come in.

## What Problem Does a Task Solve?

Imagine you have a specific job that needs doing, like "Resize this image to thumbnail size" or "Send a welcome email to this new user." In Celery, each of these specific jobs is represented by a **Task**.

A Task is like a **job description** or a **recipe**. It contains the exact steps (the code) needed to complete a specific piece of work. You write this recipe once as a Python function. Then you can tell Celery to follow that recipe whenever you need the job done, potentially many times with different inputs (like resizing different images or sending emails to different users).

The key benefit is that you don't run the recipe immediately yourself. You hand the recipe (the Task) and the ingredients (the arguments, like the image file or the user's email) over to Celery. Celery puts this request on a queue. An available helper (a [Worker](05_worker.md)) that knows how to follow that specific recipe picks it up and does the work in the background. This keeps your main application free to do other things.

## Defining Your First Task

Defining a task in Celery is surprisingly simple. You just take a regular Python function and "decorate" it using `@app.task`. Remember our `app` object from [Chapter 1](01_celery_app.md)? We use its `task` decorator.

Let's create a file, perhaps named `tasks.py`, to hold our task definitions:

```python
# tasks.py
import time
from celery_app import app  # Import the app instance we created


@app.task
def add(x, y):
    """A simple task that adds two numbers."""
    print(f"Task 'add' starting with ({x}, {y})")
    # Simulate some work taking time
    time.sleep(5)
    result = x + y
    print(f"Task 'add' finished with result: {result}")
    return result


@app.task
def send_welcome_email(user_id):
    """A task simulating sending a welcome email."""
    print(f"Task 'send_welcome_email' starting for user {user_id}")
    # Simulate email sending process
    time.sleep(3)
    print(f"Welcome email supposedly sent to user {user_id}")
    return f"Email sent to {user_id}"


# You can have many tasks in one file!
```

**Explanation:**

1. **`from celery_app import app`**: We import the `Celery` app instance we configured earlier. This instance holds the knowledge about our broker and backend.
2. **`@app.task`**: This is the magic decorator! When Celery sees this above a function (`add` or `send_welcome_email`), it says, "Ah! This isn't just a regular function; it's a job description that my workers need to know about."
3. **The Function (`add`, `send_welcome_email`)**: This is the actual Python code that performs the work. It's the core of the task: the steps in the recipe. It can take arguments (like `x`, `y`, or `user_id`) and can return a value.
4. **Registration**: The `@app.task` decorator automatically *registers* this function with our Celery `app`. Now, `app` knows about a task named `tasks.add` and another named `tasks.send_welcome_email` (Celery builds the name from `module_name.function_name`). Workers connected to this `app` will be able to find and execute this code when requested.

*Self-Host Note:* If you are running this code, make sure you have a `celery_app.py` file containing your Celery app instance as shown in previous chapters, and that `tasks.py` can import `app` from it.

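Here is a toy model of the decorator and registration steps. It is plain Python and runs without Celery or a broker. `ToyApp` and `ToyTask` are hypothetical names invented for this sketch, not Celery classes. The sketch assumes only the behavior described above:

* the decorator wraps the function in a task object,
* it names the task `module_name.function_name`,
* it stores the task in a registry dictionary.

```python
class ToyTask:
    """Hypothetical stand-in for a Celery Task object (not Celery's API)."""

    def __init__(self, app, fun, name):
        self.app = app    # the app this task is registered with
        self.run = fun    # the original function: the "recipe"
        self.name = name  # registry key, e.g. "tasks.add"

    def __call__(self, *args, **kwargs):
        # Calling the task object directly runs the function inline.
        return self.run(*args, **kwargs)


class ToyApp:
    """Hypothetical stand-in for a Celery app: owns a task registry."""

    def __init__(self):
        self.tasks = {}  # task name -> ToyTask

    def task(self, fun):
        # `@app.task` above `def add` is sugar for `add = app.task(add)`.
        name = f"{fun.__module__}.{fun.__name__}"  # "<module>.<function>"
        task = ToyTask(self, fun, name)
        self.tasks[name] = task
        return task


app = ToyApp()


@app.task
def add(x, y):
    return x + y


print(add.name)                    # "<this module>.add"
print(app.tasks[add.name] is add)  # True
print(add(2, 3))                   # 5
```

In a file named `tasks.py` the toy would print `tasks.add`. Run as a script, it would print `__main__.add`, because it reads `fun.__module__` directly.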
## Sending a Task for Execution

Okay, we've written our recipes (`add` and `send_welcome_email`). How do we tell Celery, "Please run the `add` recipe with the numbers 5 and 7"?

We **don't call the function directly** like `add(5, 7)`. If we did that, it would just run immediately in our current program, which defeats the purpose of using Celery!

Instead, we use special methods on the task object itself, most commonly `.delay()` or `.apply_async()`.

Let's try this in a separate Python script or an interactive Python session:

```python
# run_tasks.py
from tasks import add, send_welcome_email

print("Let's send some tasks!")

# --- Using .delay() ---
# Tell Celery to run add(5, 7) in the background
result_promise_add = add.delay(5, 7)
print(f"Sent task add(5, 7). Task ID: {result_promise_add.id}")

# Tell Celery to run send_welcome_email(123) in the background
result_promise_email = send_welcome_email.delay(123)
print(f"Sent task send_welcome_email(123). Task ID: {result_promise_email.id}")

# --- Using .apply_async() ---
# Does the same thing as .delay() but allows more options
result_promise_add_later = add.apply_async(args=(10, 20), countdown=10)  # Run no earlier than 10s from now
print(f"Sent task add(10, 20) to run in 10s. Task ID: {result_promise_add_later.id}")

print("Tasks have been sent to the broker!")
print("A Celery worker needs to be running to pick them up.")
```

**Explanation:**

1. **`from tasks import add, send_welcome_email`**: We import our *task functions*. Because they were decorated with `@app.task`, they are now special Celery Task objects.
2. **`add.delay(5, 7)`**: This is the simplest way to send a task.
    * It *doesn't* run `add(5, 7)` right now.
    * It takes the arguments `(5, 7)`.
    * It packages them up into a **message** along with the task's name (`tasks.add`).
    * It sends this message to the **message broker** (like Redis or RabbitMQ) that we configured in our `celery_app.py`. Think of it like dropping a request slip into a mailbox.
3. **`send_welcome_email.delay(123)`**: Same idea, but for our email task. A message with `tasks.send_welcome_email` and the argument `123` is sent to the broker.
4. **`add.apply_async(args=(10, 20), countdown=10)`**: This is a more flexible way to send tasks.
    * It does the same fundamental thing: it sends a message to the broker.
    * It accepts more options. Examples:
        * `args`: positional arguments as a tuple.
        * `kwargs`: keyword arguments as a dict.
        * `countdown`: wait at least this many seconds before running.
        * `eta`: run no earlier than a specific future time.
        * ...and many others.
    * `.delay(*args, **kwargs)` is just a convenient shortcut for `.apply_async(args=args, kwargs=kwargs)`.
5. **`result_promise_... = ...`**: Both `.delay()` and `.apply_async()` return an `AsyncResult` object immediately.
    * This is *not* the actual result of the task (like `12` for `add(5, 7)`).
    * It's more like a receipt or a tracking number (notice the `.id` attribute).
    * You can use this object later to check whether the task finished and what it returned. This only works if you've set up a [Result Backend](06_result_backend.md) (Chapter 6).
6. **The Worker**: Sending the task only puts the message on the queue. A separate process, the Celery [Worker](05_worker.md) (Chapter 5), needs to be running. The worker constantly watches the queue and picks up messages. For each one, it finds the corresponding task function (using a name like `tasks.add`) and executes it with the provided arguments.

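Here is a toy model of what `.delay()` and `.apply_async()` do on the sending side. It uses plain Python, with an in-memory `queue.Queue` standing in for the broker. `ToyTask`, `ToyAsyncResult`, `broker` and the message fields are hypothetical simplifications, not Celery's real message format. Celery's actual protocol carries more headers and serializes the body (JSON by default). The sketch also assumes, as a simplification, that a `countdown` is turned into an absolute ETA before the message is queued.

```python
import queue
import uuid
from datetime import datetime, timedelta, timezone

# Hypothetical in-memory stand-in for the broker's default "celery" queue.
broker = queue.Queue()


class ToyAsyncResult:
    """Receipt handed back immediately: it only knows the task id."""

    def __init__(self, task_id):
        self.id = task_id


class ToyTask:
    def __init__(self, fun, name):
        self.run = fun
        self.name = name

    def __call__(self, *args, **kwargs):
        # Direct call: runs right now, in this process (no broker involved).
        return self.run(*args, **kwargs)

    def apply_async(self, args=None, kwargs=None, countdown=None, eta=None):
        # Simplification: treat a countdown as "ETA = now + countdown seconds".
        if countdown is not None:
            eta = datetime.now(timezone.utc) + timedelta(seconds=countdown)
        message = {
            "id": str(uuid.uuid4()),       # unique task id
            "task": self.name,             # which recipe to follow
            "args": tuple(args or ()),     # the ingredients
            "kwargs": dict(kwargs or {}),
            "eta": eta.isoformat() if eta else None,
        }
        broker.put(message)                # drop the slip into the mailbox
        return ToyAsyncResult(message["id"])

    def delay(self, *args, **kwargs):
        # Star-argument shortcut for apply_async(args, kwargs).
        return self.apply_async(args, kwargs)


def _add(x, y):
    return x + y


add = ToyTask(_add, "tasks.add")

print(add(5, 7))                                      # 12: ran inline
receipt = add.delay(5, 7)                             # only queues a message
later = add.apply_async(args=(10, 20), countdown=10)
print(broker.qsize())                                 # 2 messages waiting
```

Nothing is executed by `delay()` in this sketch. A worker would have to take the two messages off `broker`. The caller gets back a receipt (`receipt.id`) right away, not the value `12`.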
## How It Works Internally (Simplified)

Let's trace the journey of defining and sending our `add` task:

1. **Definition (`@app.task` in `tasks.py`)**:
    * Python defines the `add` function.
    * The `@app.task` decorator receives this function.
    * It tells the `Celery` instance (`app`) about this function, registering it under the name `tasks.add` in an internal dictionary (`app.tasks`). The `app` instance knows the broker/backend settings.
2. **Sending (`add.delay(5, 7)` in `run_tasks.py`)**:
    * You call `.delay()` on the `add` task object.
    * `.delay()` (or `.apply_async()`) internally uses the `app` the task is bound to.
    * It asks the `app` for the configured broker URL.
    * It creates a message containing:
        * Task Name: `tasks.add`
        * Arguments: `(5, 7)`
        * Other options (like a unique Task ID).
    * It connects to the **Broker** (e.g., Redis) using the broker URL.
    * It sends the message to a specific queue on the broker (named `celery` by default).
    * It returns an `AsyncResult` object referencing the Task ID.
3. **Waiting**: The message sits in the queue on the broker until a worker fetches it.
4. **Execution (by a [Worker](05_worker.md))**:
    * A separate Celery Worker process is running, connected to the same broker and using the same `app`.
    * The Worker fetches the message from the queue.
    * It reads the task name: `tasks.add`.
    * It looks up `tasks.add` in its copy of the `app.tasks` registry to find the actual `add` function code.
    * The Worker acknowledges the message to the broker, which removes it from the queue. By default this happens just before the task runs. If `task_acks_late` is enabled, the acknowledgment is sent after the task finishes instead.
    * It calls the `add` function with the arguments from the message: `add(5, 7)`.
    * The function runs (prints logs, sleeps, calculates `12`).
    * If a [Result Backend](06_result_backend.md) is configured, the Worker takes the return value (`12`) and stores it in the backend, associated with the Task ID.

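The four steps above can be compressed into a single-process toy. A dictionary plays the task registry, a `queue.Queue` plays the broker, and another dictionary plays the result backend. All names here (`registry`, `register`, `send_message`, `run_worker_until_empty`, `backend`) are hypothetical. In real Celery the client and the worker are separate processes that share only the broker (and the backend), and each process imports the task code itself.

```python
import queue
import uuid

registry = {}           # task name -> function   (plays the role of app.tasks)
broker = queue.Queue()  # the default "celery" queue (plays Redis/RabbitMQ)
backend = {}            # task id -> return value (plays a result backend)


def register(name):
    """Hypothetical decorator: file `fun` in the registry under `name`."""
    def decorator(fun):
        registry[name] = fun
        return fun
    return decorator


@register("tasks.add")
def add(x, y):
    return x + y


def send_message(name, args=(), kwargs=None):
    """Client side: build a message, put it on the queue, return its id."""
    task_id = str(uuid.uuid4())
    broker.put({"id": task_id, "task": name,
                "args": tuple(args), "kwargs": kwargs or {}})
    return task_id


def run_worker_until_empty():
    """Worker side: fetch, look up by name, execute, store the result."""
    while not broker.empty():
        # queue.Queue.get() removes the message at once; real brokers keep
        # it until the worker sends an acknowledgment.
        message = broker.get()
        fun = registry[message["task"]]                      # lookup by name
        result = fun(*message["args"], **message["kwargs"])  # execute
        backend[message["id"]] = result                      # store result


task_id = send_message("tasks.add", (5, 7))
print(backend.get(task_id))  # None: only queued so far
run_worker_until_empty()
print(backend[task_id])      # 12
```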
```mermaid
sequenceDiagram
    participant Client as Your Code (run_tasks.py)
    participant TaskDef as @app.task def add()
    participant App as Celery App Instance
    participant Broker as Message Broker (e.g., Redis)
    participant Worker as Celery Worker (separate process)

    Note over TaskDef, App: 1. @app.task registers 'add' function with App's task registry

    Client->>TaskDef: 2. Call add.delay(5, 7)
    TaskDef->>App: 3. Get broker config
    App-->>TaskDef: Broker URL
    TaskDef->>Broker: 4. Send message ('tasks.add', (5, 7), task_id, ...)
    Broker-->>TaskDef: Ack (Message Queued)
    TaskDef-->>Client: 5. Return AsyncResult(task_id)

    Worker->>Broker: 6. Fetch next message
    Broker-->>Worker: Message ('tasks.add', (5, 7), task_id)
    Worker->>App: 7. Lookup 'tasks.add' in registry
    App-->>Worker: add function code
    Worker->>Broker: 8. Acknowledge message (default: just before execution)
    Worker->>Worker: 9. Execute add(5, 7) -> returns 12
    Note over Worker: (Optionally store result in Backend)
```

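When the worker acknowledges a message matters if the worker dies in the middle of a task. By default Celery acknowledges just before running the task. With late acknowledgment (the `task_acks_late = True` setting, or `acks_late=True` on an individual task), the ack is sent only after the task returns. The toy below models only that bookkeeping. `ToyBroker`, `run_one` and `connection_lost` are hypothetical names, and real brokers implement redelivery in their own ways.

```python
from collections import deque


class ToyBroker:
    """Hypothetical broker that tracks delivered-but-unacknowledged messages."""

    def __init__(self):
        self.ready = deque()  # waiting to be delivered
        self.unacked = {}     # delivered to a worker, not yet acknowledged

    def publish(self, message):
        self.ready.append(message)

    def deliver(self):
        message = self.ready.popleft()
        self.unacked[message["id"]] = message
        return message

    def ack(self, message_id):
        # Acknowledged messages are forgotten for good.
        del self.unacked[message_id]

    def connection_lost(self):
        # Worker vanished: everything it had not acked goes back in the queue.
        self.ready.extend(self.unacked.values())
        self.unacked.clear()


def run_one(broker, acks_late, crash):
    """Process one message; if `crash` is True the worker dies mid-task."""
    message = broker.deliver()
    if not acks_late:
        broker.ack(message["id"])  # default: ack just before executing
    if crash:
        broker.connection_lost()   # the task body never finished
        return
    # ... the task body would run here ...
    if acks_late:
        broker.ack(message["id"])  # late ack: only after the task returned


early = ToyBroker()
early.publish({"id": "1", "task": "tasks.add", "args": (5, 7)})
run_one(early, acks_late=False, crash=True)
print(len(early.ready))  # 0: the message is lost

late = ToyBroker()
late.publish({"id": "1", "task": "tasks.add", "args": (5, 7)})
run_one(late, acks_late=True, crash=True)
print(len(late.ready))   # 1: the message will be delivered again
```

Late acknowledgment has a cost: a task may run more than once (for example, if the worker crashes after doing part of the work). It therefore suits tasks that are safe to repeat.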
## Code Dive: Task Creation and Sending

* **Task Definition (`@app.task`)**: This decorator is defined in `celery/app/base.py` as the `task` method of the `Celery` class. It ultimately calls `_task_from_fun`.

```python
# Simplified from celery/app/base.py
class Celery:
    # ...

    def task(self, *args, **opts):
        # ... handles decorator arguments (shared, lazy, filter, ...) ...

        def _create_task_cls(fun):
            # The real code may return a lazy Proxy that builds the Task on
            # first use; simplified here to build it right away.
            ret = self._task_from_fun(fun, **opts)
            return ret

        if len(args) == 1 and callable(args[0]):
            # Bare decorator: @app.task passes the function straight in
            return _create_task_cls(args[0])
        # Decorator with options, e.g. @app.task(name=...): return the decorator
        return _create_task_cls

    def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
        # Generate a name like 'tasks.add' if none was given
        name = name or self.gen_task_name(fun.__name__, fun.__module__)
        base = base or self.Task  # The base Task class (from celery.app.task)

        if name not in self._tasks:  # If not already registered...
            # Bound tasks (bind=True) receive the task instance as `self`
            run = fun if bind else staticmethod(fun)
            # Dynamically create a Task subclass wrapping the function
            task = type(fun.__name__, (base,), dict({
                'app': self,  # Link task back to this app instance!
                'name': name,
                'run': run,  # The actual function to run
                '__doc__': fun.__doc__,
                '__module__': fun.__module__,
            }, **options))()  # Instantiate the new Task class
            self._tasks[task.name] = task  # Add to app's registry!
            task.bind(self)  # Perform binding steps
        else:
            task = self._tasks[name]  # Task already exists
        return task
```

This shows how the decorator essentially creates a specialized object (an instance of a class derived from `celery.app.task.Task`) that wraps your original function and registers it with the `app` under a specific name.

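The `type()` technique in that excerpt can be tried outside Celery. The sketch below is a hypothetical, stripped-down re-creation; `BaseTask`, `registry` and `task_from_fun` are made-up names. It keeps only three things:

* the dynamically created subclass,
* the check that skips already-registered names,
* a `bind` switch. Unbound tasks store the function as a `staticmethod`, while bound tasks keep it as a plain method and so receive the task instance as `self`.

```python
class BaseTask:
    """Hypothetical minimal base class standing in for celery.app.task.Task."""

    name = None

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


registry = {}  # task name -> task instance


def task_from_fun(fun, name=None, base=BaseTask, bind=False):
    """Build a BaseTask subclass around `fun` with type(), then instantiate it."""
    name = name or f"{fun.__module__}.{fun.__name__}"
    if name not in registry:
        # staticmethod: self.run(x, y) calls fun(x, y) without `self`.
        # Plain function: self.run() becomes a bound method, passing `self`.
        run = fun if bind else staticmethod(fun)
        cls = type(fun.__name__, (base,), {
            "name": name,
            "run": run,
            "__doc__": fun.__doc__,
            "__module__": fun.__module__,
        })
        registry[name] = cls()
    return registry[name]


def add(x, y):
    """Adds two numbers."""
    return x + y


def whoami(self):
    return self.name


add_task = task_from_fun(add, name="tasks.add")
bound_task = task_from_fun(whoami, name="tasks.whoami", bind=True)
print(type(add_task).__mro__[1].__name__)  # BaseTask
print(add_task(5, 7))                      # 12
print(bound_task())                        # tasks.whoami
```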
* **Task Sending (`.delay`)**: The `.delay()` method is defined on the `Task` class itself in `celery/app/task.py`. It's a simple shortcut.

```python
# Simplified from celery/app/task.py
class Task:
    # ...

    def delay(self, *args, **kwargs):
        """Shortcut for apply_async(args, kwargs)"""
        return self.apply_async(args, kwargs)

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        # ... argument checking, option processing ...

        # Get the app associated with this task instance
        app = self._get_app()

        # If always_eager is set, run locally instead of sending
        if app.conf.task_always_eager:
            return self.apply(args, kwargs, task_id=task_id,
                              link=link, link_error=link_error,
                              **options)  # Runs inline

        # The main path: tell the app to send the task message
        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, shadow=shadow, task_type=self,
            **options  # Includes things like countdown, eta, queue etc.
        )
```

You can see how `.delay` just calls `.apply_async`, which then (usually) delegates the actual message sending to the `app.send_task` method we saw briefly in [Chapter 1](01_celery_app.md). The `app` uses its configuration to know *how* and *where* to send the message.

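Here is a runnable toy of that delegation chain. `ToyApp`, `ToyConf` and `ToyTask` are hypothetical. Only the setting name `task_always_eager` and the method names `delay`, `apply_async`, `apply` and `send_task` are borrowed from the excerpt. In this sketch `send_task` just records its arguments instead of publishing to a broker. In eager mode, `apply` returns the bare value, whereas real Celery wraps it in an `EagerResult`.

```python
class ToyConf:
    task_always_eager = False


class ToyApp:
    """Hypothetical app: records what it would send instead of using a broker."""

    def __init__(self):
        self.conf = ToyConf()
        self.sent = []

    def send_task(self, name, args=None, kwargs=None, **options):
        self.sent.append((name, tuple(args or ()), dict(kwargs or {}), options))
        return f"<receipt for {name}>"


class ToyTask:
    def __init__(self, app, name, fun):
        self.app, self.name, self.run = app, name, fun

    def delay(self, *args, **kwargs):
        return self.apply_async(args, kwargs)

    def apply(self, args=None, kwargs=None, **options):
        # Eager mode: execute right here and hand back the value directly.
        return self.run(*(args or ()), **(kwargs or {}))

    def apply_async(self, args=None, kwargs=None, **options):
        if self.app.conf.task_always_eager:
            return self.apply(args, kwargs, **options)
        return self.app.send_task(self.name, args, kwargs, **options)


app = ToyApp()
add = ToyTask(app, "tasks.add", lambda x, y: x + y)

print(add.delay(5, 7))   # <receipt for tasks.add>
print(app.sent[0])       # ('tasks.add', (5, 7), {}, {})
app.conf.task_always_eager = True
print(add.delay(5, 7))   # 12
```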
## Conclusion

You've learned the core concept of a Celery **Task**:

* It represents a single, well-defined **unit of work** or **job description**.
* You define a task by decorating a normal Python function with `@app.task`. This **registers** the task with your Celery application.
* You **send** a task request (not run it directly) using `.delay()` or `.apply_async()`.
* Sending a task puts a **message** onto a queue managed by a **message broker**.
* A separate **Worker** process picks up the message and executes the corresponding task function.

Tasks are the fundamental building blocks of work in Celery. Now that you know how to define a task and request its execution, let's look more closely at the crucial component that handles passing these requests around: the message broker.

**Next:** [Chapter 4: Broker Connection (AMQP)](04_broker_connection__amqp_.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)