Intégration de Files de Messages avec SpringBoot

Dans les applications modernes, deux types de composants de messagerie asynchrone sont couramment utilisés :

  • JMS : Java Message Service, avec ActiveMQ comme représentant principal. Cepednant, ses performances sont limitées car il est entièrement implémenté en Java.
  • AMQP : Advanced Message Queuing Protocol, implémenté directement au niveau du protocole. Les représentants populaires incluent RabbitMQ, tandis que Kafka est reconnu pour ses hautes performances.
  1. Intégration d'ActiveMQ avec SpringBoot

2.1. Configuration du projet

Pour utiliser ActiveMQ dans un projet SpringBoot, ajoutez d'abord la dépendance nécessaire dans votre fichier pom.xml :

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

Configurez ensuite votre fichier application.yml pour ActiveMQ :

serveur:
  port: 8080
spring:
  jms:
    pub-sub-domain: false   # true pour topic, false pour Queue
  activemq:
    utilisateur: admin    
    motDePasse: admin123   
    url-broker: tcp://localhost:61616

2.2. Création du consommateur de messages

Le consommateur écoute les messages à l'aide de l'annotation @JmsListener :

package com.example.service;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

@Service
public class ConsommateurMessage {
    @JmsListener(destination="file.messages")
    public void receptionMessage(String contenu) {
        System.out.println("Message reçu : " + contenu);
    }
}

2.3. Création du producteur de messages

Définissez d'abord une interface pour le service d'envoi de messages :

package com.example.service;

public interface ProducteurMessage {
    void envoyerMessage(String contenu) ;
}

Créez ensuite une configuration pour ActiveMQ :

package com.example.config;

import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;

@Configuration
@EnableJms
public class ConfigActiveMQ {
    @Bean
    public Queue fileMessages() {
        return new ActiveMQQueue("file.messages") ;
    }
}

Implémentez le service d'envoi de messages :

package com.example.service.impl;

import javax.annotation.Resource;
import javax.jms.Queue;

import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import com.example.service.ProducteurMessage;

@Service
public class ProducteurMessageImpl implements ProducteurMessage {
    @Resource
    private JmsMessagingTemplate templateJms;
    @Resource
    private Queue fileMessages;
    
    @Override
    public void envoyerMessage(String contenu) {
        this.templateJms.convertAndSend(this.fileMessages, contenu);
    }
}

2.4. Test de l'intégration

Créez une classe de test pour valider le fonctionnement :

package com.example.test;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.example.Application;
import com.example.service.ProducteurMessage;

@SpringBootTest(classes = Application.class)
@RunWith(SpringRunner.class)
public class TestActiveMQ {
    @Resource
    private ProducteurMessage producteur;
    
    @Test
    public void testEnvoi() throws Exception {
        for (int i = 0; i < 5; i++) {
            this.producteur.envoyerMessage("Test message - " + i);
        }
        Thread.sleep(5000); // Attendre la réception des messages
    }
}
  1. Intégration de RabbitMQ avec SpringBoot

3.1. Configuration du projet

Ajoutez la dépendance RabbitMQ à votre projet :

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Configurez le fichier application.yml :

serveur:
  port: 8080
spring:
  rabbitmq:
    adresses: rabbitmq-server
    utilisateur: admin
    motDePasse: admin123
    virtual-host: /

3.2. Configuration du producteur

Créez une configuration pour le producteur de messages :

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfigProducteur {
    public static final String ECHANGE = "echange.messages"; 
    public static final String CLE_ROUTAGE = "cle.routage"; 
    public static final String FILE_ATTENTE = "file.attente"; 
    
    @Bean
    public Binding lierEchangeFile(DirectExchange echange, File file) {
        return BindingBuilder.bind(file).to(echange).with(CLE_ROUTAGE);
    }
    
    @Bean
    public DirectExchange getEchangeDirect() {
        return new DirectExchange(ECHANGE, true, true);
    }
    
    @Bean
    public File fileAttente() {
        return new File(FILE_ATTENTE);
    }
}

3.3. Implémentation du producteur

package com.example.service.impl;

import javax.annotation.Resource;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import com.example.config.ConfigProducteur;
import com.example.service.ProducteurMessage;

@Service
public class ProducteurMessageRabbitImpl implements ProducteurMessage {
    @Resource
    private RabbitTemplate templateRabbit;
    
    @Override
    public void envoyerMessage(String contenu) {
        this.templateRabbit.convertAndSend(ConfigProducteur.ECHANGE,
                ConfigProducteur.CLE_ROUTAGE, contenu);
    }
}

