Python Celery Redis: los 8 pasos clave para pasar de cero a operativo

Python Celery Redis : lo esencial en un artículo — código real, diagramas y pasos concretos, extractos de un curso de 24 lecciones.

Python Celery Redis: los 8 pasos clave para pasar de cero a operativo

Todo el mundo puede aprender Python Celery Redis — siempre que siga los pasos en el orden correcto. Hemos condensado un curso completo de 24 lecciones en un recorrido claro, con los extractos de código más útiles.

tl;dr
  • ¿Por qué Celery
  • Conceptos esenciales de Redis
  • Configuración de Celery
  • Tareas y flujos de trabajo
  • Celery Beat
~$ cat ./parcours.md # Python Celery Redis — 8 capítulos
01
Por qué Celery
→ Límites del síncrono→ Arquitectura Broker / Worker / Backend+ 1 más lecciones
02
Redis essentials
→ Tipos de datos Redis→ Pub/Sub y Streams+ 1 más lecciones
03
Celery setup
→ Primer worker y primera task→ Configuración y best practices+ 1 más lecciones
04
Tareas y flujos de trabajo
→ Reintentos y timeouts→ Chain, Group, Chord+ 1 más lecciones
05
Celery Beat
→ Tareas periódicas→ Crontab schedules+ 1 más lecciones
06
Monitoring
→ Flower dashboard→ Logs estructurados y Sentry+ 1 más lecciones
07
Patrones avanzados
→ Rate limiting y cuotas→ Idempotencia y deduplicación+ 1 más lecciones
08
Proyecto final pipeline ML email
→ Proyecto - Especificaciones→ Implementación de las tasks+ 1 más lecciones
🏁
Proyecto final
→ Te vas con un proyecto concreto y demostrable

Programación dinámica mediante DB

NOTEObjetivo — Permitir que los usuarios/admins modifiquen las programaciones sin volver a desplegar.

Problema del beat_schedule en código

NOTESi quieres añadir o modificar una programación en producción, es necesario volver a desplegar. Para tareas definidas por el usuario (newsletters, informes de clientes), se necesita un programador dinámico.

django-celery-beat (recomendado)

output
pip install django-celery-beat

# settings.py
INSTALLED_APPS = [..., "django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

# Ejecutar migraciones
python manage.py migrate

# Beat lee la programación desde la DB cada 5s
celery -A myproject beat -l info

Tablas creadas

Crear una programación mediante Python

output
from django_celery_beat.models import CrontabSchedule, PeriodicTask

# 1. Definir el temporizador
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute="30",
    hour="9",
    day_of_week="mon-fri",
    day_of_month="*",
    month_of_year="*",
    timezone="Europe/Paris"
)

# 2. Crear la tarea periódica
PeriodicTask.objects.create(
    crontab=schedule,
    name="Newsletter Acme Corp",
    task="app.tasks.send_newsletter",
    args=json.dumps(["acme_corp"]),
    kwargs=json.dumps({"template": "weekly"}),
    enabled=True,
    one_off=False,
)

API admin mediante Django admin

output
# El admin de Django ya expone los modelos
# Puedes ir directamente a /admin/django_celery_beat/periodictask/
# para añadir, editar o desactivar mediante la interfaz

API REST personalizada

output
@app.post("/schedules")
def create_schedule(data: ScheduleCreate, db: Session = Depends(get_db)):
    schedule = CrontabSchedule.objects.create(
        minute=data.minute, hour=data.hour, ...
    )
    PeriodicTask.objects.create(
        name=data.name,
        crontab=schedule,
        task=data.task_name,
        args=json.dumps(data.args)
    )
    return {"status": "created"}

@app.delete("/schedules/{name}")
def delete_schedule(name: str):
    PeriodicTask.objects.filter(name=name).delete()

Sin Django: celery-redbeat

output
pip install celery-redbeat

# celery_app.py
celery_app.conf.beat_scheduler = "redbeat.RedBeatScheduler"
celery_app.conf.redbeat_redis_url = "redis://redis:6379/2"

# Crear una programación
from redbeat import RedBeatSchedulerEntry
from celery.schedules import crontab

