Les Files d'Attente Mortes (Dead Letter Queues)
Les files d'attente mortes ne diffèrent pas fondamentalement des files d'attente standard. Elles nécessitent la création d'une Queue et d'un Exchange, suivis d'un lien via un RoutingKey. La particularité des files d'attente mortes est que leur RoutingKey et leur Exchange sont configurés comme paramètres liés à la file d'attente principale.
Un scénario d'utilisation courant est lorsque les messages d'une file d'attente normale sont rejetés avec basicNack ou reject, ils sont alors routés vers la file d'attente morte associée. Un autre scénario fréquent est lorsque le consommateur rencontre des exceptions lors du traitement des messages avec accusé de réception automatiuqe activé, dépassant le nombre de tentatives de reprise. Dans ce cas, le message est également transféré vers la file d'attente morte si celle-ci est configurée.
Configuration des Files d'Attente Mortes et des Échangeurs
Dans cet exemple, nous utilisons deux files d'attente : la file d'attente métier messageQueue et sa file d'attente morte associée. Nous démontrerons le rejet manuel des messages avec accusé de réception manuel.
Configuration YAML
spring:
rabbitmq:
host: 192.168.99.12
port: 5672
username: guest
password: guest
# Confirmation d'envoi
publisher-confirms: true
# Rappel en cas d'échec de routage
publisher-returns: true
template:
# Doit être true pour notifier l'écouteur en cas d'échec de routage, false pour ignorer le message
mandatory: true
listener:
simple:
# Nombre de messages récupérés de RabbitMQ à chaque fois
prefetch: 1
default-requeue-rejected: false
# Nombre de consommateurs démarrés par file d'attente
concurrency: 1
# Nombre maximum de consommateurs par file d'attente
max-concurrency: 1
# Mode d'accusé de réception manuel - nécessite un ACK manuel dans le code
acknowledge-mode: manual
# File d'attente de messages
message:
queue:
name: notification.messages
# Nom de l'échangeur de messages
exchange:
name: notificationTopicExchange
# File d'attente morte
dead:
letter:
queue:
name: notification.dead.letter
exchange:
name: notificationDeadLetterTopicExchange
Configuration Java de la File d'Attente Morte
/**
* Configuration RabbitMQ
*/
@Configuration
@Slf4j
public class RabbitmqConfiguration {
@Value("${message.queue.name}")
private String messageQueueName;
@Value("${exchange.name}")
private String exchangeName;
@Value("${dead.letter.queue.name}")
private String deadLetterQueueName;
@Value("${dead.letter.exchange.name}")
private String deadLetterExchangeName;
@Bean
public Queue messageQueue() {
Map<string object=""> arguments = new HashMap<>(2);
// Liaison avec l'échangeur de lettres mortes
arguments.put("x-dead-letter-exchange", deadLetterExchangeName);
// Liaison avec la clé de routage des lettres mortes
arguments.put("x-dead-letter-routing-key", deadLetterQueueName + ".#");
return new Queue(messageQueueName, true, false, false, arguments);
}
@Bean
TopicExchange messageExchange() {
return new TopicExchange(exchangeName);
}
@Bean
Binding bindMessageQueue() {
return BindingBuilder.bind(messageQueue()).to(messageExchange()).with(messageQueueName + ".#");
}
@Bean
public Queue deadLetterQueue() {
return new Queue(deadLetterQueueName);
}
@Bean
TopicExchange deadLetterExchange() {
return new TopicExchange(deadLetterExchangeName);
}
@Bean
Binding bindDeadLetterQueue() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueueName + ".#");
}
}
</string>
Producteur de Messages
@Configuration
@EnableScheduling
@Slf4j
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${exchange.name}")
private String topicExchange;
@Scheduled(cron = "0 0/2 * * * ?")
public void sendNotificationMessage() {
String content = RandomStringUtils.randomAlphanumeric(8);
JSONObject notification = new JSONObject();
notification.put("content", content);
notification.put("recipient", "test@example.com");
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(topicExchange, "notification.message.x",
notification.toJSONString(), correlationData);
log.info("--- Envoi du message de notification ---{}---ID du message---{}",
notification, correlationData.getId());
}
}
Consommateur de Messages
@Component
@Slf4j
public class MessageConsumer {
/**
* Consommateur de notifications
*/
@RabbitListener(queues = "notification.messages")
@RabbitHandler
public void handleMessage(Message message, Channel channel,
@Headers Map<string object=""> headers) throws IOException {
try {
String msgContent = new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msgContent);
jsonObject.put("messageId", headers.get("spring_returned_message_correlation"));
log.info("--- Réception du message ---{}", jsonObject);
// Simuler une exception
int result = 1 / 0;
// Accusé de réception manuel
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
catch (Exception e) {
log.info("Exception dans handleMessage, rejet sans réinsertion --- ID du message ---{}",
headers.get("spring_returned_message_correlation"));
// Exception, false pour ne pas réinsérer dans la file, true pour réinsérer
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
/**
* Consommateur de lettres mortes
*/
@RabbitListener(queues = "notification.dead.letter")
public void handleDeadLetterMessage(Message message, Channel channel,
@Headers Map<string object=""> headers) throws IOException {
// Enregistrement en base de données possible pour un suivi et une alerte
log.info("Réception d'une lettre morte :---{}---ID du message---{}",
new String(message.getBody()), headers.get("spring_returned_message_correlation"));
// Envoyer un accusé de réception
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
</string></string>
Les Files d'Attente Retardées (Delay Queues)
Les files d'attente retardées, comme leur nom l'indique, ne traitent pas les messages immédiatement. Les messages envoyés par l'émetteur ne sont disponibles pour le consommateur qu'après un délai spécifié. Un scénario d'application typique est la fermeture automatique d'une commande non payée sous 30 minutes. Bien que cela puisse être implémenté avec des files d'attentes mortes, nous nous concentrerons ici sur la logique de consommation retardée des messages.
Configuration des Files d'Attente Retardées et des Échangeurs
Avant d'utiliser les files d'attente retardées, il est nécessaire d'installer le plugin de file d'attente retardée. L'installation de ce plugin a été décrite précédemment.
Configuration YAML
spring:
rabbitmq:
host: 192.168.99.12
port: 5672
username: guest
password: guest
# Confirmation d'envoi
publisher-confirms: true
# Rappel en cas d'échec de routage
publisher-returns: true
template:
# Doit être true pour notifier l'écouteur en cas d'échec de routage, false pour ignorer le message
mandatory: true
# Consommateur
listener:
simple:
# Nombre de messages récupérés de RabbitMQ à chaque fois
prefetch: 1
default-requeue-rejected: false
# Nombre de consommateurs démarrés par file d'attente
concurrency: 1
# Nombre maximum de consommateurs par file d'attente
max-concurrency: 1
# Mode d'accusé de réception manuel
acknowledge-mode: manual
# File d'attente de messages
message:
queue:
name: notification.messages
# Nom de l'échangeur de messages
exchange:
name: notificationTopicExchange
# File d'attente morte
dead:
letter:
queue:
name: notification.dead.letter
exchange:
name: notificationDeadLetterTopicExchange
# File d'attente retardée
delay:
queue:
name: notification.delay
exchange:
name: notificationDelayTopicExchange
Configuration Java de la File d'Attente Retardée
/**
* Configuration RabbitMQ
*/
@Configuration
@Slf4j
public class RabbitmqConfiguration {
@Value("${message.queue.name}")
private String messageQueueName;
@Value("${exchange.name}")
private String exchangeName;
@Value("${dead.letter.queue.name}")
private String deadLetterQueueName;
@Value("${dead.letter.exchange.name}")
private String deadLetterExchangeName;
@Value("${delay.queue.name}")
private String delayQueueName;
@Value("${delay.exchange.name}")
private String delayExchangeName;
@Bean
public Queue messageQueue() {
Map<string object=""> arguments = new HashMap<>(2);
// Liaison avec l'échangeur de lettres mortes
arguments.put("x-dead-letter-exchange", deadLetterExchangeName);
// Liaison avec la clé de routage des lettres mortes
arguments.put("x-dead-letter-routing-key", deadLetterQueueName + ".#");
return new Queue(messageQueueName, true, false, false, arguments);
}
@Bean
TopicExchange messageExchange() {
return new TopicExchange(exchangeName);
}
@Bean
Binding bindMessageQueue() {
return BindingBuilder.bind(messageQueue()).to(messageExchange()).with(messageQueueName + ".#");
}
@Bean
public Queue deadLetterQueue() {
return new Queue(deadLetterQueueName);
}
@Bean
TopicExchange deadLetterExchange() {
return new TopicExchange(deadLetterExchangeName);
}
@Bean
Binding bindDeadLetterQueue() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueueName + ".#");
}
// File d'attente retardée
@Bean
public Queue delayQueue() {
return new Queue(delayQueueName);
}
@Bean
CustomExchange delayExchange() {
Map<string object=""> args = new HashMap<>();
args.put("x-delayed-type", "topic");
// Le deuxième paramètre est le type : doit être x-delayed-message
return new CustomExchange(delayExchangeName, "x-delayed-message", true, false, args);
}
@Bean
Binding bindDelayQueue() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueueName + ".#").noargs();
}
}
</string></string>
Producteur de Messages Retardés
Pour des raisons pratiques, nous utiliserons un délai de 2 minutes au lieu de 30 minutes pour observer l'effet.
@Configuration
@EnableScheduling
@Slf4j
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${exchange.name}")
private String topicExchange;
@Value("${delay.exchange.name}")
private String delayExchangeName;
@Scheduled(cron = "0 0/1 * * * ?")
public void sendNotificationMessage() {
String content = RandomStringUtils.randomAlphanumeric(8);
JSONObject notification = new JSONObject();
notification.put("content", content);
notification.put("recipient", "test@example.com");
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(topicExchange, "notification.message.x",
notification.toJSONString(), correlationData);
log.info("--- Envoi du message de notification ---{}---ID du message---{}",
notification, correlationData.getId());
}
@Scheduled(cron = "0 0/1 * * * ?")
public void sendDelayedOrderMessage() throws Exception {
// Numéro de commande - en réalité obtenu après enregistrement de la commande
String orderId = UUID.randomUUID().toString();
// Simuler les informations de commande
JSONObject order = new JSONObject();
order.put("orderId", orderId);
order.put("product", "abonnement VIP");
order.put("amount", "99.00");
CorrelationData correlationData = new CorrelationData(orderId);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(orderId);
// 30 minutes serait trop long, nous utilisons 120 secondes pour la démonstration
messageProperties.setHeader("x-delay", 120000);
Message message = new Message(order.toJSONString().getBytes(CharEncoding.UTF_8), messageProperties);
rabbitTemplate.convertAndSend(delayExchangeName, "notification.delay.x",
message, correlationData);
log.info("--- Envoi du message de commande ---{}---ID de commande---{}",
order, correlationData.getId());
}
}
Consommateur de Messages Retardés
@Component
@Slf4j
public class MessageConsumer {
/**
* Consommateur de messages de notification
*/
@RabbitListener(queues = "notification.messages")
@RabbitHandler
public void handleMessage(Message message, Channel channel,
@Headers Map<string object=""> headers) throws IOException {
try {
String msgContent = new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msgContent);
jsonObject.put("messageId", headers.get("spring_returned_message_correlation"));
log.info("--- Réception du message ---{}", jsonObject);
// Simuler une exception
int result = 1 / 0;
// Accusé de réception manuel
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
catch (Exception e) {
log.info("Exception dans handleMessage, rejet sans réinsertion --- ID du message ---{}",
headers.get("spring_returned_message_correlation"));
// Exception, false pour ne pas réinsérer dans la file, true pour réinsérer
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
/**
* Consommateur de lettres mortes
*/
@RabbitListener(queues = "notification.dead.letter")
public void handleDeadLetterMessage(Message message, Channel channel,
@Headers Map<string object=""> headers) throws IOException {
log.info("Réception d'une lettre morte :---{}---ID du message---{}",
new String(message.getBody()), headers.get("spring_returned_message_correlation"));
// Envoyer un accusé de réception
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* Consommateur de file d'attente retardée
*/
@RabbitListener(queues = "notification.delay")
@RabbitHandler
public void handleDelayedOrderMessage(Message message, Channel channel,
@Headers Map<string object=""> headers) throws IOException {
try {
String msgContent = new String(message.getBody(), CharEncoding.UTF_8);
JSONObject order = JSON.parseObject(msgContent);
log.info("--- Réception du message de commande retardée---ID de commande---{}",
message.getMessageProperties().getMessageId());
log.info("--- Informations de commande---{}---", order);
// Logique métier : vérifier si la commande est payée
// Si non payée, marquer comme fermée
// Si déjà payée, ne rien faire
// Accusé de réception manuel
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
catch (Exception e) {
log.info("Exception dans handleDelayedOrderMessage, réinsertion dans la file---ID de commande---{}",
headers.get("spring_returned_message_correlation"));
// Exception, true pour réinsérer dans la file, false pour envoyer à la lettre morte
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
</string></string></string>
Les résultats d'exécution montrent que pour un même ID de commande, le message n'est reçu par le consommateur qu'après 2 minutes, conformément aux attentes.