Implémentation d'un système de billetterie haute performance avec SpringBoot 3.0 et RocketMQ

Traitement aysnchrone des pics de requêtes avec MQ pour résoudre les problèmes de débit

Évolution du diagramme temporel d'achat de billets

Version originale :

Version améliorée :

Avantage de MQ par rapport à l'asynchronisme Spring Boot : Résistance à la perte de messages due à des exceptions intermédiaires

Initiation à RocketMQ

Installation

[Les étapes de configuration détaillées pour le téléchargement, l'ajustement des paramètres mémoire et la résolution des problèmes sous Windows sont omises ici pour la concision]

Configuration et démarrage

[Instructions détaillées sur la configuration des variables d'environnement, le démarrage des composants NameServer et Broker, ainsi que les solutions aux erreurs courantes sous Windows]

Console de gestion RocketMQ

La console peut être déployée via Docker ou en compilant le projet Maven. Nous utiliserons la seconde méthode.

  1. Téléchargement du code source depuis le site officiel
  2. Démarrage via IDE avec JDK 1.8 et configuration spécifique des paramètres VM et du port

Découpage du processus d'achat en deux parties avec RocketMQ

La logique est divisée ainsi : l'interface de commande initiale ne traite que la validation, le verrouilage des jetons et la sauvegarde des informations de commande initiales. Le verrouillage des trains et la logique réelle de sélection de sièges sont déplacées vers le consommateur de messages.

Configuration des dépendances

Ajout de la dépendance RocketMQ au fichier POM parent et au module métier.

Configuration de l'application

Configuration de l'adresse du serveur RocketMQ et du groupe de producteurs dans les propriétés de l'application.

Définition des énumérations de sujets

package com.example.train.business.enums;

public enum RocketMQTopicEnum {
    CONFIRM_ORDER_TOPIC("CONFIRM_ORDER", "File d'attente pour la confirmation de commande");

    private final String code;
    private final String description;

    RocketMQTopicEnum(String code, String description) {
        this.code = code;
        this.description = description;
    }

    // Getters et toString() personnalisé
}

Service de pré-traitement de la commande (Producteur)

package com.example.train.business.service;

import com.alibaba.fastjson.JSON;
import com.example.train.business.domain.ConfirmOrder;
import com.example.train.business.enums.ConfirmOrderStatusEnum;
import com.example.train.business.enums.RocketMQTopicEnum;
import com.example.train.business.mapper.ConfirmOrderMapper;
import com.example.train.business.req.ConfirmOrderDoReq;
import com.example.train.business.req.ConfirmOrderTicketReq;
import com.example.train.common.context.LoginMemberContext;
import com.example.train.common.exception.BusinessException;
import com.example.train.common.exception.BusinessExceptionEnum;
import com.example.train.common.util.SnowUtil;
import jakarta.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;

@Service
public class OrderPreProcessingService {
    private static final Logger LOG = LoggerFactory.getLogger(OrderPreProcessingService.class);

    @Resource
    private ConfirmOrderMapper orderMapper;
    @Resource
    private SkTokenService tokenService;
    @Resource
    private RocketMQTemplate mqTemplate;

    public void preProcessOrder(ConfirmOrderDoReq request) {
        request.setMemberId(LoginMemberContext.getId());

        // Validation du jeton de surcharge
        boolean isTokenValid = tokenService.validateToken(
            request.getDate(), 
            request.getTrainCode(), 
            LoginMemberContext.getId()
        );
        if (!isTokenValid) {
            throw new BusinessException(BusinessExceptionEnum.ORDER_TOKEN_INVALID);
        }

        // Création et enregistrement de la commande initiale
        ConfirmOrder order = new ConfirmOrder();
        order.setId(SnowUtil.getSnowflakeNextId());
        order.setCreateTime(new Date());
        order.setUpdateTime(new Date());
        order.setMemberId(request.getMemberId());
        order.setDate(request.getDate());
        order.setTrainCode(request.getTrainCode());
        order.setStart(request.getStart());
        order.setEnd(request.getEnd());
        order.setDailyTrainTicketId(request.getDailyTrainTicketId());
        order.setStatus(ConfirmOrderStatusEnum.INIT.getCode());
        order.setTickets(JSON.toJSONString(request.getTickets()));
        orderMapper.insert(order);

        // Envoi du message MQ pour traitement asynchrone
        String messagePayload = JSON.toJSONString(request);
        LOG.info("Envoi du message MQ pour mise en file d'attente: {}", messagePayload);
        mqTemplate.convertAndSend(
            RocketMQTopicEnum.CONFIRM_ORDER_TOPIC.getCode(), 
            messagePayload
        );
        LOG.info("Fin de l'envoi du message MQ");
    }
}

Consommateur de messages

package com.example.train.business.mq;

import com.alibaba.fastjson.JSON;
import com.example.train.business.req.ConfirmOrderDoReq;
import com.example.train.business.service.OrderProcessingService;
import jakarta.annotation.Resource;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    consumerGroup = "default", 
    topic = "CONFIRM_ORDER"
)
public class OrderConfirmConsumer implements RocketMQListener<MessageExt> {
    private static final Logger LOG = LoggerFactory.getLogger(OrderConfirmConsumer.class);

    @Resource
    private OrderProcessingService processingService;

    @Override
    public void onMessage(MessageExt message) {
        String payload = new String(message.getBody());
        LOG.info("Message reçu via RocketMQ: {}", payload);
        ConfirmOrderDoReq request = JSON.parseObject(payload, ConfirmOrderDoReq.class);
        processingService.processOrder(request);
    }
}

Ajout d'un identifiant de journalisation pour le suivi des transactions asynchrones

Les threads asynchrones n'héritent pas automatiquement du contexte MDC du thread principal. Nous devons donc propager explicitement l'identifiant de journalisation.

Ajout du champ logId à la requête

// Dans la classe ConfirmOrderDoReq
private String logId;

// Mise à jour de la méthode toString()
@Override
public String toString() {
    return "ConfirmOrderDoReq{" +
        "memberId=" + memberId +
        ", date=" + date +
        ", trainCode='" + trainCode + '\'' +
        ", logId='" + logId + '\'' +
        // ... autres champs
        '}';
}

Modification du service de pré-traitemant

// Dans OrderPreProcessingService.preProcessOrder()
request.setLogId(MDC.get("LOG_ID"));

Modification du consommateur

// Dans OrderConfirmConsumer.onMessage()
MDC.put("LOG_ID", request.getLogId());

Implémentation de la fonctionnalité de mise en file d'attente pour l'émission de billets

Modification du contenu du message MQ

Le message ne contient que les informations nécessaires pour identifier le train, pas les détails spécifiques à chaque utilisateur.

package com.example.train.business.dto;

import lombok.Data;
import java.util.Date;

@Data
public class OrderMQMessage {
    private String logId;
    private Date travelDate;
    private String trainNumber;
}

Service de traitement des commandes avec logique de mise en file d'attente

package com.example.train.business.service;

// ... imports

@Service
public class OrderProcessingService {
    // ... injections de dépendances

    public void processOrder(OrderMQMessage message) {
        String lockKey = RedisKeyPreEnum.ORDER_LOCK + "-" + 
                         DateUtil.formatDate(message.getTravelDate()) + "-" + 
                         message.getTrainNumber();

        Boolean lockAcquired = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockKey, 10, TimeUnit.SECONDS);

        if (Boolean.TRUE.equals(lockAcquired)) {
            LOG.info("Verrou acquis pour le train {}", message.getTrainNumber());
            try {
                processQueuedOrders(message.getTravelDate(), message.getTrainNumber());
            } finally {
                redisTemplate.delete(lockKey);
                LOG.info("Verrou libéré pour le train {}", message.getTrainNumber());
            }
        } else {
            LOG.info("Verrou non acquis, autre thread en cours de traitement");
        }
    }

    private void processQueuedOrders(Date travelDate, String trainNumber) {
        while (true) {
            ConfirmOrderExample example = new ConfirmOrderExample();
            example.setOrderByClause("id ASC");
            example.createCriteria()
                .andTravelDateEqualTo(travelDate)
                .andTrainNumberEqualTo(trainNumber)
                .andStatusEqualTo(ConfirmOrderStatusEnum.INIT.getCode());
            
            PageHelper.startPage(1, 5);
            List<ConfirmOrder> pendingOrders = orderMapper.selectByExampleWithBLOBs(example);
            
            if (CollUtil.isEmpty(pendingOrders)) {
                LOG.info("Aucune commande en attente, fin du traitement");
                break;
            }
            
            LOG.info("Traitement de {} commandes", pendingOrders.size());
            pendingOrders.forEach(this::processSingleOrder);
        }
    }

    private void processSingleOrder(ConfirmOrder order) {
        // Mise à jour du statut pour éviter le traitement multiple
        order.setStatus(ConfirmOrderStatusEnum.PROCESSING.getCode());
        updateOrderStatus(order);
        
        try {
            // Logique de sélection de siège et émission de billet
            emitTicketsForOrder(order);
        } catch (BusinessException e) {
            if (e.getErrorCode().equals(BusinessExceptionEnum.INSUFFICIENT_TICKETS)) {
                order.setStatus(ConfirmOrderStatusEnum.EMPTY.getCode());
                updateOrderStatus(order);
            } else {
                throw e;
            }
        }
    }

    // ... autres méthodes de traitement
}