3.4. Configuration du consommateur

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfigConsommateur {
    public static final String ECHANGE = "echange.messages"; 
    public static final String CLE_ROUTAGE = "cle.routage"; 
    public static final String FILE_ATTENTE = "file.attente"; 
    
    @Bean
    public File fileAttente() {
        return new File(FILE_ATTENTE);
    }
    
    @Bean
    public DirectExchange getEchangeDirect() {
        return new DirectExchange(ECHANGE, true, true);
    }
    
    @Bean
    public Binding lierEchangeFile(DirectExchange echange, File file) {
        return BindingBuilder.bind(file).to(echange).with(CLE_ROUTAGE);
    }
}

3.5. Implémentation du consommateur

package com.example.service;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class ConsommateurMessageRabbit {
    @RabbitListener(queues="file.attente")
    public void receptionMessage(String contenu) {
        System.out.println("Message reçu : " + contenu);
    }
}
  1. Intégration de Kafka avec SpringBoot

4.1. Configuration du projet

Ajoutez la dépendance Kafka :

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

4.2. Configuration du consommateur

Configurez le fichier application.yml pour le consommateur :

serveur:
  port: 8080
spring:
  kafka:
    bootstrap-servers:
    - kafka-server:9092
    template:
      default-topic: topic-messages
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: groupe-1

4.3. Implémantation du consommateur

package com.example.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsommateurMessageKafka {
    @KafkaListener(topics = {"messages"})
    public void receptionMessage(ConsumerRecord<String, String> record) {
        System.out.println("Message reçu - clé: " + record.key() + ", valeur: " + record.value());
    }
}

4.4. Configuration du producteur

Configurez le fichier application.yml pour le producteur :

serveur:
  port: 8080
spring:
  kafka:
    bootstrap-servers:
    - kafka-server:9092
    template:
      default-topic: topic-messages
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

4.5. Implémentation du producteur

package com.example.service.impl;

import javax.annotation.Resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.example.service.ProducteurMessage;

@Service
public class ProducteurMessageKafkaImpl implements ProducteurMessage {
    @Resource
    private KafkaTemplate<String, String> templateKafka;
    
    @Override
    public void envoyerMessage(String contenu) {
        this.templateKafka.sendDefault("cle-" + System.currentTimeMillis(), contenu);
    }
}

4.6. Test de l'intégration

package com.example.test;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.example.Application;
import com.example.service.ProducteurMessage;

@SpringBootTest(classes = Application.class)
@RunWith(SpringRunner.class)
public class TestKafka {
    @Resource
    private ProducteurMessage producteur;
    
    @Test
    public void testEnvoi() throws Exception {
        for (int i = 0; i < 10; i++) {
            this.producteur.envoyerMessage("Message Kafka - " + i);
        }
        Thread.sleep(3000); // Attendre la réception des messages
    }
}
  1. Résumé et différences entre les solutions

Les trois composants de messagerie les plus courants dans les environnements de développement sont ActiveMQ, RabbitMQ et Kafka. Chacun offre des avantages spécifiques pour la gestion des messages asynchrones.

Différences entre ActiveMQ et RabbitMQ

  • ActiveMQ utilise le protocole JMS, ce qui entraîne des performances plus modestes. Sa structure est simple, basée sur des thèmes ou des files d'attente.
  • RabbitMQ utilise le protocole AMQP, offrant de meilleures performances. Il intègre les concepts d'échange (exchange), de file d'attente (queue) et de liaison (bind), permettant des modes de distribution comme fanout (diffusion), topic (sujet) et direct (direct). RabbitMQ supporte également les hôtes virtuels pour une gestion séparée des espaces et utilisateurs.

Différences entre RabbitMQ et Kafka

  • RabbitMQ est largement utilisé et offre de bonnes performances. Les messages sont supprimés après consommation. Son support de cluster est limité et nécessite des solutions externes comme HAProxy et Keepalived pour l'équilibrage de charge et la haute disponibilité.
  • Kafka utilise des technologies de copie zéro et de lecture par lots pour une efficacité maximale. Les messages sont conservés pendant une période définie (par défaut deux jours) et permettent un accès aux messages historiques via les offsets. Kafka intègre nativement la haute disponibilité et l'équilibrage de charge avec son mécanisme de réplication des données.

Le choix entre ces solutions dépend des exigences spécifiques de l'application en termes de performances, de persistance des messages et de tolérance aux pannes.

Étiquettes: SpringBoot ActiveMQ rabbitmq Kafka JMS

Publié le 26 juin à 02h00