Python Celery Redis: الـ8 خطوات الرئيسية للانتقال من الصفر إلى التشغيل

بايثون سيليري ريديس: الأساسيات في مقال واحد — كود حقيقي، مخططات وخطوات ملموسة، مقتطفات من دورة تتكون من 24 درسًا.

Python Celery Redis: الـ8 خطوات الرئيسية للانتقال من الصفر إلى التشغيل

الجميع يمكنه تعلم Python Celery Redis — بشرط اتباع الخطوات بالترتيب الصحيح. لقد لخصنا دورة كاملة من 24 درسًا في مسار واضح، مع أكثر مقتطفات الكود فائدة.

tl;dr
  • لماذا Celery
  • أساسيات Redis
  • إعداد Celery
  • المهام وسير العمل
  • Celery Beat
~$ cat ./parcours.md # Python Celery Redis — 8 فصول
01
لماذا Celery
→ Limites du synchrone→ Architecture Broker / Worker / Backend+ 1 دروس أخرى
02
Redis essentials
→ Types de donnees Redis→ Pub/Sub et Streams+ 1 دروس أخرى
03
Celery setup
→ Premier worker et premiere task→ Configuration et best practices+ 1 دروس أخرى
04
Tasks et workflows
→ Retries et timeouts→ Chain, Group, Chord+ 1 دروس أخرى
05
Celery Beat
→ Tasks periodiques→ Crontab schedules+ 1 دروس أخرى
06
Monitoring
→ Flower dashboard→ Logs structures et Sentry+ 1 دروس أخرى
07
Patterns avancés
→ Rate limiting et quotas→ Idempotence et deduplication+ 1 دروس أخرى
08
Projet final pipeline ML email
→ Projet - Cahier des charges→ Implementation des tasks+ 1 دروس أخرى
🏁
Projet final
→ Tu repars avec un projet concret et démontrable

الجدولة الديناميكية عبر قاعدة البيانات

NOTEالهدف — السماح للمستخدمين/المسؤولين بتعديل الجداول الزمنية دون إعادة النشر.

مشكلة beat_schedule في الكود

NOTEإذا أردت إضافة/تعديل جدول زمني في الإنتاج، يجب إعادة النشر. بالنسبة للمهام المعرفة من قبل المستخدم (النشرات الإخبارية، تقارير العملاء)، تحتاج إلى مجدول ديناميكي.

django-celery-beat (موصى به)

output
pip install django-celery-beat

# settings.py
INSTALLED_APPS = [..., "django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

# تشغيل التهجيرات
python manage.py migrate

# يقرأ Beat الجدول الزمني من قاعدة البيانات كل 5 ثوانٍ
celery -A myproject beat -l info

الجداول المنشأة

إنشاء جدول زمني عبر Python

output
from django_celery_beat.models import CrontabSchedule, PeriodicTask

# 1. تحديد التوقيت
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute="30",
    hour="9",
    day_of_week="mon-fri",
    day_of_month="*",
    month_of_year="*",
    timezone="Europe/Paris"
)

# 2. إنشاء المهمة الدورية
PeriodicTask.objects.create(
    crontab=schedule,
    name="Newsletter Acme Corp",
    task="app.tasks.send_newsletter",
    args=json.dumps(["acme_corp"]),
    kwargs=json.dumps({"template": "weekly"}),
    enabled=True,
    one_off=False,
)

واجهة API الإدارية عبر Django admin

output
# يعرض Django admin بالفعل النماذج
# يمكنك الذهاب مباشرة إلى /admin/django_celery_beat/periodictask/
# للإضافة والتعديل والتعطيل عبر واجهة المستخدم

واجهة REST API مخصصة

output
@app.post("/schedules")
def create_schedule(data: ScheduleCreate, db: Session = Depends(get_db)):
    schedule = CrontabSchedule.objects.create(
        minute=data.minute, hour=data.hour, ...
    )
    PeriodicTask.objects.create(
        name=data.name,
        crontab=schedule,
        task=data.task_name,
        args=json.dumps(data.args)
    )
    return {"status": "created"}

@app.delete("/schedules/{name}")
def delete_schedule(name: str):
    PeriodicTask.objects.filter(name=name).delete()

بدون Django : celery-redbeat

output
pip install celery-redbeat

# celery_app.py
celery_app.conf.beat_scheduler = "redbeat.RedBeatScheduler"
celery_app.conf.redbeat_redis_url = "redis://redis:6379/2"

# إنشاء جدول زمني
from redbeat import RedBeatSchedulerEntry
from celery.schedules import crontab

entry = RedBeatSchedulerEntry(
    name="my-task",
    task="app.tasks.my_task",
    schedule=crontab(hour=9, minute=0),
    app=celery_app,
    args=["arg1"]
)
entry.save()

