2025/01/17 | AI
with openai

Buscar

Celery, Redis, Celery Beat y Celery Results

Descripción de la imagen

En el desarrollo de aplicaciones modernas, la capacidad de realizar tareas en segundo plano y manejar procesos asíncronos es fundamental para garantizar un rendimiento óptimo y una experiencia de usuario fluida. Aquí es donde entra en juego Celery, una poderosa herramienta que permite gestionar tareas asíncronas y programadas en aplicaciones basadas en Python. En combinación con Redis como broker de mensajes, Celery Beat para la programación de tareas periódicas, y Celery Results para almacenar resultados, puedes crear una solución robusta y eficiente para manejar cargas de trabajo complejas.

En este artículo, te guiaremos paso a paso a través del proceso de instalación y configuración de este stack. Desde la instalación inicial de Redis, pasando por la configuración básica de Celery en un proyecto Django, hasta la implementación de tareas periódicas con Celery Beat y el almacenamiento de resultados con Celery Results. Al final, tendrás una infraestructura sólida para gestionar tareas asíncronas que puedes aplicar en una amplia gama de proyectos.

 

 

Para tareas asíncronas o en segundo plano:

  • celery[redis]
  • redis

Para tarea periódicas:

  • django_celery_beat

Para guardar resultado de tareas en base de datos:

  • django_celery_results

 

Para visualizar los logs:

celery -A retegi worker --loglevel=info

 

 

Habrá mínimo 3 terminales en marcha tras la configuración:

  • Django:
python3 manage.py runserver
  • Celery Worker:
celery -A retegi worker --loglevel=info
  • Celery Beat
celery -A retegi beat --loglevel=info

 


Tareas en segundo plano o asíncronsa con Celery + Redis:

Instala celery[redis] y redis (no hay que añadir nada en installed_apps)

pip install celery[redis]
pip install redis

 

A la altura de settings.py añade un archivo llamado celery.py con el siguiente código (en vez de myproject, el nombre de tu proyecto):

# myproject/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Establece la configuración predeterminada de Django para Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Lee la configuración de Celery desde el archivo settings.py
# Todas las variables que comiencen con 'CELERY_' serán consideradas.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Auto-detecta tareas en los módulos `tasks.py` de las aplicaciones instaladas.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

 

Modifica el archivo __init__.py añadiendo este código. En mi caso tengo la app home dentro de applications, por lo que es applications/home/__init__.py el que tengo que modificar (no hay que modificar el código):

# myproject/__init__.py
from __future__ import absolute_import, unicode_literals

# Importa Celery cuando se inicialice Django
from .celery import app as celery_app

__all__ = ('celery_app',)

 

En settings.py añade estas cuatro variables:

# Configuración de Redis como broker
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# Opcional: Backend para almacenar resultados (también en Redis)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Otros ajustes (opcional)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

 

Comprobar configuración:

python manage.py check

 

Si el resultado es correcto deberá devolver lo siguiente:

System check identified no issues (0 silenced).

 

Crea una tarea añadiendo un archivo task.py en junto a views.py models.py de nuestra app. En mi caso dentro de applications/home/ quedando applications/home/tasks.py

Ejemplo de tarea:

# app_name/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

 

Podemos llamar a la tarea:

# En cualquier archivo de Django (views.py, etc.)
from app_name.tasks import add

# Llama a la tarea de forma asíncrona
add.delay(4, 6)  # Esto se ejecutará en segundo plano

 

Se puede llamar a la tarea desde una vista:

# applications/home/views.py
from django.http import JsonResponse
from .tasks import example_task

def trigger_task(request):
    result = example_task.delay(4, 6)  # Ejecuta la tarea en segundo plano
    return JsonResponse({'task_id': result.id, 'status': 'Task triggered!'})

 

O ejecutar tras un evento, como guardar un modelo, se puede usar señales de Django y llamar a la tarea:

# applications/home/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from .models import YourModel
from .tasks import example_task

@receiver(post_save, sender=YourModel)
def execute_task_after_save(sender, instance, **kwargs):
    example_task.delay(instance.some_field, 10)  # Envía datos relacionados con el modelo

 

