Présentation de Celery
Celery est un système de file d'attente distribué conçu pour le traitement en temps réel et la planification de tâches. Il fournit également les outils nécessaires à l'exploitation et la maintenance de systèmes distribués.
Caractéristiques principales :
- Exécution concurrente via plusieurs modes : multiprocessing, Eventlet ou Gevent
- Mécanisme de gestion d'erreurs intégré
- Support de multiples courtiers de messages (RabbitMQ, Redis) et backends de stockage
- Outils d'administration via interface web et ligne de commande
- Suivi de l'exécution des tâches planifiées
Architecture fondamentale
Les composants clés de Celery sont :
- Client Celery : publie les tâches dans la file d'attente
- Worker Celery : exécute les tâches consommées depuis la file
- Courtier de messages : assure la communication entre le client et le worker
La sérialisation et la désérialisation des données transitant entre le producteur et le consommateur sont assurées par différents modules : msgpack (sérialisation binaire de type JSON, recommandée), pickle, json ou yaml.
Installation
# Installation de base
pip install -U Celery
# Installation avec toutes les dépendances recommandées
pip install "celery[librabbitmq, redis, msgpack, gevent]"
Utilisation basique
Définition et publication de tâches
# tasks.py
from celery import Celery
broker_url = 'amqp://guest@localhost'
result_backend = 'redis://localhost:6379/0'
queue_app = Celery('tasks', broker=broker_url, backend=result_backend)
@queue_app.task
def addition(x, y):
return x + y
Appel des tâches depuis un client
# caller.py
from tasks import addition
result_ref = addition.delay(8, 7)
computed_value = result_ref.get(timeout=2)
print('Résultat calculé :', computed_value)
Lancement du worker
celery -A tasks worker -l info
Sur Windows 10, ajouter le paramètre -P threads.
Options utiles :
-A: spécifie l'emplacement de l'instance Celery-l: définit le niveau de verbosité des logs du worker-Q: cible une file d'attente spécifique
Lors de l'exécution, l'application crée automatiquement un échange de type direct nommé "celery" ainsi qu'une file d'attente avec la clé de routage "celery".
Configuration avancée
Configuration inline
queue_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Europe/Paris',
enable_utc=True,
)
Configuration via fichier dédié
# settings_celery.py
BROKER_URL = "redis://localhost:6379/1"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TASK_RESULT_EXPIRES = 120
CELERY_ACCEPT_CONTENT = ['json']
# Chargement du fichier de configuration
queue_app = Celery(__name__, include=['tasks'])
queue_app.config_from_object('settings_celery')
Paramètres de configuration courants
# settings_celery.py complet
# Adresse du courtier de messages
BROKER_URL = 'amqp://utilisateur:motdepasse@hote:port/vhost'
# Backend pour stocker les résultats
CELERY_RESULT_BACKEND = 'redis://utilisateur:motdepasse@hote:port/db'
# Sérialisation
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'msgpack'
CELERY_ACCEPT_CONTENT = ['msgpack']
# Expiration des résultats (secondes)
CELERY_TASK_RESULT_EXPIRES = 120
# Confirmation différée des tâches
CELERY_ACKS_LATE = True
# Compression des messages (zlib, bzip2)
CELERY_MESSAGE_COMPRESSION = 'zlib'
# Temps limite d'exécution d'une tâche (secondes)
CELERYD_TASK_TIME_LIMIT = 10
# Nombre de workers concurrents
CELERYD_CONCURRENCY = 4
# Nombre de tâches pré-extraites par worker
CELERYD_PREFETCH_MULTIPLIER = 4
# Nombre maximal de tâches avant redémarrage d'un worker
CELERYD_MAX_TASKS_PER_CHILD = 50
# File d'attente par défaut
CELERY_DEFAULT_QUEUE = "default"
# Configuration des files d'attente
CELERY_QUEUES = {
"default": {
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"web_queue": {
"routing_key": "web.#",
"exchange": "web_exchange",
"exchange_type": "topic",
},
"broadcast_tasks": {
"exchange": "tasks_broadcast",
"exchange_type": "fanout",
"binding_key": "tasks",
},
}
Routage des tâches vers des files spécifiques
from kombu import Queue
queue_app.conf.task_default_queue = 'default'
queue_app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('web_queue', routing_key='web.#'),
)
queue_app.conf.task_default_exchange = 'task_exchange'
queue_app.conf.task_default_exchange_type = 'topic'
queue_app.conf.task_default_routing_key = 'task.default'
Pour lancer un worker ciblant une file spécifique :
celery -A mon_app worker -Q web_queue
Exemple pratique : envoi de SMS asynchrone
Prérequis : serveur Redis actif, instance RabbitMQ démarrée (par exemple via Docker), et accès à un service SMS avec les identifiants API appropriés.
# sms_tasks.py
from celery import Celery
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
sms_app = Celery('sms_tasks',
broker="amqp://guest@localhost//",
backend="redis://localhost:6379/1")
@sms_app.task(bind=True, max_retries=3)
def envoyer_sms(self, destinataire, code_verification):
try:
client = AcsClient('VOTRE_ACCESS_KEY', 'VOTRE_SECRET_KEY', 'cn-hangzhou')
requete = CommonRequest()
requete.set_accept_format('json')
requete.set_domain('dysmsapi.aliyuncs.com')
requete.set_method('POST')
requete.set_protocol_type('https')
requete.set_version('2017-05-25')
requete.set_action_name('SendSms')
requete.add_query_param('RegionId', "cn-hangzhou")
requete.add_query_param('PhoneNumbers', destinataire)
requete.add_query_param('SignName', "VotreSignature")
requete.add_query_param('TemplateCode', "SMS_TEMPLATE_ID")
requete.add_query_param('TemplateParam', '{"code":"%s"}' % code_verification)
resultat = client.do_action(requete)
return str(resultat, encoding="utf-8")
except Exception as exc:
self.retry(exc=exc, countdown=60)
# sms_caller.py
from celery import Celery
app_client = Celery("sms_caller",
broker="amqp://guest@localhost//",
backend="redis://localhost:6379/1")
app_client.send_task('sms_tasks.envoyer_sms',
args=["+33612345678", "482591"])
celery -A sms_tasks worker -l info -P threads
Exemple pratique : envoi d'emails asynchrone
# mail_tasks.py
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.utils import parseaddr, formataddr
from celery import Celery
SMTP_HOST = 'smtp.example.com'
SMTP_PORT = 587
SMTP_USER = 'noreply@example.com'
SMTP_PASSWORD = 'mot_de_passe_application'
SENDER_DISPLAY = 'Service Notification <noreply@example.com>'
mail_app = Celery('mail_tasks',
broker='redis://localhost:6379/1',
backend='redis://localhost:6379/2')
def formater_adresse(chaine):
nom, adresse = parseaddr(chaine)
return formataddr((Header(nom, 'utf-8').encode(), adresse))
@mail_app.task
def envoyer_email(destinataires, corps_message, sujet, expediteur=None):
msg = MIMEText(corps_message)
msg['from'] = formater_adresse(expediteur or SENDER_DISPLAY)
msg['to'] = ','.join(destinataires)
msg['subject'] = Header(sujet, 'utf-8')
serveur = smtplib.SMTP(SMTP_HOST, SMTP_PORT)
serveur.starttls()
serveur.login(SMTP_USER, SMTP_PASSWORD)
serveur.sendmail(SENDER_DISPLAY, destinataires, msg.as_string())
serveur.quit()
# mail_caller.py
from celery import Celery
client_mail = Celery('mail_caller',
broker='redis://localhost:6379/1',
backend='redis://localhost:6379/2')
client_mail.send_task('mail_tasks.envoyer_email',
args=[['destinataire@example.com'],
'Bonjour, votre compte a été créé avec succès.',
'Confirmation d\'inscription'])
celery -A mail_tasks worker -l info -P threads