Introduction à Celery pour les tâches asynchrones et distribuées

Présentation de Celery

Celery est un système de file d'attente distribué conçu pour le traitement en temps réel et la planification de tâches. Il fournit également les outils nécessaires à l'exploitation et la maintenance de systèmes distribués.

Caractéristiques principales :

  • Exécution concurrente via plusieurs modes : multiprocessing, Eventlet ou Gevent
  • Mécanisme de gestion d'erreurs intégré
  • Support de multiples courtiers de messages (RabbitMQ, Redis) et backends de stockage
  • Outils d'administration via interface web et ligne de commande
  • Suivi de l'exécution des tâches planifiées

Architecture fondamentale

Les composants clés de Celery sont :

  • Client Celery : publie les tâches dans la file d'attente
  • Worker Celery : exécute les tâches consommées depuis la file
  • Courtier de messages : assure la communication entre le client et le worker

La sérialisation et la désérialisation des données transitant entre le producteur et le consommateur sont assurées par différents modules : msgpack (sérialisation binaire de type JSON, recommandée), pickle, json ou yaml.

Installation

# Installation de base
pip install -U Celery

# Installation avec toutes les dépendances recommandées
pip install "celery[librabbitmq, redis, msgpack, gevent]"

Utilisation basique

Définition et publication de tâches

# tasks.py
from celery import Celery

broker_url = 'amqp://guest@localhost'
result_backend = 'redis://localhost:6379/0'

queue_app = Celery('tasks', broker=broker_url, backend=result_backend)

@queue_app.task
def addition(x, y):
    return x + y

Appel des tâches depuis un client

# caller.py
from tasks import addition

result_ref = addition.delay(8, 7)
computed_value = result_ref.get(timeout=2)
print('Résultat calculé :', computed_value)

Lancement du worker

celery -A tasks worker -l info

Sur Windows 10, ajouter le paramètre -P threads.

Options utiles :

  • -A : spécifie l'emplacement de l'instance Celery
  • -l : définit le niveau de verbosité des logs du worker
  • -Q : cible une file d'attente spécifique

Lors de l'exécution, l'application crée automatiquement un échange de type direct nommé "celery" ainsi qu'une file d'attente avec la clé de routage "celery".

Configuration avancée

Configuration inline

queue_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Europe/Paris',
    enable_utc=True,
)

Configuration via fichier dédié

# settings_celery.py

BROKER_URL = "redis://localhost:6379/1"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TASK_RESULT_EXPIRES = 120
CELERY_ACCEPT_CONTENT = ['json']
# Chargement du fichier de configuration
queue_app = Celery(__name__, include=['tasks'])
queue_app.config_from_object('settings_celery')

Paramètres de configuration courants

# settings_celery.py complet

# Adresse du courtier de messages
BROKER_URL = 'amqp://utilisateur:motdepasse@hote:port/vhost'

# Backend pour stocker les résultats
CELERY_RESULT_BACKEND = 'redis://utilisateur:motdepasse@hote:port/db'

# Sérialisation
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'msgpack'
CELERY_ACCEPT_CONTENT = ['msgpack']

# Expiration des résultats (secondes)
CELERY_TASK_RESULT_EXPIRES = 120

# Confirmation différée des tâches
CELERY_ACKS_LATE = True

# Compression des messages (zlib, bzip2)
CELERY_MESSAGE_COMPRESSION = 'zlib'

# Temps limite d'exécution d'une tâche (secondes)
CELERYD_TASK_TIME_LIMIT = 10

# Nombre de workers concurrents
CELERYD_CONCURRENCY = 4

# Nombre de tâches pré-extraites par worker
CELERYD_PREFETCH_MULTIPLIER = 4

# Nombre maximal de tâches avant redémarrage d'un worker
CELERYD_MAX_TASKS_PER_CHILD = 50

# File d'attente par défaut
CELERY_DEFAULT_QUEUE = "default"

