Une interrogation fréquente concerne la nécessité d'utiliser des bibliothèques comme RxJava ou Reactor lorsque les fonctionnalités natives de Java 8 (Streams, CompletableFutures, Optionals) semblent suffire. En réalité, ces outils standards sont efficaces pour des traitements simples. Leur utilisation pour des scénarios plus complexes engendre souvent du code intricate et difficile à maintenir. RxJava et Reactor offrent un ensemble de fonctionnalités robustes pour couvrir des besoins évolutifs sur le long terme. Pour objectiver la comparaison, nous analyserons huit critères distincts.
Critères d'évaluation
- Compositionnalité
- Paresse (Lazy evaluation)
- Réutilisabilité
- Exécution asynchrone
- Mise en cache (Caching)
- Modèle Push vs Pull
- Contre-pression (Backpressure)
- Fusion d'opérateurs (Operator fusion)
Classes à comparer
java.util.concurrent.CompletableFuturejava.util.stream.Streamjava.util.Optionalrx.Observable(RxJava 1)io.reactivex.Observable(RxJava 2)io.reactivex.Flowable(RxJava 2)reactor.core.publisher.Flux(Reactor Core)
Compositionnalité
Toutes les classes listées supportent la composition, favorisant une approche fonctionnelle.
- CompletableFuture : Propose des méthodes
.then*()pour enchaîner des étapes, transmettant une valeur unique et une éventuelle exception. - Stream : Offre une large gamme d'opérateurs chaînables pour la transformation, pouvant traiter une séquence de N éléments.
- Optional : Fournit les opérateurs intermédiaires basiques comme
.map(),.flatMap()et.filter(). - Observable, Flowable, Flux : Offrent des capacités de composition similaires à Stream.
Paresse (Lazy evaluation)
- CompletableFuture : Non paresseux. C'est un conteneur pour un résultat asynchrone déjà initié. Il ne permet pas de déclencher l'exécution en amont de la chaîne.
- Stream : Les opérations intermédiaires sont paresseuses. Les opérations terminales déclenchent le calcul.
- Optional : Non paresseux, les opérations s'exécutent immédiatement.
- Observable, Flowable, Flux : L'exécution ne commence qu'à la souscription (
subscribe).
Réutilisabilité
- CompletableFuture : Potentiellement réutilisable car il enveloppe une valeur, mais il est mutable. Son usage est sûr si personne n'invoque les méthodes
.obtrude*(). - Stream : Non réutilisable. La documentation Java spécifie qu'un Stream ne doit être opéré qu'une seule fois.
- Optional : Totalement réutilisable car immutable et évalué de manière eager.
- Observable, Flowable, Flux : Réutilisables par conception. Chaque souscription relance l'exécution depuis le début.
Exécution asynchrone
- CompletableFuture : Conçu pour chaîner des tâches de manière asynchrone. Il est associé à un
Executor(par défaut, leForkJoinPool.commonPool()). - Stream : Ne permet pas de traitement asynchrone intrinsèque, mais supporte l'exécution parallèle via
stream.parallel(). - Optional : Aucune capacité asynhcrone, c'est un simple conteneur.
- Observable, Flowable, Flux : Orientés vers la construction de systèmes asynchrones, mais synchrones par défaut. Les opérateurs
subscribeOnetobserveOnpermettent un contrôle fin des threads.
Exemple avec subscribeOn :
Observable<string> source = Observable.fromCallable(() -> {
log.info("Lecture sur le thread : " + Thread.currentThread().getName());
return lireFichier("entree.txt");
});
source
.map(texte -> {
log.info("Transformation sur le thread : " + Thread.currentThread().getName());
return texte.length();
})
.subscribeOn(Schedulers.io())
.subscribe(valeur -> log.info("Résultat sur le thread : " + Thread.currentThread().getName()));</string>
Sortie possible :
Lecture sur le thread : RxIoScheduler-2
Transformation sur le thread : RxIoScheduler-2
Résultat sur le thread : RxIoScheduler-2
Exemple avec observeOn :
Observable.fromCallable(() -> {
log.info("Lecture sur le thread : " + Thread.currentThread().getName());
return lireFichier("entree.txt");
})
.observeOn(Schedulers.computation())
.map(texte -> {
log.info("Transformation sur le thread : " + Thread.currentThread().getName());
return texte.length();
})
.subscribeOn(Schedulers.io())
.subscribe(valeur -> log.info("Résultat sur le thread : " + Thread.currentThread().getName()));
Sortie possible :
Lecture sur le thread : RxIoScheduler-2
Transformation sur le thread : RxComputationScheduler-1
Résultat sur le thread : RxComputationScheduler-1
Mise en cache (Caching)
Une classe est réutilisable si plusieurs chaînes dérivées (B, C) s'exécutent avec succès. Elle est mise en cache si, de plus, chaque étape de la chaîne originale (A) n'est exécutée qu'une seule fois pour les dérivés.
- CompletableFuture : Suit les mêmes règles que pour la réutilisabilité.
- Stream : Pas de mise en cache des résultats intermédiaires sans invoquer une opération terminale.
- Optional : « Mis en cache » car le calcul est eager.
- Observable, Flowable, Flux : Non mis en cache par défaut. L'appel à
.cache()active cette fonctionnalité.
Observable<integer> travail = Observable.fromCallable(() -> {
System.out.println("Exécution d'un travail intensif");
return 42;
}).cache(); // Activation du cache
travail.subscribe(System.out::println);
travail.map(n -> n * 2).subscribe(System.out::println);</integer>
Sortie :
Exécution d'un travail intensif
42
84
Modèle Push vs Pull
- Stream et Optional fonctionnent en mode Pull (tirage). Le consommateur appelle une méthode (comme
.collect()ou.get()) pour récupérer les données, ce qui peut bloquer le thread. - CompletableFuture, Observable, Flowable et Flux fonctionnent en mode Push (poussage). Le consommateur souscrit et reçoit les données via des notifications (
onNext,onComplete,onError).
Contre-pression (Backpressure)
La contre-pression est nécessaire dans un système asynchrone de type Push lorsque le producteur génère des données plus vite que le consommateur ne peut les traiter. Il est impératif de disposer d'une stratégie pour éviter un OutOfMemoryError ou des pertes de données.
- Stream et Optional : Ne sont pas concernés car ils sont en mode Pull.
- CompletableFuture : Gère au plus une seule valeur, le concept n'est pas applicable.
- Observable (RxJava 1), Flowable, Flux : Implémentent des stratégies de gestion (bufferisation, abandon, exception, etc.).
- Observable (RxJava 2) : Ne gère pas la contre-pression. RxJava 2 a créé une séparation nette :
Flowablepour les flux avec contre-pression,Observablepour les événements où elle n'est pas nécessaire ou applicable (comme les interactions UI).
Fusion d'opérateurs (Operator fusion)
Cette optimisation vise à réduire la surcharge architecturale en modifiant la chaîne d'opérateurs. Seules les versions récentes (RxJava 2 et Reactor) la supportent, via deux mécanismes :
- Fusion macro : Remplacement de plusieurs opérateurs consécutifs par un seul opérateur spécialisé.
- Fusion micro : Partage d'une même file d'attente (queue) entre un opérateur en fin de chaîne et le suivant, évitant ainsi des intermédiaires de communication.