# حذف
entry.delete()

حالة استخدام: النشرات الإخبارية متعددة المستأجرين

output
# نموذج قاعدة البيانات
class Newsletter(models.Model):
    name = models.CharField(max_length=200)
    schedule_cron = models.CharField(max_length=100)        # "0 9 * * mon"
    enabled = models.BooleanField(default=True)
    
    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        self.sync_celery_beat()
    
    def sync_celery_beat(self):
        m, h, dom, mon, dow = self.schedule_cron.split()
        schedule, _ = CrontabSchedule.objects.get_or_create(
            minute=m, hour=h, day_of_month=dom,
            month_of_year=mon, day_of_week=dow
        )
        PeriodicTask.objects.update_or_create(
            name=f"newsletter_{self.id}",
            defaults={
                "crontab": schedule,
                "task": "app.tasks.send_newsletter",
                "args": json.dumps([self.id]),
                "enabled": self.enabled
            }
        )

المشروع - دفتر الشروط

NOTEالمهمة — بناء نظام توصية منتجات يُرسل بالبريد الإلكتروني للعملاء كل أسبوع.

المواصفات

NOTEالوظيفي:
  • لكل مستخدم نشط: استرجاع سجل المشتريات
  • توليد 5 توصيات عبر تعلم الآلة (تشابه جيب التمام أو نموذج أكثر تقدمًا)
  • تأليف بريد إلكتروني HTML مخصص
  • الإرسال عبر SendGrid مع إعادة المحاولة
  • تتبع التسليم والفتح والنقر
  • التكرار: أسبوعيًا، الاثنين الساعة 9 صباحًا
غير الوظيفي:
  • التوسع: أكثر من 100 ألف مستخدم
  • اللامركزية: عدم الإرسال المزدوج
  • حد معدل SendGrid: 100 بريد إلكتروني/ثانية
  • الاستئناف عند الخطأ دون إعادة كل شيء

الهندسة المستهدفة

output
+---------------+
| Celery Beat   |  cron mon 9h
| (1 instance)  |
+-------+-------+
        |
        v
+---------------+      +-----------+
| dispatch_     |--->  | Redis     |
| weekly_emails |      | broker    |
| (1 task)      |      +-----+-----+
+---------------+            |
                             v
                +------------+------------+
                |                         |
        +-------v------+         +--------v-------+
        | Worker emails|         | Worker ML      |
        | (10 procs)   |         | (2 procs, GPU) |
        +-------+------+         +--------+-------+
                |                         |
                v                         v
        +-------+------+         +--------+-------+
        | SendGrid     |         | Modele Recommandation|
        | (rate limit) |         | + DB historique|
        +--------------+         +----------------+
                |
                v
        +-------+------+
        | Webhook      |
        | open/click   |
        +--------------+

هيكل المشروع

output
recommender/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── app/
│   ├── celery_app.py
│   ├── config.py
│   ├── tasks/
│   │   ├── orchestrator.py      # dispatch_weekly_emails
│   │   ├── ml.py                # compute_recommendations
│   │   ├── email.py             # render + send
│   │   └── tracking.py          # webhooks
│   ├── models/                  # SQLAlchemy
│   ├── ml/
│   │   └── recommender.py
│   └── templates/
│       └── weekly.html
└── tests/

نماذج قاعدة البيانات

output
class User(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str]
    active: Mapped[bool] = mapped_column(default=True)
    last_recos_sent: Mapped[datetime | None]

