Stratégies de Récupération des Résultats Asynchrones avec les Pools de Threads en Java

Annotation Spring @Async et Configuration Personnalisée

L'annotation @Async fournie par le framework Spring permet de déléguer l'exécution d'une méthode à un pool de threads, libérant ainsi le thread appelant pour qu'il poursuive son exécution sans attendre le résultat. Pour activer ce comportement, l'annotation @EnableAsync doit être présente sur la classe de configuration principale.

Voici un exemple de configuration d'un exécuteur personnalisé :

@Configuration
public class ConcurrencyConfiguration {

    @Bean(name = "customTaskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(5000);
        executor.setThreadNamePrefix("AsyncWorker-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

Pour récupérer les résultats de manière non bloquante, les méthodes annotées avec @Async peuvent retourner un objet CompletableFuture. Cela permet d'encapsuler le résultat via CompletableFuture.completedFuture() et d'utiliser des méthodes de composition comme thenCombine() pour fusionner plusieurs résultats asynchrones.

Ordonnancement par Ordre d'Achèvement avec CompletionService

Contrairement à une soumission classique où le thread consommateur doit attendre les tâches dans l'ordre de leur soumission, CompletionService découple la production de la consommation. Les résultats des tâches exécutées via l'Executor sont placés dans une file d'attente bloquante dès qu'ils sont prêts. Ainsi, les tâches rapides sont récupérées en premier, évitant qu'un thread ne soit bloqué par une tâche longue alors que d'autres résultats sont déjà disponibles.

L'implémentation standard, ExecutorCompletionService, propose deux constructeurs : l'un utilisant une file d'attente LinkedBlockingQueue par défaut, et l'autre acceptant une implémentation personnalisée de BlockingQueue.

La récupération des résultats se fait via take() (qui bloque jusqu'à ce qu'un résultat soit disponible) ou poll() (qui retourne immédiatement null si la file est vide). Les deux méthodes retirent l'élément de la file lors de sa récupération.

ThreadPoolExecutor workerPool = ThreadPoolManager.getDataProcessorPool();
List<DataRecord> aggregatedResults = new ArrayList<>();
CompletionService<DataRecord> completionService = new ExecutorCompletionService<>(workerPool);

// Soumission des tâches
payloadList.forEach(item -> completionService.submit(() -> {
    // Logique de traitement des données
    return DataProcessor.compute(item);
}));

// Consommation des résultats au fur et à mesure de leur achèvement
try {
    for (int i = 0; i < payloadList.size(); i++) {
        Future<DataRecord> futureResult = completionService.take();
        aggregatedResults.add(futureResult.get());
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
    log.error("Interruption lors de la récupération des résultats asynchrones", ex);
}

Exécution en Lot via invokeAll d'ExecutorService

La méthode invokeAll de l'interface ExecutorService permet de soumettre une collection entière de tâches Callable. Elle retourne une liste de Future une fois que toutes les tâches sont terminées (ou annulées). Cette approche est idéale lorsque le traitement global ne peut progresser que si l'ensemble des sous-tâches est achevé.

Lors de l'utilisation de invokeAll, il est crucial de configurer correctement la politique de rejet du pool de threads sous-jacent pour gérer les pics de charge :

  • AbortPolicy (par défaut) : Rejette la tâche et lève une RejectedExecutionException.
  • DiscardPolicy : Rejette silencieusement la tâche sans lever d'exception.
  • DiscardOldestPolicy : Supprime la tâche la plus ancienne en attente dans la file pour y insérer la nouvelle.
  • CallerRunsPolicy : Exécute la tâche rejetée directement dans le thread qui a initié l'appel, ce qui ralentit naturellement le producteur.

Chaînage Asynchrone avec CompletableFuture

L'API CompletableFuture offre une approche fonctionnelle et fluide pour la programmation asynchrone. Par défaut, les méthodes statiques comme supplyAsync utilisent le ForkJoinPool.commonPool(). Cependant, pour des charges de travail spécifiques ou intensives, il est fortement recommandé de fournir un Executor personnalisé.

Les méthodes de composition permettent de chaîner les opérations : thenCombine attend l'achèvement de deux étapes pour fusionner leurs résultats, tandis que thenApply permet de transformer le type du résultat de manière synchrone une fois l'étape précédente terminée.

// Utilisation d'un pool dédié pour éviter de saturer le pool commun
Executor pricingPool = Executors.newFixedThreadPool(4);

CompletableFuture<BigDecimal> amazonPriceFuture = CompletableFuture.supplyAsync(
    PricingService::fetchAmazonPrice, 
    pricingPool
);

CompletableFuture<BigDecimal> ebayPriceFuture = CompletableFuture.supplyAsync(
    PricingService::fetchEbayPrice, 
    pricingPool
);

// Combinaison des deux résultats asynchrones pour obtenir un total
CompletableFuture<BigDecimal> totalCostFuture = amazonPriceFuture.thenCombine(
    ebayPriceFuture, 
    BigDecimal::add
);

Étiquettes: Java Concurrency CompletableFuture spring-async executorservice

Publié le 27 juin à 05h05