# Configuration des files d'attente
CELERY_QUEUES = {
    "default": {
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "web_queue": {
        "routing_key": "web.#",
        "exchange": "web_exchange",
        "exchange_type": "topic",
    },
    "broadcast_tasks": {
        "exchange": "tasks_broadcast",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
}

Routage des tâches vers des files spécifiques

from kombu import Queue

queue_app.conf.task_default_queue = 'default'
queue_app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('web_queue', routing_key='web.#'),
)
queue_app.conf.task_default_exchange = 'task_exchange'
queue_app.conf.task_default_exchange_type = 'topic'
queue_app.conf.task_default_routing_key = 'task.default'

Pour lancer un worker ciblant une file spécifique :

celery -A mon_app worker -Q web_queue

Exemple pratique : envoi de SMS asynchrone

Prérequis : serveur Redis actif, instance RabbitMQ démarrée (par exemple via Docker), et accès à un service SMS avec les identifiants API appropriés.

# sms_tasks.py
from celery import Celery
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest

sms_app = Celery('sms_tasks',
                 broker="amqp://guest@localhost//",
                 backend="redis://localhost:6379/1")


@sms_app.task(bind=True, max_retries=3)
def envoyer_sms(self, destinataire, code_verification):
    try:
        client = AcsClient('VOTRE_ACCESS_KEY', 'VOTRE_SECRET_KEY', 'cn-hangzhou')

        requete = CommonRequest()
        requete.set_accept_format('json')
        requete.set_domain('dysmsapi.aliyuncs.com')
        requete.set_method('POST')
        requete.set_protocol_type('https')
        requete.set_version('2017-05-25')
        requete.set_action_name('SendSms')

        requete.add_query_param('RegionId', "cn-hangzhou")
        requete.add_query_param('PhoneNumbers', destinataire)
        requete.add_query_param('SignName', "VotreSignature")
        requete.add_query_param('TemplateCode', "SMS_TEMPLATE_ID")
        requete.add_query_param('TemplateParam', '{"code":"%s"}' % code_verification)

        resultat = client.do_action(requete)
        return str(resultat, encoding="utf-8")
    except Exception as exc:
        self.retry(exc=exc, countdown=60)
# sms_caller.py
from celery import Celery

app_client = Celery("sms_caller",
                    broker="amqp://guest@localhost//",
                    backend="redis://localhost:6379/1")

app_client.send_task('sms_tasks.envoyer_sms',
                     args=["+33612345678", "482591"])
celery -A sms_tasks worker -l info -P threads

Exemple pratique : envoi d'emails asynchrone

# mail_tasks.py
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.utils import parseaddr, formataddr
from celery import Celery

SMTP_HOST = 'smtp.example.com'
SMTP_PORT = 587
SMTP_USER = 'noreply@example.com'
SMTP_PASSWORD = 'mot_de_passe_application'
SENDER_DISPLAY = 'Service Notification <noreply@example.com>'

mail_app = Celery('mail_tasks',
                  broker='redis://localhost:6379/1',
                  backend='redis://localhost:6379/2')


def formater_adresse(chaine):
    nom, adresse = parseaddr(chaine)
    return formataddr((Header(nom, 'utf-8').encode(), adresse))


@mail_app.task
def envoyer_email(destinataires, corps_message, sujet, expediteur=None):
    msg = MIMEText(corps_message)
    msg['from'] = formater_adresse(expediteur or SENDER_DISPLAY)
    msg['to'] = ','.join(destinataires)
    msg['subject'] = Header(sujet, 'utf-8')

    serveur = smtplib.SMTP(SMTP_HOST, SMTP_PORT)
    serveur.starttls()
    serveur.login(SMTP_USER, SMTP_PASSWORD)
    serveur.sendmail(SENDER_DISPLAY, destinataires, msg.as_string())
    serveur.quit()
# mail_caller.py
from celery import Celery

client_mail = Celery('mail_caller',
                     broker='redis://localhost:6379/1',
                     backend='redis://localhost:6379/2')

client_mail.send_task('mail_tasks.envoyer_email',
                      args=[['destinataire@example.com'],
                            'Bonjour, votre compte a été créé avec succès.',
                            'Confirmation d\'inscription'])
celery -A mail_tasks worker -l info -P threads

Ressources complémentaires

Étiquettes: Celery Python task-queue distributed-systems rabbitmq

Publié le 4 juin à 19h33