Traitement aysnchrone des pics de requêtes avec MQ pour résoudre les problèmes de débit
Évolution du diagramme temporel d'achat de billets
Version originale :
Version améliorée :
Avantage de MQ par rapport à l'asynchronisme Spring Boot : Résistance à la perte de messages due à des exceptions intermédiaires
Initiation à RocketMQ
Installation
[Les étapes de configuration détaillées pour le téléchargement, l'ajustement des paramètres mémoire et la résolution des problèmes sous Windows sont omises ici pour la concision]
Configuration et démarrage
[Instructions détaillées sur la configuration des variables d'environnement, le démarrage des composants NameServer et Broker, ainsi que les solutions aux erreurs courantes sous Windows]
Console de gestion RocketMQ
La console peut être déployée via Docker ou en compilant le projet Maven. Nous utiliserons la seconde méthode.
- Téléchargement du code source depuis le site officiel
- Démarrage via IDE avec JDK 1.8 et configuration spécifique des paramètres VM et du port
Découpage du processus d'achat en deux parties avec RocketMQ
La logique est divisée ainsi : l'interface de commande initiale ne traite que la validation, le verrouilage des jetons et la sauvegarde des informations de commande initiales. Le verrouillage des trains et la logique réelle de sélection de sièges sont déplacées vers le consommateur de messages.
Configuration des dépendances
Ajout de la dépendance RocketMQ au fichier POM parent et au module métier.
Configuration de l'application
Configuration de l'adresse du serveur RocketMQ et du groupe de producteurs dans les propriétés de l'application.
Définition des énumérations de sujets
package com.example.train.business.enums;
public enum RocketMQTopicEnum {
CONFIRM_ORDER_TOPIC("CONFIRM_ORDER", "File d'attente pour la confirmation de commande");
private final String code;
private final String description;
RocketMQTopicEnum(String code, String description) {
this.code = code;
this.description = description;
}
// Getters et toString() personnalisé
}
Service de pré-traitement de la commande (Producteur)
package com.example.train.business.service;
import com.alibaba.fastjson.JSON;
import com.example.train.business.domain.ConfirmOrder;
import com.example.train.business.enums.ConfirmOrderStatusEnum;
import com.example.train.business.enums.RocketMQTopicEnum;
import com.example.train.business.mapper.ConfirmOrderMapper;
import com.example.train.business.req.ConfirmOrderDoReq;
import com.example.train.business.req.ConfirmOrderTicketReq;
import com.example.train.common.context.LoginMemberContext;
import com.example.train.common.exception.BusinessException;
import com.example.train.common.exception.BusinessExceptionEnum;
import com.example.train.common.util.SnowUtil;
import jakarta.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@Service
public class OrderPreProcessingService {
private static final Logger LOG = LoggerFactory.getLogger(OrderPreProcessingService.class);
@Resource
private ConfirmOrderMapper orderMapper;
@Resource
private SkTokenService tokenService;
@Resource
private RocketMQTemplate mqTemplate;
public void preProcessOrder(ConfirmOrderDoReq request) {
request.setMemberId(LoginMemberContext.getId());
// Validation du jeton de surcharge
boolean isTokenValid = tokenService.validateToken(
request.getDate(),
request.getTrainCode(),
LoginMemberContext.getId()
);
if (!isTokenValid) {
throw new BusinessException(BusinessExceptionEnum.ORDER_TOKEN_INVALID);
}
// Création et enregistrement de la commande initiale
ConfirmOrder order = new ConfirmOrder();
order.setId(SnowUtil.getSnowflakeNextId());
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
order.setMemberId(request.getMemberId());
order.setDate(request.getDate());
order.setTrainCode(request.getTrainCode());
order.setStart(request.getStart());
order.setEnd(request.getEnd());
order.setDailyTrainTicketId(request.getDailyTrainTicketId());
order.setStatus(ConfirmOrderStatusEnum.INIT.getCode());
order.setTickets(JSON.toJSONString(request.getTickets()));
orderMapper.insert(order);
// Envoi du message MQ pour traitement asynchrone
String messagePayload = JSON.toJSONString(request);
LOG.info("Envoi du message MQ pour mise en file d'attente: {}", messagePayload);
mqTemplate.convertAndSend(
RocketMQTopicEnum.CONFIRM_ORDER_TOPIC.getCode(),
messagePayload
);
LOG.info("Fin de l'envoi du message MQ");
}
}
Consommateur de messages
package com.example.train.business.mq;
import com.alibaba.fastjson.JSON;
import com.example.train.business.req.ConfirmOrderDoReq;
import com.example.train.business.service.OrderProcessingService;
import jakarta.annotation.Resource;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
consumerGroup = "default",
topic = "CONFIRM_ORDER"
)
public class OrderConfirmConsumer implements RocketMQListener<MessageExt> {
private static final Logger LOG = LoggerFactory.getLogger(OrderConfirmConsumer.class);
@Resource
private OrderProcessingService processingService;
@Override
public void onMessage(MessageExt message) {
String payload = new String(message.getBody());
LOG.info("Message reçu via RocketMQ: {}", payload);
ConfirmOrderDoReq request = JSON.parseObject(payload, ConfirmOrderDoReq.class);
processingService.processOrder(request);
}
}
Ajout d'un identifiant de journalisation pour le suivi des transactions asynchrones
Les threads asynchrones n'héritent pas automatiquement du contexte MDC du thread principal. Nous devons donc propager explicitement l'identifiant de journalisation.
Ajout du champ logId à la requête
// Dans la classe ConfirmOrderDoReq
private String logId;
// Mise à jour de la méthode toString()
@Override
public String toString() {
return "ConfirmOrderDoReq{" +
"memberId=" + memberId +
", date=" + date +
", trainCode='" + trainCode + '\'' +
", logId='" + logId + '\'' +
// ... autres champs
'}';
}
Modification du service de pré-traitemant
// Dans OrderPreProcessingService.preProcessOrder()
request.setLogId(MDC.get("LOG_ID"));
Modification du consommateur
// Dans OrderConfirmConsumer.onMessage()
MDC.put("LOG_ID", request.getLogId());
Implémentation de la fonctionnalité de mise en file d'attente pour l'émission de billets
Modification du contenu du message MQ
Le message ne contient que les informations nécessaires pour identifier le train, pas les détails spécifiques à chaque utilisateur.
package com.example.train.business.dto;
import lombok.Data;
import java.util.Date;
@Data
public class OrderMQMessage {
private String logId;
private Date travelDate;
private String trainNumber;
}
Service de traitement des commandes avec logique de mise en file d'attente
package com.example.train.business.service;
// ... imports
@Service
public class OrderProcessingService {
// ... injections de dépendances
public void processOrder(OrderMQMessage message) {
String lockKey = RedisKeyPreEnum.ORDER_LOCK + "-" +
DateUtil.formatDate(message.getTravelDate()) + "-" +
message.getTrainNumber();
Boolean lockAcquired = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockKey, 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(lockAcquired)) {
LOG.info("Verrou acquis pour le train {}", message.getTrainNumber());
try {
processQueuedOrders(message.getTravelDate(), message.getTrainNumber());
} finally {
redisTemplate.delete(lockKey);
LOG.info("Verrou libéré pour le train {}", message.getTrainNumber());
}
} else {
LOG.info("Verrou non acquis, autre thread en cours de traitement");
}
}
private void processQueuedOrders(Date travelDate, String trainNumber) {
while (true) {
ConfirmOrderExample example = new ConfirmOrderExample();
example.setOrderByClause("id ASC");
example.createCriteria()
.andTravelDateEqualTo(travelDate)
.andTrainNumberEqualTo(trainNumber)
.andStatusEqualTo(ConfirmOrderStatusEnum.INIT.getCode());
PageHelper.startPage(1, 5);
List<ConfirmOrder> pendingOrders = orderMapper.selectByExampleWithBLOBs(example);
if (CollUtil.isEmpty(pendingOrders)) {
LOG.info("Aucune commande en attente, fin du traitement");
break;
}
LOG.info("Traitement de {} commandes", pendingOrders.size());
pendingOrders.forEach(this::processSingleOrder);
}
}
private void processSingleOrder(ConfirmOrder order) {
// Mise à jour du statut pour éviter le traitement multiple
order.setStatus(ConfirmOrderStatusEnum.PROCESSING.getCode());
updateOrderStatus(order);
try {
// Logique de sélection de siège et émission de billet
emitTicketsForOrder(order);
} catch (BusinessException e) {
if (e.getErrorCode().equals(BusinessExceptionEnum.INSUFFICIENT_TICKETS)) {
order.setStatus(ConfirmOrderStatusEnum.EMPTY.getCode());
updateOrderStatus(order);
} else {
throw e;
}
}
}
// ... autres méthodes de traitement
}
Ajout de la fonctionnalité de consultation de position dans la file d'attente
Méthode de service pour obtenir la position
public Integer getQueuePosition(Long orderId) {
ConfirmOrder order = orderMapper.selectByPrimaryKey(orderId);
ConfirmOrderStatusEnum status = ConfirmOrderStatusEnum.getByCode(order.getStatus());
switch (status) {
case SUCCESS: return -1; // Billet émis avec succès
case FAILURE: return -2; // Échec de l'émission
case EMPTY: return -3; // Plus de billets disponibles
case CANCEL: return -4; // Commande annulée
case PROCESSING: return 0; // En cours de traitement
default: return calculateQueuePosition(order);
}
}
private Integer calculateQueuePosition(ConfirmOrder order) {
ConfirmOrderExample example = new ConfirmOrderExample();
example.or()
.andTravelDateEqualTo(order.getTravelDate())
.andTrainNumberEqualTo(order.getTrainNumber())
.andCreateTimeLessThanOrEqualTo(order.getCreateTime())
.andStatusEqualTo(ConfirmOrderStatusEnum.INIT.getCode());
long count = orderMapper.countByExample(example);
return count > 0 ? (int) count - 1 : 0;
}
Implémentation du polling côté client pour obtenir les résultats
Ajout d'un mécanisme de consultation périodique côté frontend pour afficher la progression dans la file d'attente et le résultat final de l'émission.
// Dans le composant Vue.js
const queryQueuePosition = () => {
queuePosition.value = -1; // Valeur initiale "en cours de traitement"
pollingInterval = setInterval(() => {
axios.get(`/api/orders/queue-position/${orderId.value}`)
.then(response => {
const position = response.data.content;
switch(position) {
case -1: // Succès
notification.success({description: "Billet émis avec succès!"});
stopPolling();
break;
case -2: // Échec
notification.error({description: "Échec de l'émission du billet"});
stopPolling();
break;
case -3: // Plus de billets
notification.error({description: "Plus de billets disponibles"});
stopPolling();
break;
default: // Position dans la file
queuePosition.value = position;
}
});
}, 500); // Intervalle de 500ms
};
Tests de performance avec JMeter
Configuration de 500 threads simultanés pour simuler des requêtes d'achat. Les résultats démontrent :
- Tous les billets vendus sans survente
- Aucune perte de vente
- Temps de réponse acceptable sous charge
Configuration de l'environnement de développement
Pour faciliter les tests de charge, l'environnement de développement désactive les vérifications de sécurité :
// Désactivation des vérifications d'image captcha en dev
if (!environment.equals("dev")) {
// Validation du captcha
}
// Désactivation de la vérification du token de surcharge en dev
if (!environment.equals("dev")) {
// Validation du token
}