Python Celery Redis: as 8 etapas-chave para passar do zero ao operacional

Python Celery Redis : o essencial em um artigo — código real, diagramas e etapas concretas, extraídos de um curso de 24 lições.

Python Celery Redis: as 8 etapas-chave para passar do zero ao operacional

Todo mundo pode aprender Python Celery Redis — desde que siga as etapas na ordem correta. Condensamos um curso completo de 24 lições em um percurso claro, com os trechos de código mais úteis.

tl;dr
  • Por que Celery
  • Essenciais do Redis
  • Configuração do Celery
  • Tarefas e workflows
  • Celery Beat
~$ cat ./parcours.md # Python Celery Redis — 8 capítulos
01
Por que Celery
→ Limites do síncrono→ Arquitetura Broker / Worker / Backend+ 1 mais lições
02
Redis essentials
→ Tipos de dados Redis→ Pub/Sub e Streams+ 1 mais lições
03
Celery setup
→ Primeiro worker e primeira task→ Configuração e best practices+ 1 mais lições
04
Tasks e workflows
→ Retries e timeouts→ Chain, Group, Chord+ 1 mais lições
05
Celery Beat
→ Tasks periódicas→ Crontab schedules+ 1 mais lições
06
Monitoring
→ Flower dashboard→ Logs estruturados e Sentry+ 1 mais lições
07
Patterns avançados
→ Rate limiting e quotas→ Idempotência e deduplicação+ 1 mais lições
08
Projeto final pipeline ML email
→ Projeto - Especificações→ Implementação das tasks+ 1 mais lições
🏁
Projeto final
→ Você sai com um projeto concreto e demonstrável

Agendamento dinâmico via DB

NOTEObjetivo — Permitir que usuários/admins modifiquem os agendamentos sem precisar de novo deploy.

Problema do beat_schedule no código

NOTESe quiser adicionar/modificar um agendamento em produção, é preciso fazer novo deploy. Para tasks definidas pelo usuário (newsletters, relatórios de clientes), é necessário um scheduler dinâmico.

django-celery-beat (recomendado)

output
pip install django-celery-beat

# settings.py
INSTALLED_APPS = [..., "django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

# Executar migrações
python manage.py migrate

# Beat lê o schedule do banco a cada 5s
celery -A myproject beat -l info

Tabelas criadas

Criar um schedule via Python

output
from django_celery_beat.models import CrontabSchedule, PeriodicTask

# 1. Definir o timing
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute="30",
    hour="9",
    day_of_week="mon-fri",
    day_of_month="*",
    month_of_year="*",
    timezone="Europe/Paris"
)

# 2. Criar a task periódica
PeriodicTask.objects.create(
    crontab=schedule,
    name="Newsletter Acme Corp",
    task="app.tasks.send_newsletter",
    args=json.dumps(["acme_corp"]),
    kwargs=json.dumps({"template": "weekly"}),
    enabled=True,
    one_off=False,
)

API admin via Django admin

output
# O admin do Django já expõe os modelos
# Você pode ir diretamente para /admin/django_celery_beat/periodictask/
# para adicionar, editar, desativar via UI

API REST personalizada

output
@app.post("/schedules")
def create_schedule(data: ScheduleCreate, db: Session = Depends(get_db)):
    schedule = CrontabSchedule.objects.create(
        minute=data.minute, hour=data.hour, ...
    )
    PeriodicTask.objects.create(
        name=data.name,
        crontab=schedule,
        task=data.task_name,
        args=json.dumps(data.args)
    )
    return {"status": "created"}

@app.delete("/schedules/{name}")
def delete_schedule(name: str):
    PeriodicTask.objects.filter(name=name).delete()

Sem Django : celery-redbeat

output
pip install celery-redbeat

# celery_app.py
celery_app.conf.beat_scheduler = "redbeat.RedBeatScheduler"
celery_app.conf.redbeat_redis_url = "redis://redis:6379/2"

# Criar um schedule
from redbeat import RedBeatSchedulerEntry
from celery.schedules import crontab

