Les Files d'Attente Mortes et les Files d'Attente Retardées dans RabbitMQ

Les Files d'Attente Mortes (Dead Letter Queues)

Les files d'attente mortes ne diffèrent pas fondamentalement des files d'attente standard. Elles nécessitent la création d'une Queue et d'un Exchange, suivis d'un lien via un RoutingKey. La particularité des files d'attente mortes est que leur RoutingKey et leur Exchange sont configurés comme paramètres liés à la file d'attente principale.

Un scénario d'utilisation courant est lorsque les messages d'une file d'attente normale sont rejetés avec basicNack ou reject, ils sont alors routés vers la file d'attente morte associée. Un autre scénario fréquent est lorsque le consommateur rencontre des exceptions lors du traitement des messages avec accusé de réception automatiuqe activé, dépassant le nombre de tentatives de reprise. Dans ce cas, le message est également transféré vers la file d'attente morte si celle-ci est configurée.

Configuration des Files d'Attente Mortes et des Échangeurs

Dans cet exemple, nous utilisons deux files d'attente : la file d'attente métier messageQueue et sa file d'attente morte associée. Nous démontrerons le rejet manuel des messages avec accusé de réception manuel.

Configuration YAML


spring:
 rabbitmq:
    host: 192.168.99.12
    port: 5672
    username: guest
    password: guest
    # Confirmation d'envoi
    publisher-confirms: true
    # Rappel en cas d'échec de routage
    publisher-returns: true
    template:
        # Doit être true pour notifier l'écouteur en cas d'échec de routage, false pour ignorer le message
        mandatory: true
    listener:
      simple:
        # Nombre de messages récupérés de RabbitMQ à chaque fois
        prefetch: 1
        default-requeue-rejected: false
        # Nombre de consommateurs démarrés par file d'attente
        concurrency: 1
        # Nombre maximum de consommateurs par file d'attente
        max-concurrency: 1
        # Mode d'accusé de réception manuel - nécessite un ACK manuel dans le code
        acknowledge-mode: manual
        
# File d'attente de messages
message:
  queue:
    name: notification.messages
	
# Nom de l'échangeur de messages
exchange:
  name: notificationTopicExchange

# File d'attente morte
dead:
  letter:
    queue:
      name: notification.dead.letter
    exchange:
      name: notificationDeadLetterTopicExchange

Configuration Java de la File d'Attente Morte


/**
 * Configuration RabbitMQ
 */
@Configuration
@Slf4j
public class RabbitmqConfiguration {

    @Value("${message.queue.name}")
    private String messageQueueName;
    @Value("${exchange.name}")
    private String exchangeName;
    @Value("${dead.letter.queue.name}")
    private String deadLetterQueueName;
    @Value("${dead.letter.exchange.name}")
    private String deadLetterExchangeName;

    @Bean
    public Queue messageQueue() {
        Map<string object=""> arguments = new HashMap<>(2);
        // Liaison avec l'échangeur de lettres mortes
        arguments.put("x-dead-letter-exchange", deadLetterExchangeName);
        // Liaison avec la clé de routage des lettres mortes
        arguments.put("x-dead-letter-routing-key", deadLetterQueueName + ".#");

        return new Queue(messageQueueName, true, false, false, arguments);
    }

    @Bean
    TopicExchange messageExchange() {
        return new TopicExchange(exchangeName);
    }

    @Bean
    Binding bindMessageQueue() {
        return BindingBuilder.bind(messageQueue()).to(messageExchange()).with(messageQueueName + ".#");
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue(deadLetterQueueName);
    }

    @Bean
    TopicExchange deadLetterExchange() {
        return new TopicExchange(deadLetterExchangeName);
    }

    @Bean
    Binding bindDeadLetterQueue() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueueName + ".#");
    }
}
</string>

Producteur de Messages


@Configuration
@EnableScheduling
@Slf4j
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${exchange.name}")
    private String topicExchange;

    @Scheduled(cron = "0 0/2 * * * ?")
    public void sendNotificationMessage() {
        String content = RandomStringUtils.randomAlphanumeric(8);
        JSONObject notification = new JSONObject();
        notification.put("content", content);
        notification.put("recipient", "test@example.com");
        
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(topicExchange, "notification.message.x", 
                                    notification.toJSONString(), correlationData);
        
        log.info("--- Envoi du message de notification ---{}---ID du message---{}", 
                notification, correlationData.getId());
    }
}

Consommateur de Messages


@Component
@Slf4j
public class MessageConsumer {