entry = RedBeatSchedulerEntry(
    name="my-task",
    task="app.tasks.my_task",
    schedule=crontab(hour=9, minute=0),
    app=celery_app,
    args=["arg1"]
)
entry.save()

# Eliminar
entry.delete()

Caso de uso: newsletters multi-tenant

output
# Modelo DB
class Newsletter(models.Model):
    name = models.CharField(max_length=200)
    schedule_cron = models.CharField(max_length=100)        # "0 9 * * mon"
    enabled = models.BooleanField(default=True)
    
    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        self.sync_celery_beat()
    
    def sync_celery_beat(self):
        m, h, dom, mon, dow = self.schedule_cron.split()
        schedule, _ = CrontabSchedule.objects.get_or_create(
            minute=m, hour=h, day_of_month=dom,
            month_of_year=mon, day_of_week=dow
        )
        PeriodicTask.objects.update_or_create(
            name=f"newsletter_{self.id}",
            defaults={
                "crontab": schedule,
                "task": "app.tasks.send_newsletter",
                "args": json.dumps([self.id]),
                "enabled": self.enabled
            }
        )

Proyecto - Pliego de condiciones

NOTEMisión — Construir un sistema de recomendación de productos enviado por email a los clientes cada semana.

Especificaciones

NOTEFuncional:
  • Para cada usuario activo: recuperar el historial de compras
  • Generar 5 recomendaciones mediante ML (similitud del coseno o modelo más avanzado)
  • Componer un email HTML personalizado
  • Enviar mediante SendGrid con reintentos
  • Registrar envíos, aperturas y clics
  • Frecuencia: semanal, lunes a las 9h
No funcional:
  • Escala: más de 100k usuarios
  • Idempotencia: sin envíos duplicados
  • Límite de tasa de SendGrid: 100 emails/segundo
  • Recuperación ante errores sin volver a ejecutar todo

Arquitectura objetivo

output
+---------------+
| Celery Beat   |  cron mon 9h
| (1 instancia) |
+-------+-------+
        |
        v
+---------------+      +-----------+
| dispatch_     |--->  | Redis     |
| weekly_emails |      | broker    |
| (1 tarea)     |      +-----+-----+
+---------------+            |
                             v
                +------------+------------+
                |                         |
        +-------v------+         +--------v-------+
        | Worker emails|         | Worker ML      |
        | (10 procs)   |         | (2 procs, GPU) |
        +-------+------+         +--------+-------+
                |                         |
                v                         v
        +-------+------+         +--------+-------+
        | SendGrid     |         | Modelo Recomendación|
        | (rate limit) |         | + DB histórico|
        +--------------+         +----------------+
                |
                v
        +-------+------+
        | Webhook      |
        | open/click   |
        +--------------+

Estructura del proyecto

output
recommender/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── app/
│   ├── celery_app.py
│   ├── config.py
│   ├── tasks/
│   │   ├── orchestrator.py      # dispatch_weekly_emails
│   │   ├── ml.py                # compute_recommendations
│   │   ├── email.py             # render + send
│   │   └── tracking.py          # webhooks
│   ├── models/                  # SQLAlchemy
│   ├── ml/
│   │   └── recommender.py
│   └── templates/
│       └── weekly.html
└── tests/

Modelos DB

output
class User(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str]
    active: Mapped[bool] = mapped_column(default=True)
    last_recos_sent: Mapped[datetime | None]