Así queda la estructura de archivos:

RETEGI/
├── applications/
│   └── home/
│       ├── migrations/
│       ├── __init__.py
│       ├── admin.py
│       ├── apps.py
│       ├── forms.py
│       ├── models.py
│       ├── tasks.py       <-- Aquí defines tus tareas
│       ├── tests.py
│       ├── urls.py
│       ├── views.py       <-- Aquí puedes llamar a tus tareas
│       ├── signals.py     <-- Opcional: Llamar tareas desde señales
├── retegi/
│   ├── __init__.py
│   ├── asgi.py
│   ├── celery.py          <-- Configuración de Celery
│   ├── settings.py
│   ├── urls.py
│   ├── views.py
│   ├── wsgi.py
├── manage.py

 

Inicia Celery primero asegúrate de que Redis esté ejecutándose:

redis-server

 

Ejecuta el trabajador Celery:

celery -A myproject worker --loglevel=info

 

Probar la tarea:

python manage.py shell

 

Luego:

from applications.home.tasks import example_task
result = example_task.delay(4, 6)  # Llama a la tarea
print(result.id)  # Muestra el ID de la tarea

 

Debe aparecer algo así:

[INFO/MainProcess] Connected to redis://localhost:6379/0
[INFO/MainProcess] mingle: searching for neighbors
[INFO/MainProcess] mingle: all alone

 

En urls.py añade una url:

path('trigger-task/', views.trigger_task, name='trigger_task'),

 

En el navegador:

http://localhost:8000/trigger-task/

 

Resultado del navegador:

 

¡Finalizado!

Ahora is se quiere programar tarea, se puede instalar y utilizar django_celery_beat


Programación de tareas con CELERY BEAT:

Instala celery_beat:

pip install django-celery-beat

 

Añade a INSTALLED_APPS:

INSTALLED_APPS += [
    'django_celery_beat',
]

 

Migra:

python manage.py migrate

 

Inicia Celery Beat:

celery -A retegi beat --loglevel=info

 

Registrar tareas periódicas. Hay 2 maneras:

  • Desde panel administrados
  • Desde código

 

Desde el panel de administrador (en vez de desde código) > Periodic Tasks:

  • Interval Schedule:
    • Crea un intervalo en el que deseas que se ejecute la tarea (por ejemplo cada 10 segundos)
  • Periodic Task:
    • Crea una nueva tarea periódica seleccionando una tarea registrada (por ejemplo applications.home.task.example_task).
    • Asigna el intervalo creado anteriomente

Desde código (en vez de desde panel de administrador):

from django_celery_beat.models import PeriodicTask, IntervalSchedule
import json

# Crear o recuperar un intervalo
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,  # Intervalo de tiempo (por ejemplo, 10 segundos)
    period=IntervalSchedule.SECONDS,  # Unidades: SECONDS, MINUTES, HOURS, DAYS, WEEKS
)

# Registrar una tarea periódica
PeriodicTask.objects.create(
    interval=schedule,  # Usa el intervalo creado
    name='Tarea periódica de ejemplo',  # Nombre único
    task='applications.home.tasks.example_task',  # Nombre completo de la tarea
    args=json.dumps([4, 6]),  # Argumentos opcionales
)

 

Verificar funcionamiento. Asegúrate de que tanto el trabajador Celery como Celery Beat están ejecutándose:

  • Worker:
celery -A retegi worker --loglevel=info

 

  • Celery Beat:
celery -A retegi beat --loglevel=info

 

Para revisar logs de Celery Beat:

[INFO/MainProcess] Scheduler: Sending due task Tarea periódica de ejemplo (applications.home.tasks.example_task)

 

Debería aparecer algo así:

[INFO/MainProcess] Received task: applications.home.tasks.example_task[<task_id>]
[INFO/ForkPoolWorker-1] Task applications.home.tasks.example_task[<task_id>] succeeded in 0.0012s: 10

 

Se observa que se realiza la tarea cada minuto (13:58, 13:59, etc)

 


Guardar resultados en base de datos con Celery Results:

 

 

 

 

 

Comentarios