    /**
     * Consommateur de notifications
     */
    @RabbitListener(queues = "notification.messages")
    @RabbitHandler
    public void handleMessage(Message message, Channel channel, 
                            @Headers Map<string object=""> headers) throws IOException {
        
        try {
            String msgContent = new String(message.getBody(), CharEncoding.UTF_8);
            JSONObject jsonObject = JSON.parseObject(msgContent);
            jsonObject.put("messageId", headers.get("spring_returned_message_correlation"));
            log.info("--- Réception du message ---{}", jsonObject);
            
            // Simuler une exception
            int result = 1 / 0;
            
            // Accusé de réception manuel
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } 
        catch (Exception e) {
            log.info("Exception dans handleMessage, rejet sans réinsertion --- ID du message ---{}", 
                    headers.get("spring_returned_message_correlation"));
            
            // Exception, false pour ne pas réinsérer dans la file, true pour réinsérer
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

    /**
     * Consommateur de lettres mortes
     */
    @RabbitListener(queues = "notification.dead.letter")
    public void handleDeadLetterMessage(Message message, Channel channel, 
                                      @Headers Map<string object=""> headers) throws IOException {
        
        // Enregistrement en base de données possible pour un suivi et une alerte
        log.info("Réception d'une lettre morte :---{}---ID du message---{}", 
                new String(message.getBody()), headers.get("spring_returned_message_correlation"));
        
        // Envoyer un accusé de réception
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
</string></string>

Les Files d'Attente Retardées (Delay Queues)

Les files d'attente retardées, comme leur nom l'indique, ne traitent pas les messages immédiatement. Les messages envoyés par l'émetteur ne sont disponibles pour le consommateur qu'après un délai spécifié. Un scénario d'application typique est la fermeture automatique d'une commande non payée sous 30 minutes. Bien que cela puisse être implémenté avec des files d'attentes mortes, nous nous concentrerons ici sur la logique de consommation retardée des messages.

Configuration des Files d'Attente Retardées et des Échangeurs

Avant d'utiliser les files d'attente retardées, il est nécessaire d'installer le plugin de file d'attente retardée. L'installation de ce plugin a été décrite précédemment.

Configuration YAML


spring:
    rabbitmq:
        host: 192.168.99.12
        port: 5672
        username: guest
        password: guest
        # Confirmation d'envoi
        publisher-confirms: true
        # Rappel en cas d'échec de routage
        publisher-returns: true
        template:
            # Doit être true pour notifier l'écouteur en cas d'échec de routage, false pour ignorer le message
            mandatory: true
        # Consommateur
        listener:
            simple:
                # Nombre de messages récupérés de RabbitMQ à chaque fois
                prefetch: 1
                default-requeue-rejected: false
                # Nombre de consommateurs démarrés par file d'attente
                concurrency: 1
                # Nombre maximum de consommateurs par file d'attente
                max-concurrency: 1
                # Mode d'accusé de réception manuel
                acknowledge-mode: manual
                
# File d'attente de messages
message:
    queue:
        name: notification.messages

# Nom de l'échangeur de messages
exchange:
    name: notificationTopicExchange

# File d'attente morte
dead:
    letter:
        queue:
            name: notification.dead.letter
        exchange:
            name: notificationDeadLetterTopicExchange

# File d'attente retardée
delay:
    queue:
        name: notification.delay
    exchange:
        name: notificationDelayTopicExchange

Configuration Java de la File d'Attente Retardée


/**
 * Configuration RabbitMQ
 */
@Configuration
@Slf4j
public class RabbitmqConfiguration {

    @Value("${message.queue.name}")
    private String messageQueueName;
    @Value("${exchange.name}")
    private String exchangeName;
    @Value("${dead.letter.queue.name}")
    private String deadLetterQueueName;
    @Value("${dead.letter.exchange.name}")
    private String deadLetterExchangeName;
    @Value("${delay.queue.name}")
    private String delayQueueName;
    @Value("${delay.exchange.name}")
    private String delayExchangeName;

    @Bean
    public Queue messageQueue() {
        Map<string object=""> arguments = new HashMap<>(2);
        // Liaison avec l'échangeur de lettres mortes
        arguments.put("x-dead-letter-exchange", deadLetterExchangeName);
        // Liaison avec la clé de routage des lettres mortes
        arguments.put("x-dead-letter-routing-key", deadLetterQueueName + ".#");

        return new Queue(messageQueueName, true, false, false, arguments);
    }

    @Bean
    TopicExchange messageExchange() {
        return new TopicExchange(exchangeName);
    }

    @Bean
    Binding bindMessageQueue() {
        return BindingBuilder.bind(messageQueue()).to(messageExchange()).with(messageQueueName + ".#");
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue(deadLetterQueueName);
    }

    @Bean
    TopicExchange deadLetterExchange() {
        return new TopicExchange(deadLetterExchangeName);
    }

    @Bean
    Binding bindDeadLetterQueue() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueueName + ".#");
    }
    
    // File d'attente retardée
    @Bean
    public Queue delayQueue() {
        return new Queue(delayQueueName);
    }

    @Bean
    CustomExchange delayExchange() {
        Map<string object=""> args = new HashMap<>();
        args.put("x-delayed-type", "topic");
        // Le deuxième paramètre est le type : doit être x-delayed-message
        return new CustomExchange(delayExchangeName, "x-delayed-message", true, false, args);
    }

    @Bean
    Binding bindDelayQueue() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueueName + ".#").noargs();
    }
}
</string></string>