class Product(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    category: Mapped[str]
    embedding: Mapped[list[float]]    # vector para ML

class Order(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    product_id: Mapped[int] = mapped_column(ForeignKey("products.id"))
    created_at: Mapped[datetime]

class EmailJob(Base):
    """Seguimiento de envíos para idempotencia"""
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    week_key: Mapped[str]                  # "2026-W21"
    status: Mapped[str]                     # pending|sent|failed|opened|clicked
    sendgrid_msg_id: Mapped[str | None]
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    
    __table_args__ = (UniqueConstraint("user_id", "week_key"),)

Configuración multi-cola

output
celery_app.conf.task_queues = (
    Queue("orchestrator"),
    Queue("ml"),
    Queue("emails"),
    Queue("tracking"),
)

celery_app.conf.task_routes = {
    "app.tasks.orchestrator.*": {"queue": "orchestrator"},
    "app.tasks.ml.*": {"queue": "ml"},
    "app.tasks.email.*": {"queue": "emails"},
    "app.tasks.tracking.*": {"queue": "tracking"},
}

celery_app.conf.beat_schedule = {
    "weekly-recos": {
        "task": "app.tasks.orchestrator.dispatch_weekly_emails",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon")
    }
}

SLOs objetivo

NOTE
  • Todos los emails enviados en < 2h para 100k usuarios
  • Tasa de error final < 0.5%
  • Latencia del webhook de seguimiento < 1s
  • Resiliencia: si SendGrid cae 10min, reanudación automática

Resumen

NOTEPara recordar
  • Separar ML (pesado), emails (caudal), orchestrator (control)
  • EmailJob con UniqueConstraint = idempotencia
  • Beat para activación semanal
  • Seguimiento mediante webhook de SendGrid

Persistencia y expulsión

NOTEObjetivo — Configurar la persistencia y la expulsión de memoria en producción.

RDB: instantáneas

output
# redis.conf
save 900 1          # instantánea si >=1 modificación en 15min
save 300 10         # instantánea si >=10 modificaciones en 5min
save 60 10000       # instantánea si >=10000 modificaciones en 1min

dbfilename dump.rdb
dir /var/lib/redis
NOTERDB: instantánea binaria compacta, restauración rápida. Pero puede perder algunos minutos en caso de caída.

AOF: registro solo de anexos

output
# redis.conf
appendonly yes
appendfilename "appendonly.aof"

appendfsync always       # lento pero 0 pérdida
appendfsync everysec     # predeterminado: 0-1s de pérdida
appendfsync no           # decide el SO

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
NOTEAOF: cada escritura se registra. Más garantías, pero ocupa más espacio en disco.

Híbrido: RDB + AOF

output
# Recomendado en producción: ambos activos
save 900 1
appendonly yes
appendfsync everysec

# Al reiniciar: Redis carga AOF (más actualizado)

maxmemory y expulsión

output
# Límite de memoria
maxmemory 2gb

# Política cuando está lleno
maxmemory-policy allkeys-lru        # elimina las menos usadas recientemente (caché)

Políticas de expulsión

PolíticaComportamiento
noevictionRechaza las escrituras (predeterminado)
allkeys-lruElimina LRU en todas las claves
volatile-lruLRU en claves con TTL
allkeys-lfuMenos frecuentemente utilizadas
allkeys-randomAleatorio
volatile-ttlClave con TTL más corta primero

Copias de seguridad

output
# Instantánea manual
redis-cli BGSAVE              # asíncrono
redis-cli SAVE                # síncrono (¡bloqueante!)

# Copiar el volcado
cp /var/lib/redis/dump.rdb /backup/dump-$(date +%F).rdb

# Restaurar
systemctl stop redis
cp /backup/dump-2026-05-27.rdb /var/lib/redis/dump.rdb
systemctl start redis

Replicación maestro-réplica

output
# En la réplica
replicaof master.redis.local 6379

# Verificar
redis-cli INFO replication
# role:slave
# master_link_status:up

# Conmutación por error automática: Redis Sentinel
# Fragmentación: Redis Cluster
va-plus-loin

Este artículo cubre los extractos más útiles — el curso completo Python Celery Redis (8 capítulos, 24 lecciones, ejercicios corregidos y proyecto final) te lleva hasta el final.

./acceder-al-curso-completo curso gratuito: Vibe Coding

FAQ

¿Cuánto tiempo se tarda en aprender Python Celery Redis?
Con una progresión estructurada (8 capítulos, 24 lecciones cortas y prácticas), se alcanza un nivel operativo en unas semanas dedicando entre 30 y 60 minutos al día. Lo importante es practicar cada concepto de inmediato.
¿Se necesitan requisitos previos?
Basta con nociones básicas de informática. Si sabes usar un terminal y leer código sencillo, estás listo.
¿Por dónde empezar de forma concreta?
Reproduce los comandos de este artículo y sigue el curso completo Python Celery Redis: encadena las 24 lecciones en orden, con ejercicios y proyecto final.

📬 ¿Quieres recibir este tipo de guía cada semana? Suscríbete gratis — código real, sin rodeos.