Guide Complet des Outils de Concurrence Java : Sémaphore, CountDownLatch, CyclicBarrier et Phaser

Orchestration de la Concurrence en Java : Au-delà des Verrous Classiques

L'AbstractQueuedSynchronizer (AQS) ne se limite pas à fournir le fondement de ReentrantLock ; il constitue l'épine dorsale d'une suite d'outils de synchronisation avancés. Dans les architectures à haute concurrence, les verrous mutuels exclusifs ne suffisent pas toujours. Les développeurs doivent souvent orchestrer des flux d'exécution complexes :

  • Limiter l'accès simultané à des ressources partagées.
  • Synchroniser le démarrage de multiples composants.
  • Coordonner l'exécution parallèle par lots.
  • Gérer des pipelines de traitement multi-étapes.

Le package java.util.concurrent (JUC) répond à ces besoins via quatre classes synchronisatrices majeures. Voici une analyse approfondie de leurs mécanismes et de leurs cas d'usage respectifs.

1. Le Sémaphore : Contrôle d'Accès par Jetons

Le Semaphore régule le nombre de fils d'exécution pouvant interagir simultanément avec une ressource donnée. Il s'appuie sur le mode partagé de l'AQS pour maintenir un compteur interne de jetons (permis). Les opérations fondamentales sont acquire() pour bloquer en attendant un jeton disponible, et release() pour resttiuer un jeton et réveiller un thread en attente.

Exemple : Limitation de débit (Rate Limiting) pour une API

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class ApiRateLimiter {
    public static void main(String[] args) {
        int maxConcurrentRequests = 5;
        Semaphore rateLimiter = new Semaphore(maxConcurrentRequests);
        ExecutorService executor = Executors.newFixedThreadPool(15);

        for (int i = 0; i < 15; i++) {
            executor.submit(() -> {
                try {
                    rateLimiter.acquire();
                    System.out.println(Thread.currentThread().getName() + " accède à l'API");
                    Thread.sleep(1500); // Simule le traitement de la requête
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    rateLimiter.release();
                }
            });
        }
        executor.shutdown();
    }
}

Cas d'usage : Pools de connexions aux bases de données, protection contre la surcharge d'API, limitation de l'accès à des fichiers ou périphériques matériels.

2. CountDownLatch : Synchronisation à Usage Unique

Le CountDownLatch agit comme un compte à rebours. Il permet à un ou plusieurs threads appelants de bloquer sur la méthode await() jusqu'à ce que d'autres threads aient décrémenté le compteur interne via countDown(). Une fois le zéro atteint, le latch est ouvert de manière irréversible.

Exemple : Initialisation de Microservices

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ServiceBootstrap {
    public static void main(String[] args) throws InterruptedException {
        int servicesToInitialize = 4;
        CountDownLatch startupLatch = new CountDownLatch(servicesToInitialize);
        ExecutorService executor = Executors.newFixedThreadPool(servicesToInitialize);

        String[] services = {"Database", "Cache", "MessageBroker", "ConfigServer"};
        for (String service : services) {
            executor.submit(() -> {
                try {
                    System.out.println("Initialisation de " + service + "...");
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println(service + " est prêt.");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    startupLatch.countDown();
                }
            });
        }

        startupLatch.await();
        System.out.println("Tous les services sont opérationnels. Démarrage de l'application principale.");
        executor.shutdown();
    }
}

Cas d'usage : Agrégation de résultats asynchrones, attente de la fin de multiples tâches de fond, vérification des dépendances au démarrage d'une application.

3. CyclicBarrier : Point de Rendez-vous Réutilisable

Contrairement au CountDownLatch qui est à sens unique, la CyclicBarrier permet à un groupe de threads de s'attendre mutuellement. Lorsque le dernier thread atteint la barrière, celle-ci s'ouvre, exécute une action optionnelle de fin de phase, et peut être réinitialisée pour l'étape suivante.

