Dans les applications modernes, deux types de composants de messagerie asynchrone sont couramment utilisés :
- JMS : Java Message Service, avec ActiveMQ comme représentant principal. Cepednant, ses performances sont limitées car il est entièrement implémenté en Java.
- AMQP : Advanced Message Queuing Protocol, implémenté directement au niveau du protocole. Les représentants populaires incluent RabbitMQ, tandis que Kafka est reconnu pour ses hautes performances.
- Intégration d'ActiveMQ avec SpringBoot
2.1. Configuration du projet
Pour utiliser ActiveMQ dans un projet SpringBoot, ajoutez d'abord la dépendance nécessaire dans votre fichier pom.xml :
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
Configurez ensuite votre fichier application.yml pour ActiveMQ :
serveur:
port: 8080
spring:
jms:
pub-sub-domain: false # true pour topic, false pour Queue
activemq:
utilisateur: admin
motDePasse: admin123
url-broker: tcp://localhost:61616
2.2. Création du consommateur de messages
Le consommateur écoute les messages à l'aide de l'annotation @JmsListener :
package com.example.service;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
@Service
public class ConsommateurMessage {
@JmsListener(destination="file.messages")
public void receptionMessage(String contenu) {
System.out.println("Message reçu : " + contenu);
}
}
2.3. Création du producteur de messages
Définissez d'abord une interface pour le service d'envoi de messages :
package com.example.service;
public interface ProducteurMessage {
void envoyerMessage(String contenu) ;
}
Créez ensuite une configuration pour ActiveMQ :
package com.example.config;
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
@Configuration
@EnableJms
public class ConfigActiveMQ {
@Bean
public Queue fileMessages() {
return new ActiveMQQueue("file.messages") ;
}
}
Implémentez le service d'envoi de messages :
package com.example.service.impl;
import javax.annotation.Resource;
import javax.jms.Queue;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import com.example.service.ProducteurMessage;
@Service
public class ProducteurMessageImpl implements ProducteurMessage {
@Resource
private JmsMessagingTemplate templateJms;
@Resource
private Queue fileMessages;
@Override
public void envoyerMessage(String contenu) {
this.templateJms.convertAndSend(this.fileMessages, contenu);
}
}
2.4. Test de l'intégration
Créez une classe de test pour valider le fonctionnement :
package com.example.test;
import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.example.Application;
import com.example.service.ProducteurMessage;
@SpringBootTest(classes = Application.class)
@RunWith(SpringRunner.class)
public class TestActiveMQ {
@Resource
private ProducteurMessage producteur;
@Test
public void testEnvoi() throws Exception {
for (int i = 0; i < 5; i++) {
this.producteur.envoyerMessage("Test message - " + i);
}
Thread.sleep(5000); // Attendre la réception des messages
}
}
- Intégration de RabbitMQ avec SpringBoot
3.1. Configuration du projet
Ajoutez la dépendance RabbitMQ à votre projet :
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configurez le fichier application.yml :
serveur:
port: 8080
spring:
rabbitmq:
adresses: rabbitmq-server
utilisateur: admin
motDePasse: admin123
virtual-host: /
3.2. Configuration du producteur
Créez une configuration pour le producteur de messages :
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfigProducteur {
public static final String ECHANGE = "echange.messages";
public static final String CLE_ROUTAGE = "cle.routage";
public static final String FILE_ATTENTE = "file.attente";
@Bean
public Binding lierEchangeFile(DirectExchange echange, File file) {
return BindingBuilder.bind(file).to(echange).with(CLE_ROUTAGE);
}
@Bean
public DirectExchange getEchangeDirect() {
return new DirectExchange(ECHANGE, true, true);
}
@Bean
public File fileAttente() {
return new File(FILE_ATTENTE);
}
}
3.3. Implémentation du producteur
package com.example.service.impl;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import com.example.config.ConfigProducteur;
import com.example.service.ProducteurMessage;
@Service
public class ProducteurMessageRabbitImpl implements ProducteurMessage {
@Resource
private RabbitTemplate templateRabbit;
@Override
public void envoyerMessage(String contenu) {
this.templateRabbit.convertAndSend(ConfigProducteur.ECHANGE,
ConfigProducteur.CLE_ROUTAGE, contenu);
}
}
3.4. Configuration du consommateur
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfigConsommateur {
public static final String ECHANGE = "echange.messages";
public static final String CLE_ROUTAGE = "cle.routage";
public static final String FILE_ATTENTE = "file.attente";
@Bean
public File fileAttente() {
return new File(FILE_ATTENTE);
}
@Bean
public DirectExchange getEchangeDirect() {
return new DirectExchange(ECHANGE, true, true);
}
@Bean
public Binding lierEchangeFile(DirectExchange echange, File file) {
return BindingBuilder.bind(file).to(echange).with(CLE_ROUTAGE);
}
}
3.5. Implémentation du consommateur
package com.example.service;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class ConsommateurMessageRabbit {
@RabbitListener(queues="file.attente")
public void receptionMessage(String contenu) {
System.out.println("Message reçu : " + contenu);
}
}
- Intégration de Kafka avec SpringBoot
4.1. Configuration du projet
Ajoutez la dépendance Kafka :
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
4.2. Configuration du consommateur
Configurez le fichier application.yml pour le consommateur :
serveur:
port: 8080
spring:
kafka:
bootstrap-servers:
- kafka-server:9092
template:
default-topic: topic-messages
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: groupe-1
4.3. Implémantation du consommateur
package com.example.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class ConsommateurMessageKafka {
@KafkaListener(topics = {"messages"})
public void receptionMessage(ConsumerRecord<String, String> record) {
System.out.println("Message reçu - clé: " + record.key() + ", valeur: " + record.value());
}
}
4.4. Configuration du producteur
Configurez le fichier application.yml pour le producteur :
serveur:
port: 8080
spring:
kafka:
bootstrap-servers:
- kafka-server:9092
template:
default-topic: topic-messages
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
4.5. Implémentation du producteur
package com.example.service.impl;
import javax.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.example.service.ProducteurMessage;
@Service
public class ProducteurMessageKafkaImpl implements ProducteurMessage {
@Resource
private KafkaTemplate<String, String> templateKafka;
@Override
public void envoyerMessage(String contenu) {
this.templateKafka.sendDefault("cle-" + System.currentTimeMillis(), contenu);
}
}
4.6. Test de l'intégration
package com.example.test;
import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.example.Application;
import com.example.service.ProducteurMessage;
@SpringBootTest(classes = Application.class)
@RunWith(SpringRunner.class)
public class TestKafka {
@Resource
private ProducteurMessage producteur;
@Test
public void testEnvoi() throws Exception {
for (int i = 0; i < 10; i++) {
this.producteur.envoyerMessage("Message Kafka - " + i);
}
Thread.sleep(3000); // Attendre la réception des messages
}
}
- Résumé et différences entre les solutions
Les trois composants de messagerie les plus courants dans les environnements de développement sont ActiveMQ, RabbitMQ et Kafka. Chacun offre des avantages spécifiques pour la gestion des messages asynchrones.
Différences entre ActiveMQ et RabbitMQ
- ActiveMQ utilise le protocole JMS, ce qui entraîne des performances plus modestes. Sa structure est simple, basée sur des thèmes ou des files d'attente.
- RabbitMQ utilise le protocole AMQP, offrant de meilleures performances. Il intègre les concepts d'échange (exchange), de file d'attente (queue) et de liaison (bind), permettant des modes de distribution comme fanout (diffusion), topic (sujet) et direct (direct). RabbitMQ supporte également les hôtes virtuels pour une gestion séparée des espaces et utilisateurs.
Différences entre RabbitMQ et Kafka
- RabbitMQ est largement utilisé et offre de bonnes performances. Les messages sont supprimés après consommation. Son support de cluster est limité et nécessite des solutions externes comme HAProxy et Keepalived pour l'équilibrage de charge et la haute disponibilité.
- Kafka utilise des technologies de copie zéro et de lecture par lots pour une efficacité maximale. Les messages sont conservés pendant une période définie (par défaut deux jours) et permettent un accès aux messages historiques via les offsets. Kafka intègre nativement la haute disponibilité et l'équilibrage de charge avec son mécanisme de réplication des données.
Le choix entre ces solutions dépend des exigences spécifiques de l'application en termes de performances, de persistance des messages et de tolérance aux pannes.