Intégration de tâches asynchrones dans Python avec Celery et Redis

Dans le développement d'applications web, certaines opérations consommant beaucoup de temps : l'envoi d'e-mails, la génération de raports, ou l'exécution de suites de tests. L'exécution synchrone de ces tâches dans le flux principal dégrade fortement l'expérience utilisateur. Une file d'attente de tâches asynchrones constitue une solution idéale. L'écosystème Python propose Celery, un système de files d'atente de tâches distribué, qui, couplé à Redis, un magasin de données en mémoire performant, permet d'externaliser ces processus lents.

Configuration initiale de l'environnement

Pour commencer, assurez-vous que Redis est installé et fonctionne. Puis, installez les bibliothèques Python nécessaires à l'aide de pip.

pip install celery redis

La création d'une application Celery minimale requiert quelques lignes de code. Configurez un fichier de configuration, par exemple celery_settings.py.

# celery_settings.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
task_routes = {
    'app.tasks.envoyer_courriel_bienvenue': {'queue': 'courriels'},
    'app.tasks.executer_cas_test.*': {'queue': 'tests'}
}

Dans un fichier tasks.py, initialisez l'application Celery en important la configuration.

# tasks.py
from celery import Celery
from . import celery_settings

app = Celery('mon_application')
app.config_from_object(celery_settings)

@app.task(name='app.tasks.envoyer_courriel_bienvenue')
def envoyer_courriel_bienvenue(adresse_email):
    """Simule un envoi d'email long."""
    import time
    time.sleep(3)
    return f"Courriel envoyé à {adresse_email}"

Les paramètres clés sont le broker_url (l'adresse du message broker) et le result_backend (l'endroit où stocker les résultats des tâches).

Cas d'application : envoi asynchrone d'e-mails lors de l'inscription

Une approche traditionnelle et bloquante pourrait ressembler à ceci dans une vue Django :

def inscription_utilisateur(request):
    utilisateur = creer_utilisateur(request.POST)
    # Appel synchrone qui bloque la réponse
    envoyer_courriel_bienvenue(utilisateur.email)
    return HttpResponse("Inscription réussie.")

L'utilisateur doit attendre la fin de l'envoi du courriel. En utilisant Celery, la tâche est déléguée à un worker.

from .tasks import envoyer_courriel_bienvenue

def inscription_utilisateur(request):
    utilisateur = creer_utilisateur(request.POST)
    # Déclenchement asynchrone avec .delay()
    envoyer_courriel_bienvenue.delay(utilisateur.email)
    return HttpResponse("Inscription réussie ! Votre courriel de bienvenue est en cours d'envoi.")

La réponse HTTP est immédiate. L'état de la tâche peut être consulté ultérieurement via son identifiant unique.

requete = envoyer_courriel_bienvenue.delay('utilisateur@exemple.com')
print(requete.id)          # Identifiant unique de la tâche
print(requete.status)      # Statut (PENDING, SUCCESS, FAILURE)
print(requete.result)      # Résultat (peut être None si non terminée)

Exécution asynchrone de tests automatisés

Pour une plateforme de test, l'exécution synchrone d'une suite de tests bloque l'interface. Celery permet leur exécution parallèle.

# test_tasks.py
from celery import shared_task
from .models import CasDeTest

@shared_task(bind=True)
def executer_cas_test(self, identifiant_cas):
    cas = CasDeTest.objects.get(id=identifiant_cas)
    # Logique d'exécution du test
    resultat = effectuer_test(cas)
    return {
        'id_cas': identifiant_cas,
        'statut': resultat.statut,
        'duree': resultat.duree
    }

Pour exécuter plusieurs tests en parallèle, on utilise un groupe de tâches.

from celery import group
from .test_tasks import executer_cas_test

def lancer_suite_tests(identifiant_suite):
    cas_a_tester = CasDeTest.objects.filter(suite=identifiant_suite)
    # Création d'un groupe pour une exécution parallèle
    groupe = group(executer_cas_test.s(cas.id) for cas in cas_a_tester)
    # Exécution asynchrone du groupe
    return groupe.apply_async()

Le groupe permet de lancer tous les cas de test simultanément et de collecter leurs résultats individuels.

Configuration avancée et robustesse

En environnement de production, il est crucial de configurer la gestion des erreurs et les mécanismes de relance.

@app.task(bind=True, retry_backoff=True, max_retries=3)
def envoyer_courriel_important(self, destinataire, contenu):
    try:
        # Code pouvant échouer, ex: connexion SMTP
        envoyer_smtp(destinataire, contenu)
    except Exception as exc:
        # Relance automatique avec attente exponentielle
        raise self.retry(exc=exc)

Des paramètres comme worker_prefetch_multiplier (nombre de tâches pré-chargées par worker) ou task_acks_late (accusé de réception après traitement) optimisent les performances et la fiabilité.

Pour la surveillance, l'outil Flower offre une interface web pour visualiser les workers, les files d'attente et l'état des tâches en temps réel. L'intégration de logs structurés permet également un diagnostic efficace.

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def traiter_donnees_volumineuses(donnees):
    logger.info("Début du traitement des données.")
    try:
        resultat = transformer(donnees)
        logger.debug(f"Traitement terminé. Résultat : {resultat}")
        return resultat
    except Exception as e:
        logger.error(f"Échec du traitement : {str(e)}", exc_info=True)
        raise

Étiquettes: Celery Redis Python tâches asynchrones files d'attente de messages

Publié le 29 mai à 14h00