Exemple : Calcul Matriciel Parallèle par Phases

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelMatrixComputation {
    public static void main(String[] args) {
        int workerCount = 4;
        CyclicBarrier phaseBarrier = new CyclicBarrier(workerCount, () ->
            System.out.println(">>> Tous les nœuds ont terminé la phase. Passage à la suivante.")
        );

        ExecutorService executor = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            final int nodeId = i + 1;
            executor.submit(() -> {
                try {
                    System.out.println("Noeud " + nodeId + " traite les données de la phase 1.");
                    Thread.sleep((long) (Math.random() * 1500));
                    phaseBarrier.await();

                    System.out.println("Noeud " + nodeId + " traite les données de la phase 2.");
                    Thread.sleep((long) (Math.random() * 1500));
                    phaseBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
    }
}

Cas d'usage : Algorithmes itératifs (comme PageRank), simulations scientifiques, traitement de données par lots nécessitant une synchronisation stricte entre les étapes.

4. Phaser : Orchestration Multi-étapes Dynamique

Introduit dans Java 7, le Phaser offre une flexibilité supérieure aux barrières classiques. Il permet l'enregistrement et la désinscription dynamiques des participants à tout moment et gère nativement les synchronisations en plusieurs phases sans nécessiter la recréation de l'objet.

Exemple : Pipeline ETL (Extract, Transform, Load)

import java.util.concurrent.Phaser;

public class DataPipeline {
    public static void main(String[] args) {
        Phaser pipelinePhaser = new Phaser(1); // Enregistrement du thread principal

        int initialWorkers = 3;
        for (int i = 0; i < initialWorkers; i++) {
            pipelinePhaser.register();
            new Thread(() -> {
                try {
                    // Phase 1: Extraction
                    System.out.println(Thread.currentThread().getName() + " - Extraction des données");
                    Thread.sleep(1000);
                    pipelinePhaser.arriveAndAwaitAdvance();

                    // Phase 2: Transformation
                    System.out.println(Thread.currentThread().getName() + " - Transformation des données");
                    Thread.sleep(1500);
                    pipelinePhaser.arriveAndAwaitAdvance();

                    // Phase 3: Chargement
                    System.out.println(Thread.currentThread().getName() + " - Chargement des données");
                    Thread.sleep(1000);
                    pipelinePhaser.arriveAndDeregister();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        // Le thread principal attend la fin de toutes les phases
        pipelinePhaser.arriveAndDeregister();
        System.out.println("Pipeline ETL terminé.");
    }
}

Cas d'usage : Workflows dynamiques où le nombre de travailleurs peut varier, pipelines de traitement de données complexes, tâches MapReduce personnalisées.

Analyse Comparative et Bonnes Pratiques

Outil Mécanisme Central Réutilisable Scénario Idéal
Semaphore Compteur de jetons (AQS partagé) Oui Contrôle d'accès concurrent, rate limiting, pools de ressources.
CountDownLatch Décrémentation de compteur Non Attente d'événements multiples, agrégation de tâches asynchrones.
CyclicBarrier Point de rendez-vous mutuel Oui Synchronisation de phases itératives avec un nombre fixe de threads.
Phaser Barrière de phase dynamique Oui Tâches multi-étapes avec inscription/déscription dynamique des workers.

Recommandations d'Ingénierie

  • Contrôle de concurrence strict : Optez pour Semaphore lorsque vous devez protéger une ressource partagée ayant une capacité limitée (ex: connexions DB, accès disque).
  • Attente de complétion : Utilisez CountDownLatch pour les scénarios "one-shot" où un thread coordinateur doit attendre la fin de plusieurs travailleurs indépendants.
  • Synchronisation itérative : Préférez CyclicBarrier pour les algorithmes parallèles où tous les threads doivent atteindre un point précis avant de passer à l'itération suivante.
  • Workflows complexes : Choisissez Phaser si votre application nécessite des phases multiples et si le nombre de threads participants peut fluctuer au cours de l'exécution.

Étiquettes: Java Concurrency juc Semaphore CountDownLatch

Publié le 24 juin à 00h59