Ajout de la fonctionnalité de consultation de position dans la file d'attente

Méthode de service pour obtenir la position

public Integer getQueuePosition(Long orderId) {
    ConfirmOrder order = orderMapper.selectByPrimaryKey(orderId);
    ConfirmOrderStatusEnum status = ConfirmOrderStatusEnum.getByCode(order.getStatus());
    
    switch (status) {
        case SUCCESS: return -1;   // Billet émis avec succès
        case FAILURE: return -2;   // Échec de l'émission
        case EMPTY:   return -3;   // Plus de billets disponibles
        case CANCEL:  return -4;   // Commande annulée
        case PROCESSING: return 0; // En cours de traitement
        default: return calculateQueuePosition(order);
    }
}

private Integer calculateQueuePosition(ConfirmOrder order) {
    ConfirmOrderExample example = new ConfirmOrderExample();
    example.or()
        .andTravelDateEqualTo(order.getTravelDate())
        .andTrainNumberEqualTo(order.getTrainNumber())
        .andCreateTimeLessThanOrEqualTo(order.getCreateTime())
        .andStatusEqualTo(ConfirmOrderStatusEnum.INIT.getCode());
    
    long count = orderMapper.countByExample(example);
    return count > 0 ? (int) count - 1 : 0;
}

Implémentation du polling côté client pour obtenir les résultats

