Python Celery Redis: los 8 pasos clave para pasar de cero a operativo
Python Celery Redis : lo esencial en un artículo — código real, diagramas y pasos concretos, extractos de un curso de 24 lecciones.
Todo el mundo puede aprender Python Celery Redis — siempre que siga los pasos en el orden correcto. Hemos condensado un curso completo de 24 lecciones en un recorrido claro, con los extractos de código más útiles.
tl;dr
- ¿Por qué Celery
- Conceptos esenciales de Redis
- Configuración de Celery
- Tareas y flujos de trabajo
- Celery Beat
~$ cat ./parcours.md # Python Celery Redis — 8 capítulos
01
Por qué Celery
→ Límites del síncrono→ Arquitectura Broker / Worker / Backend+ 1 más lecciones
02
Redis essentials
→ Tipos de datos Redis→ Pub/Sub y Streams+ 1 más lecciones
03
Celery setup
→ Primer worker y primera task→ Configuración y best practices+ 1 más lecciones
04
Tareas y flujos de trabajo
→ Reintentos y timeouts→ Chain, Group, Chord+ 1 más lecciones
05
Celery Beat
→ Tareas periódicas→ Crontab schedules+ 1 más lecciones
06
Monitoring
→ Flower dashboard→ Logs estructurados y Sentry+ 1 más lecciones
07
Patrones avanzados
→ Rate limiting y cuotas→ Idempotencia y deduplicación+ 1 más lecciones
08
Proyecto final pipeline ML email
→ Proyecto - Especificaciones→ Implementación de las tasks+ 1 más lecciones
🏁
Proyecto final
→ Te vas con un proyecto concreto y demostrable
Programación dinámica mediante DB
NOTEObjetivo — Permitir que los usuarios/admins modifiquen las programaciones sin volver a desplegar.
Problema del beat_schedule en código
NOTESi quieres añadir o modificar una programación en producción, es necesario volver a desplegar. Para tareas definidas por el usuario (newsletters, informes de clientes), se necesita un programador dinámico.
django-celery-beat (recomendado)
pip install django-celery-beat # settings.py INSTALLED_APPS = [..., "django_celery_beat"] CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler" # Ejecutar migraciones python manage.py migrate # Beat lee la programación desde la DB cada 5s celery -A myproject beat -l info
Tablas creadas
Crear una programación mediante Python
from django_celery_beat.models import CrontabSchedule, PeriodicTask # 1. Definir el temporizador schedule, _ = CrontabSchedule.objects.get_or_create( minute="30", hour="9", day_of_week="mon-fri", day_of_month="*", month_of_year="*", timezone="Europe/Paris" ) # 2. Crear la tarea periódica PeriodicTask.objects.create( crontab=schedule, name="Newsletter Acme Corp", task="app.tasks.send_newsletter", args=json.dumps(["acme_corp"]), kwargs=json.dumps({"template": "weekly"}), enabled=True, one_off=False, )
API admin mediante Django admin
# El admin de Django ya expone los modelos # Puedes ir directamente a /admin/django_celery_beat/periodictask/ # para añadir, editar o desactivar mediante la interfaz
API REST personalizada
@app.post("/schedules") def create_schedule(data: ScheduleCreate, db: Session = Depends(get_db)): schedule = CrontabSchedule.objects.create( minute=data.minute, hour=data.hour, ... ) PeriodicTask.objects.create( name=data.name, crontab=schedule, task=data.task_name, args=json.dumps(data.args) ) return {"status": "created"} @app.delete("/schedules/{name}") def delete_schedule(name: str): PeriodicTask.objects.filter(name=name).delete()
Sin Django: celery-redbeat
pip install celery-redbeat # celery_app.py celery_app.conf.beat_scheduler = "redbeat.RedBeatScheduler" celery_app.conf.redbeat_redis_url = "redis://redis:6379/2" # Crear una programación from redbeat import RedBeatSchedulerEntry from celery.schedules import crontab entry = RedBeatSchedulerEntry( name="my-task", task="app.tasks.my_task", schedule=crontab(hour=9, minute=0), app=celery_app, args=["arg1"] ) entry.save() # Eliminar entry.delete()
Caso de uso: newsletters multi-tenant
# Modelo DB class Newsletter(models.Model): name = models.CharField(max_length=200) schedule_cron = models.CharField(max_length=100) # "0 9 * * mon" enabled = models.BooleanField(default=True) def save(self, *args, **kwargs): super().save(*args, **kwargs) self.sync_celery_beat() def sync_celery_beat(self): m, h, dom, mon, dow = self.schedule_cron.split() schedule, _ = CrontabSchedule.objects.get_or_create( minute=m, hour=h, day_of_month=dom, month_of_year=mon, day_of_week=dow ) PeriodicTask.objects.update_or_create( name=f"newsletter_{self.id}", defaults={ "crontab": schedule, "task": "app.tasks.send_newsletter", "args": json.dumps([self.id]), "enabled": self.enabled } )
Proyecto - Pliego de condiciones
NOTEMisión — Construir un sistema de recomendación de productos enviado por email a los clientes cada semana.
Especificaciones
NOTEFuncional:
- Para cada usuario activo: recuperar el historial de compras
- Generar 5 recomendaciones mediante ML (similitud del coseno o modelo más avanzado)
- Componer un email HTML personalizado
- Enviar mediante SendGrid con reintentos
- Registrar envíos, aperturas y clics
- Frecuencia: semanal, lunes a las 9h
- Escala: más de 100k usuarios
- Idempotencia: sin envíos duplicados
- Límite de tasa de SendGrid: 100 emails/segundo
- Recuperación ante errores sin volver a ejecutar todo
Arquitectura objetivo
+---------------+
| Celery Beat | cron mon 9h
| (1 instancia) |
+-------+-------+
|
v
+---------------+ +-----------+
| dispatch_ |---> | Redis |
| weekly_emails | | broker |
| (1 tarea) | +-----+-----+
+---------------+ |
v
+------------+------------+
| |
+-------v------+ +--------v-------+
| Worker emails| | Worker ML |
| (10 procs) | | (2 procs, GPU) |
+-------+------+ +--------+-------+
| |
v v
+-------+------+ +--------+-------+
| SendGrid | | Modelo Recomendación|
| (rate limit) | | + DB histórico|
+--------------+ +----------------+
|
v
+-------+------+
| Webhook |
| open/click |
+--------------+Estructura del proyecto
recommender/ ├── docker-compose.yml ├── Dockerfile ├── requirements.txt ├── app/ │ ├── celery_app.py │ ├── config.py │ ├── tasks/ │ │ ├── orchestrator.py # dispatch_weekly_emails │ │ ├── ml.py # compute_recommendations │ │ ├── email.py # render + send │ │ └── tracking.py # webhooks │ ├── models/ # SQLAlchemy │ ├── ml/ │ │ └── recommender.py │ └── templates/ │ └── weekly.html └── tests/
Modelos DB
class User(Base): id: Mapped[int] = mapped_column(primary_key=True) email: Mapped[str] active: Mapped[bool] = mapped_column(default=True) last_recos_sent: Mapped[datetime | None] class Product(Base): id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] category: Mapped[str] embedding: Mapped[list[float]] # vector para ML class Order(Base): id: Mapped[int] = mapped_column(primary_key=True) user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) product_id: Mapped[int] = mapped_column(ForeignKey("products.id")) created_at: Mapped[datetime] class EmailJob(Base): """Seguimiento de envíos para idempotencia""" id: Mapped[int] = mapped_column(primary_key=True) user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) week_key: Mapped[str] # "2026-W21" status: Mapped[str] # pending|sent|failed|opened|clicked sendgrid_msg_id: Mapped[str | None] created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) __table_args__ = (UniqueConstraint("user_id", "week_key"),)
Configuración multi-cola
celery_app.conf.task_queues = (
Queue("orchestrator"),
Queue("ml"),
Queue("emails"),
Queue("tracking"),
)
celery_app.conf.task_routes = {
"app.tasks.orchestrator.*": {"queue": "orchestrator"},
"app.tasks.ml.*": {"queue": "ml"},
"app.tasks.email.*": {"queue": "emails"},
"app.tasks.tracking.*": {"queue": "tracking"},
}
celery_app.conf.beat_schedule = {
"weekly-recos": {
"task": "app.tasks.orchestrator.dispatch_weekly_emails",
"schedule": crontab(hour=9, minute=0, day_of_week="mon")
}
}SLOs objetivo
NOTE
- Todos los emails enviados en < 2h para 100k usuarios
- Tasa de error final < 0.5%
- Latencia del webhook de seguimiento < 1s
- Resiliencia: si SendGrid cae 10min, reanudación automática
Resumen
NOTEPara recordar
- Separar ML (pesado), emails (caudal), orchestrator (control)
- EmailJob con UniqueConstraint = idempotencia
- Beat para activación semanal
- Seguimiento mediante webhook de SendGrid
Persistencia y expulsión
NOTEObjetivo — Configurar la persistencia y la expulsión de memoria en producción.
RDB: instantáneas
# redis.conf save 900 1 # instantánea si >=1 modificación en 15min save 300 10 # instantánea si >=10 modificaciones en 5min save 60 10000 # instantánea si >=10000 modificaciones en 1min dbfilename dump.rdb dir /var/lib/redis
NOTERDB: instantánea binaria compacta, restauración rápida. Pero puede perder algunos minutos en caso de caída.
AOF: registro solo de anexos
# redis.conf appendonly yes appendfilename "appendonly.aof" appendfsync always # lento pero 0 pérdida appendfsync everysec # predeterminado: 0-1s de pérdida appendfsync no # decide el SO auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb
NOTEAOF: cada escritura se registra. Más garantías, pero ocupa más espacio en disco.
Híbrido: RDB + AOF
# Recomendado en producción: ambos activos save 900 1 appendonly yes appendfsync everysec # Al reiniciar: Redis carga AOF (más actualizado)
maxmemory y expulsión
# Límite de memoria maxmemory 2gb # Política cuando está lleno maxmemory-policy allkeys-lru # elimina las menos usadas recientemente (caché)
Políticas de expulsión
| Política | Comportamiento |
|---|---|
| noeviction | Rechaza las escrituras (predeterminado) |
| allkeys-lru | Elimina LRU en todas las claves |
| volatile-lru | LRU en claves con TTL |
| allkeys-lfu | Menos frecuentemente utilizadas |
| allkeys-random | Aleatorio |
| volatile-ttl | Clave con TTL más corta primero |
Copias de seguridad
# Instantánea manual redis-cli BGSAVE # asíncrono redis-cli SAVE # síncrono (¡bloqueante!) # Copiar el volcado cp /var/lib/redis/dump.rdb /backup/dump-$(date +%F).rdb # Restaurar systemctl stop redis cp /backup/dump-2026-05-27.rdb /var/lib/redis/dump.rdb systemctl start redis
Replicación maestro-réplica
# En la réplica replicaof master.redis.local 6379 # Verificar redis-cli INFO replication # role:slave # master_link_status:up # Conmutación por error automática: Redis Sentinel # Fragmentación: Redis Cluster
va-plus-loin
Este artículo cubre los extractos más útiles — el curso completo Python Celery Redis (8 capítulos, 24 lecciones, ejercicios corregidos y proyecto final) te lleva hasta el final.
./acceder-al-curso-completo curso gratuito: Vibe CodingFAQ
¿Cuánto tiempo se tarda en aprender Python Celery Redis?
Con una progresión estructurada (8 capítulos, 24 lecciones cortas y prácticas), se alcanza un nivel operativo en unas semanas dedicando entre 30 y 60 minutos al día. Lo importante es practicar cada concepto de inmediato.
¿Se necesitan requisitos previos?
Basta con nociones básicas de informática. Si sabes usar un terminal y leer código sencillo, estás listo.
¿Por dónde empezar de forma concreta?
Reproduce los comandos de este artículo y sigue el curso completo Python Celery Redis: encadena las 24 lecciones en orden, con ejercicios y proyecto final.
./a-lire-aussi
→ Lánzate con Portfolio IA SEO Vercel: tu primer paso concreto hoy→ IA Stripe GitHub SaaS en la práctica: el código y los comandos que realmente importan→ Python Requests APIs explicado de forma sencilla (con diagramas y código real)📬 ¿Quieres recibir este tipo de guía cada semana? Suscríbete gratis — código real, sin rodeos.