class Product(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    category: Mapped[str]
    embedding: Mapped[list[float]]    # متجه لتعلم الآلة

class Order(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    product_id: Mapped[int] = mapped_column(ForeignKey("products.id"))
    created_at: Mapped[datetime]

class EmailJob(Base):
    """تتبع الإرسالات للامركزية"""
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    week_key: Mapped[str]                  # "2026-W21"
    status: Mapped[str]                     # pending|sent|failed|opened|clicked
    sendgrid_msg_id: Mapped[str | None]
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    
    __table_args__ = (UniqueConstraint("user_id", "week_key"),)

تكوين الصفوف المتعددة

output
celery_app.conf.task_queues = (
    Queue("orchestrator"),
    Queue("ml"),
    Queue("emails"),
    Queue("tracking"),
)

celery_app.conf.task_routes = {
    "app.tasks.orchestrator.*": {"queue": "orchestrator"},
    "app.tasks.ml.*": {"queue": "ml"},
    "app.tasks.email.*": {"queue": "emails"},
    "app.tasks.tracking.*": {"queue": "tracking"},
}

celery_app.conf.beat_schedule = {
    "weekly-recos": {
        "task": "app.tasks.orchestrator.dispatch_weekly_emails",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon")
    }
}

أهداف SLO

NOTE
  • إرسال جميع الرسائل الإلكترونية في أقل من ساعتين لـ 100 ألف مستخدم
  • معدل الفشل النهائي أقل من 0.5%
  • زمن استجابة webhook التتبع أقل من ثانية واحدة
  • المرونة: إذا تعطل SendGrid لمدة 10 دقائق، يتم الاستئناف تلقائيًا

الملخص

NOTEما يجب تذكره
  • فصل ML (الثقيل)، الرسائل الإلكترونية (الإنتاجية)، المنسق (التحكم)
  • EmailJob مع UniqueConstraint = اللامركزية
  • Beat للتشغيل الأسبوعي
  • التتبع عبر webhook SendGrid

الاستمرارية والإخلاء

NOTEالهدف — تكوين الاستمرارية وإخلاء الذاكرة في الإنتاج.

RDB : اللقطات

output
# redis.conf
save 900 1          # لقطة إذا كان هناك تعديل واحد على الأقل في 15 دقيقة
save 300 10         # لقطة إذا كان هناك 10 تعديلات على الأقل في 5 دقائق
save 60 10000       # لقطة إذا كان هناك 10000 تعديل على الأقل في دقيقة واحدة

dbfilename dump.rdb
dir /var/lib/redis
NOTERDB : لقطة ثنائية مدمجة، استعادة سريعة. لكن قد تفقد بضع دقائق في حالة التعطل.

AOF : سجل الإلحاق فقط

output
# redis.conf
appendonly yes
appendfilename "appendonly.aof"

appendfsync always       # بطيء لكنه بدون فقدان
appendfsync everysec     # الافتراضي: فقدان 0-1 ثانية
appendfsync no           # يقرر نظام التشغيل

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
NOTEAOF : كل كتابة مسجلة. ضمانات أكثر، لكنه أكبر حجمًا على القرص.

الهجين: RDB + AOF

output
# موصى به في الإنتاج: تفعيل الاثنين
save 900 1
appendonly yes
appendfsync everysec

# عند إعادة التشغيل: يحمل Redis AOF (الأحدث)

maxmemory والإخلاء

output
# حد الذاكرة
maxmemory 2gb

# السياسة عند الامتلاء
maxmemory-policy allkeys-lru        # يحذف الأقل استخدامًا مؤخرًا (ذاكرة التخزين المؤقت)

سياسات الإخلاء

السياسةالسلوك
noevictionيرفض الكتابات (الافتراضي)
allkeys-lruيحذف LRU على جميع المفاتيح
volatile-lruLRU على المفاتيح ذات TTL
allkeys-lfuالأقل استخدامًا تكرارًا
allkeys-randomعشوائي
volatile-ttlالمفتاح ذو TTL الأقصر أولاً

النسخ الاحتياطية

output
# لقطة يدوية
redis-cli BGSAVE              # غير متزامن
redis-cli SAVE                # متزامن (يحظر!)

# نسخ التفريغ
cp /var/lib/redis/dump.rdb /backup/dump-$(date +%F).rdb

# استعادة
systemctl stop redis
cp /backup/dump-2026-05-27.rdb /var/lib/redis/dump.rdb
systemctl start redis

النسخ المتماثل master-replica

output
# على النسخة المتماثلة
replicaof master.redis.local 6379

# التحقق
redis-cli INFO replication
# role:slave
# master_link_status:up

# تجاوز الفشل التلقائي: Redis Sentinel
# التقسيم: Redis Cluster
va-plus-loin

يغطي هذا المقال المقتطفات الأكثر فائدة — الدورة الكاملة Python Celery Redis (8 فصول، 24 درسًا، تمارين مصححة ومشروع نهائي) تأخذك إلى النهاية.

./acceder-au-cours-complet cours gratuit : Vibe Coding

الأسئلة الشائعة

كم من الوقت يستغرق تعلم Python Celery Redis؟
مع تقدم منظم (8 فصول، 24 درسًا قصيرًا وعمليًا)، يمكن الوصول إلى مستوى تشغيلي في بضعة أسابيع بمعدل 30 إلى 60 دقيقة يوميًا. المهم هو ممارسة كل مفهوم فورًا.
هل هناك متطلبات مسبقة؟
تكفي أساسيات الحوسبة. إذا كنت تعرف استخدام الطرفية وقراءة كود بسيط، فأنت جاهز.
من أين نبدأ عمليًا؟
أعد إنتاج أوامر هذا المقال، ثم تابع الدورة الكاملة Python Celery Redis: فهي تربط الـ 24 درسًا بالترتيب، مع تمارين ومشروع نهائي.

📬 هل تريد تلقي هذا النوع من الأدلة كل أسبوع؟ اشترك مجانًا — كود حقيقي، بدون كلام فارغ.