Ajout d'un mécanisme de consultation périodique côté frontend pour afficher la progression dans la file d'attente et le résultat final de l'émission.

// Dans le composant Vue.js
const queryQueuePosition = () => {
    queuePosition.value = -1; // Valeur initiale "en cours de traitement"
    pollingInterval = setInterval(() => {
        axios.get(`/api/orders/queue-position/${orderId.value}`)
            .then(response => {
                const position = response.data.content;
                switch(position) {
                    case -1: // Succès
                        notification.success({description: "Billet émis avec succès!"});
                        stopPolling();
                        break;
                    case -2: // Échec
                        notification.error({description: "Échec de l'émission du billet"});
                        stopPolling();
                        break;
                    case -3: // Plus de billets
                        notification.error({description: "Plus de billets disponibles"});
                        stopPolling();
                        break;
                    default: // Position dans la file
                        queuePosition.value = position;
                }
            });
    }, 500); // Intervalle de 500ms
};

Tests de performance avec JMeter

Configuration de 500 threads simultanés pour simuler des requêtes d'achat. Les résultats démontrent :

  • Tous les billets vendus sans survente
  • Aucune perte de vente
  • Temps de réponse acceptable sous charge

Configuration de l'environnement de développement

Pour faciliter les tests de charge, l'environnement de développement désactive les vérifications de sécurité :

// Désactivation des vérifications d'image captcha en dev
if (!environment.equals("dev")) {
    // Validation du captcha
}

// Désactivation de la vérification du token de surcharge en dev
if (!environment.equals("dev")) {
    // Validation du token
}

Étiquettes: SpringBoot RocketMQ Haute Performance Concurrency Java

Publié le 13 juin à 17h09