Producteur de Messages Retardés

Pour des raisons pratiques, nous utiliserons un délai de 2 minutes au lieu de 30 minutes pour observer l'effet.


@Configuration
@EnableScheduling
@Slf4j
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${exchange.name}")
    private String topicExchange;

    @Value("${delay.exchange.name}")
    private String delayExchangeName;

    @Scheduled(cron = "0 0/1 * * * ?")
    public void sendNotificationMessage() {
        String content = RandomStringUtils.randomAlphanumeric(8);
        JSONObject notification = new JSONObject();
        notification.put("content", content);
        notification.put("recipient", "test@example.com");
        
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(topicExchange, "notification.message.x", 
                                    notification.toJSONString(), correlationData);
        
        log.info("--- Envoi du message de notification ---{}---ID du message---{}", 
                notification, correlationData.getId());
    }

    @Scheduled(cron = "0 0/1 * * * ?")
    public void sendDelayedOrderMessage() throws Exception {
        // Numéro de commande - en réalité obtenu après enregistrement de la commande
        String orderId = UUID.randomUUID().toString();
        
        // Simuler les informations de commande
        JSONObject order = new JSONObject();
        order.put("orderId", orderId);
        order.put("product", "abonnement VIP");
        order.put("amount", "99.00");
        
        CorrelationData correlationData = new CorrelationData(orderId);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(orderId);
        
        // 30 minutes serait trop long, nous utilisons 120 secondes pour la démonstration
        messageProperties.setHeader("x-delay", 120000);
        
        Message message = new Message(order.toJSONString().getBytes(CharEncoding.UTF_8), messageProperties);

        rabbitTemplate.convertAndSend(delayExchangeName, "notification.delay.x", 
                                    message, correlationData);

        log.info("--- Envoi du message de commande ---{}---ID de commande---{}", 
                order, correlationData.getId());
    }
}

Consommateur de Messages Retardés


@Component
@Slf4j
public class MessageConsumer {

    /**
     * Consommateur de messages de notification
     */
    @RabbitListener(queues = "notification.messages")
    @RabbitHandler
    public void handleMessage(Message message, Channel channel, 
                            @Headers Map<string object=""> headers) throws IOException {
        
        try {
            String msgContent = new String(message.getBody(), CharEncoding.UTF_8);
            JSONObject jsonObject = JSON.parseObject(msgContent);
            jsonObject.put("messageId", headers.get("spring_returned_message_correlation"));
            log.info("--- Réception du message ---{}", jsonObject);
            
            // Simuler une exception
            int result = 1 / 0;
            
            // Accusé de réception manuel
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } 
        catch (Exception e) {
            log.info("Exception dans handleMessage, rejet sans réinsertion --- ID du message ---{}", 
                    headers.get("spring_returned_message_correlation"));
            
            // Exception, false pour ne pas réinsérer dans la file, true pour réinsérer
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

    /**
     * Consommateur de lettres mortes
     */
    @RabbitListener(queues = "notification.dead.letter")
    public void handleDeadLetterMessage(Message message, Channel channel, 
                                      @Headers Map<string object=""> headers) throws IOException {
        
        log.info("Réception d'une lettre morte :---{}---ID du message---{}", 
                new String(message.getBody()), headers.get("spring_returned_message_correlation"));
        
        // Envoyer un accusé de réception
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Consommateur de file d'attente retardée
     */
    @RabbitListener(queues = "notification.delay")
    @RabbitHandler
    public void handleDelayedOrderMessage(Message message, Channel channel, 
                                       @Headers Map<string object=""> headers) throws IOException {
        
        try {
            String msgContent = new String(message.getBody(), CharEncoding.UTF_8);
            JSONObject order = JSON.parseObject(msgContent);
            
            log.info("--- Réception du message de commande retardée---ID de commande---{}", 
                    message.getMessageProperties().getMessageId());
            log.info("--- Informations de commande---{}---", order);
            
            // Logique métier : vérifier si la commande est payée
            // Si non payée, marquer comme fermée
            // Si déjà payée, ne rien faire
            
            // Accusé de réception manuel
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } 
        catch (Exception e) {
            log.info("Exception dans handleDelayedOrderMessage, réinsertion dans la file---ID de commande---{}", 
                    headers.get("spring_returned_message_correlation"));
            
            // Exception, true pour réinsérer dans la file, false pour envoyer à la lettre morte
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}
</string></string></string>

Les résultats d'exécution montrent que pour un même ID de commande, le message n'est reçu par le consommateur qu'après 2 minutes, conformément aux attentes.

Étiquettes: rabittmq files-attente dead-letter queues delay-queues

Publié le 8 juin à 04h54