entry = RedBeatSchedulerEntry(
    name="my-task",
    task="app.tasks.my_task",
    schedule=crontab(hour=9, minute=0),
    app=celery_app,
    args=["arg1"]
)
entry.save()

# Excluir
entry.delete()

Caso de uso : newsletters multi-tenant

output
# Modelo DB
class Newsletter(models.Model):
    name = models.CharField(max_length=200)
    schedule_cron = models.CharField(max_length=100)        # "0 9 * * mon"
    enabled = models.BooleanField(default=True)
    
    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        self.sync_celery_beat()
    
    def sync_celery_beat(self):
        m, h, dom, mon, dow = self.schedule_cron.split()
        schedule, _ = CrontabSchedule.objects.get_or_create(
            minute=m, hour=h, day_of_month=dom,
            month_of_year=mon, day_of_week=dow
        )
        PeriodicTask.objects.update_or_create(
            name=f"newsletter_{self.id}",
            defaults={
                "crontab": schedule,
                "task": "app.tasks.send_newsletter",
                "args": json.dumps([self.id]),
                "enabled": self.enabled
            }
        )

Projeto - Caderno de encargos

NOTEMissão — Construir um sistema de recomendação de produtos enviado por e-mail aos clientes toda semana.

Especificações

NOTEFuncional :
  • Para cada usuário ativo : recuperar histórico de compras
  • Gerar 5 recomendações via ML (cosine similarity ou modelo mais avançado)
  • Compor um e-mail HTML personalizado
  • Enviar via SendGrid com retry
  • Rastrear delivery, open, click
  • Frequência : semanal, segunda-feira 9h
Não funcional :
  • Escala : 100k+ usuários
  • Idempotência : sem envio duplicado
  • Rate limit SendGrid : 100 e-mails/segundo
  • Retomada em caso de erro sem repetir tudo

Arquitetura alvo

output
+---------------+
| Celery Beat   |  cron mon 9h
| (1 instance)  |
+-------+-------+
        |
        v
+---------------+      +-----------+
| dispatch_     |--->  | Redis     |
| weekly_emails |      | broker    |
| (1 task)      |      +-----+-----+
+---------------+            |
                             v
                +------------+------------+
                |                         |
        +-------v------+         +--------v-------+
        | Worker emails|         | Worker ML      |
        | (10 procs)   |         | (2 procs, GPU) |
        +-------+------+         +--------+-------+
                |                         |
                v                         v
        +-------+------+         +--------+-------+
        | SendGrid     |         | Modelo Recomendação|
        | (rate limit) |         | + DB histórico|
        +--------------+         +----------------+
                |
                v
        +-------+------+
        | Webhook      |
        | open/click   |
        +--------------+

Estrutura do projeto

output
recommender/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── app/
│   ├── celery_app.py
│   ├── config.py
│   ├── tasks/
│   │   ├── orchestrator.py      # dispatch_weekly_emails
│   │   ├── ml.py                # compute_recommendations
│   │   ├── email.py             # render + send
│   │   └── tracking.py          # webhooks
│   ├── models/                  # SQLAlchemy
│   ├── ml/
│   │   └── recommender.py
│   └── templates/
│       └── weekly.html
└── tests/

Modelos DB

output
class User(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str]
    active: Mapped[bool] = mapped_column(default=True)
    last_recos_sent: Mapped[datetime | None]

class Product(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    category: Mapped[str]
    embedding: Mapped[list[float]]    # vetor para ML

class Order(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    product_id: Mapped[int] = mapped_column(ForeignKey("products.id"))
    created_at: Mapped[datetime]

class EmailJob(Base):
    """Rastrear envios para idempotência"""
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    week_key: Mapped[str]                  # "2026-W21"
    status: Mapped[str]                     # pending|sent|failed|opened|clicked
    sendgrid_msg_id: Mapped[str | None]
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    
    __table_args__ = (UniqueConstraint("user_id", "week_key"),)

Configuração multi-queue

output
celery_app.conf.task_queues = (
    Queue("orchestrator"),
    Queue("ml"),
    Queue("emails"),
    Queue("tracking"),
)

celery_app.conf.task_routes = {
    "app.tasks.orchestrator.*": {"queue": "orchestrator"},
    "app.tasks.ml.*": {"queue": "ml"},
    "app.tasks.email.*": {"queue": "emails"},
    "app.tasks.tracking.*": {"queue": "tracking"},
}

celery_app.conf.beat_schedule = {
    "weekly-recos": {
        "task": "app.tasks.orchestrator.dispatch_weekly_emails",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon")
    }
}

SLOs alvo

NOTE
  • Todos os e-mails enviados em < 2h para 100k usuários
  • Taxa de falha final < 0.5%
  • Latência do webhook de rastreamento < 1s
  • Resiliência : se SendGrid cair por 10min, retomada automática

Resumo

NOTEPara lembrar
  • Separar ML (pesado), e-mails (débito), orchestrator (controle)
  • EmailJob com UniqueConstraint = idempotência
  • Beat para disparo semanal
  • Rastreamento via webhook SendGrid

Persistência e eviction

NOTEObjetivo — Configurar a persistência e a eviction de memória em produção.

RDB : snapshots

output
# redis.conf
save 900 1          # snapshot se >=1 modificação em 15min
save 300 10         # snapshot se >=10 modificações em 5min
save 60 10000       # snapshot se >=10000 modificações em 1min

dbfilename dump.rdb
dir /var/lib/redis
NOTERDB : snapshot binário compacto, restauração rápida. Mas pode perder alguns minutos em caso de crash.

AOF : append-only log

output
# redis.conf
appendonly yes
appendfilename "appendonly.aof"

appendfsync always       # lento mas 0 perda
appendfsync everysec     # padrão : 0-1s de perda
appendfsync no           # SO decide

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
NOTEAOF : cada escrita registrada. Mais garantias, porém maior no disco.

Híbrido : RDB + AOF

output
# Recomendado em prod : ambos ativos
save 900 1
appendonly yes
appendfsync everysec

# No restart : Redis carrega AOF (mais atualizado)

maxmemory e eviction

output
# Limite de memória
maxmemory 2gb

# Política quando cheio
maxmemory-policy allkeys-lru        # remove as menos usadas recentemente (cache)

Políticas de eviction

PolíticaComportamento
noevictionRejeita as escritas (padrão)
allkeys-lruRemove LRU em todas as chaves
volatile-lruLRU em chaves com TTL
allkeys-lfuMenos frequentemente usadas
allkeys-randomAleatório
volatile-ttlChave com TTL mais curto primeiro

Backups

output
# Snapshot manual
redis-cli BGSAVE              # assíncrono
redis-cli SAVE                # síncrono (bloqueante!)

# Copiar o dump
cp /var/lib/redis/dump.rdb /backup/dump-$(date +%F).rdb

# Restaurar
systemctl stop redis
cp /backup/dump-2026-05-27.rdb /var/lib/redis/dump.rdb
systemctl start redis

Replicação master-replica

output
# No replica
replicaof master.redis.local 6379

# Verificar
redis-cli INFO replication
# role:slave
# master_link_status:up

# Failover automático : Redis Sentinel
# Sharding : Redis Cluster
va-plus-loin

Este artigo cobre os trechos mais úteis — o curso completo Python Celery Redis (8 capítulos, 24 lições, exercícios corrigidos e projeto final) leva você até o fim.

./acceder-au-cours-complet curso gratuito : Vibe Coding

FAQ

Quanto tempo para aprender Python Celery Redis?
Com uma progressão estruturada (8 capítulos, 24 lições curtas e práticas), você atinge um nível operacional em algumas semanas dedicando 30 a 60 minutos por dia. O importante é praticar cada conceito imediatamente.
Precisa de pré-requisitos?
Básicos de informática são suficientes. Se você sabe usar um terminal e ler código simples, está pronto.
Por onde começar na prática?
Reproduza os comandos deste artigo e depois siga o curso completo Python Celery Redis: ele encadeia as 24 lições na ordem, com exercícios e projeto final.

📬 Quer receber este tipo de guia toda semana? Inscreva-se gratuitamente — código real, zero enrolação.