Analyse du code source de RocketMQ : côté consommateur

Longue interrogation (utilisée par RocketMQ)

Consommateur -> Courtier : RocketMQ utilise la longue interrogation pour établir les connexions.

  • Le courtier ignore la capacité de traitement du consommateur
  • La poussage direct des messages surcharge le courtier
  • Une connexion persistante risque d'empêcher le consommateur de traiter les données à temps
  • Le consommateur garde le contrôle de la récupération (pull)

Interrogation courte

Le client envoie des requêtes continues au serveur, chaque requête nécessitant une nouvelle connexion.

Longue interrogation

Le client envoie une requête au serveur. Si des données sont disponibles, elles sont retournées ; sinon, la requête reste en attente sans interrompre la connexion.

Connexion persistante

Une fois établie, la connexion reste permanente, utilisée pour les notifications push.

Côté consommateur

Voici les 5 étapes principales d'analyse du code source :

1. Instanciation du DefaultMQPushConsumer


DefaultMQPushConsumer consommateur = new DefaultMQPushConsumer("groupeConsommation");

2. Configuration de l'adresse du serveur de noms


consommateur.setNamesrvAddr("192.168.88.134:9876");

3. Abonnement à un sujet avec filtrage

La classe DefaultMQPushConsumerImpl implémente réellement les méthodes de DefaultMQPushConsumer.

La méthode subscribe renvoei un SubscriptionData. L'expression de souscription peut être spécifiée ou "*" par défaut.

Après le démarrage du client, MQClientFactory envoie des paquets de heartbeat au courtier.


consommateur.subscribe("sujetTag", "ETIQUETTE-A");

public void subscribe(String sujet, String expression) throws MQClientException {
    this.defaultMQPushConsumerImpl.subscribe(this.avecEspaceDeNoms(sujet), expression);
}

this.rebalanceImpl.getSubscriptionInner().put(sujet, subscriptionData);

if (this.mqClientFactory != null) {
    this.mqClientFactory.sendHeartbeatToAllBrokerWithLock();
}

4. Enregistrement de l'écouteur de messages avec rappel

Par défaut, chaque message n'est consommé qu'une seule fois (modèle point à point). Le statut des messages est maintenu par le courtier.

L'accusé de réception (ACK) renvoie un état de consommation : CONSUME_SUCCESS (succès) ou RECONSUME_LATER (échec, reconsommation ultérieure).

Lorsque le statut RECONSUME_LATER est renvoyé :

  • RocketMQ renvoie le lot de messages au courtier (sujet RETRY du groupe de consommation)
  • Après un délai (10 secondes par défaut, configurable), le message est redistribué à un autre consommateur du même groupe
  • En cas d'échecs répétés (16 par défaut), le message est envoyé dans la file d'attente des messages morts (DLQ) pour intervention manuelle

consommateur.registerMessageListener(...);

5. Démarrage du client de consommation


consommateur.demarrer();
this.defaultMQPushConsumerImpl.demarrer();

Opérations sur l'état du service :


enum EtatService {
    CREE_RECENTEMENT,
    EN_COURS_EXECUTION,
    DEJA_FERME,
    ECHEC_DEMARRAGE
}

Vérification de la configuration et récupération des abonnements :


this.verifierConfiguration();
this.copierAbonnements();

Obtention de l'instance MQClient :


this.mqClientFactory = MQClientManager.getInstance().obtenirOuCreerInstance(this.defaultMQPushConsumer, this.crochetRpc);

Enregistrement du consommateur et démarrage du client :


boolean enregistrementReussi = this.mqClientFactory.enregistrerConsommateur(this.defaultMQPushConsumer.getConsumerGroup(), this);
this.mqClientFactory.demarrer();

Démarrage du service de consommation des messages


this.serviceConsommationMessages.demarrer();

Processus de démarrage de MQClientInstance


this.mqClientAPIImpl.demarrer();
this.demarrerTachesPlanifiees();
this.serviceRecuperationMessages.demarrer();
this.serviceReequilibrage.demarrer();
this.defaultMQProducer.getDefaultMQProducerImpl().demarrer(false);

Démarrage de NettyRemotingClient

Le client Netty démarre avec 4 threads de travail. Il crée un groupe d'exécusion d'événements intégré dans le pipeline Netty.

NettyRemotingClient scanne périodiquement la table de réponses (ConcurrentHashMap de capacité initiale 256) et supprime les requêtes expirées.


this.mqClientAPIImpl.demarrer();
private int threadsTravailClient = 4;

pipeline.addFirst(this.groupeExecuteurEvenementsParDefaut, 
    "gestionnaireSSL", 
    this.contexteSSL.nouveauGestionnaire(canal.alloc()));

this.timer.scheduleAtFixedRate(new TacheTemporelle() {
    public void executer() {
        try {
            NettyRemotingClient.this.scannerTableReponses();
        } catch (Throwable e) {
            NettyRemotingClient.log.error("Exception lors du scan de la table de réponses", e);
        }
    }
}, 3000L, 1000L);

protected final MapConcurrent<integer futurreponse=""> tableReponses = new ConcurrentHashMap(256);

if (rf.getHorodatageDebut() + rf.getDelaiExpiration() + 1000L <= System.currentTimeMillis()) {
    rf.liberer();
    iterateur.remove();
    listeRf.add(rf);
    log.warn("Requête expirée supprimée : " + rf);
}
</integer>

Si un écouteur d'événements de canal est défini, l'exécuteur d'événements Netty démarre (ServiceThread) :


if (this.ecouteurEvenementsCanal != null) {
    this.executeurEvenementsNetty.demarrer();
}

Démarrage des tâches planifiées

Toutes les 120 secondes, vérifie l'adresse du serveur de noms et la met à jour si nécessaire :


private void demarrerTachesPlanifiees() {
    if (null == this.configClient.getAddrSrvNoms()) {
        this.servicePlanificateur.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    MQClientInstance.this.mqClientAPIImpl.recupererAdresseServeurNoms();
                } catch (Exception e) {
                    MQClientInstance.this.log.error("Exception tâche planifiée : récupération adresse serveur noms", e);
                }
            }
        }, 10000L, 120000L, TimeUnit.MILLISECONDS);
    }
}

Démarrage du service de récupération de messages

Le service de récupération de messages utilise une file bloquante liée (LinkedBlockingQueue) pour les requêtes de récupération.


public void demarrer() {
    log.info("Tentative de démarrage du service {}: démarré={}, dernierThread={}", 
        this.getNomService(), this.demarre.get(), this.thread);
    if (this.demarre.compareAndSet(false, true)) {
        this.arrete = false;
        this.thread = new Thread(this, this.getNomService());
        this.thread.setDaemon(this.estDaemon);
        this.thread.start();
    }
}

final FileBloquanteLiee<requeterecuperation> fileRequetesRecuperation = new LinkedBlockingQueue();
</requeterecuperation>

Boucle principale de récupération des messages :


public void run() {
    this.log.info(this.getNomService() + " service démarré");
    
    while (!this.estArrete()) {
        try {
            RequeteRecuperation requete = (RequeteRecuperation) this.fileRequetesRecuperation.take();
            this.recupererMessage(requete);
        } catch (InterruptedException e) {
            // Ignoré
        } catch (Exception e) {
            this.log.error("Exception méthode run du service de récupération de messages", e);
        }
    }
    
    this.log.info(this.getNomService() + " service terminé");
}

La méthode take() de LinkedBlockingQueue utilise un verrou réentrant interruptible :


public E prendre() throws InterruptedException {
    E element;
    int compteur = -1;
    final AtomicInteger compteurAtomique = this.compteur;
    final ReentrantLock verrouPrise = this.verrouPrise;
    verrouPrise.lockInterruptibly();
    
    try {
        while (compteurAtomique.get() == 0) {
            nonVide.attendre();
        }
        element = defiler();
        compteur = compteurAtomique.getAndDecrement();
        if (compteur > 1) {
            nonVide.signaler();
        }
    } finally {
        verrouPrise.unlock();
    }
    
    if (compteur == capacite) {
        signalerNonPlein();
    }
    return element;
}

Structure d'une RequeteRecuperation :


private String groupeConsommation;
private FileAttenteMessages fileAttenteMessages;
private ProcessusTraitement processusTraitement;
private long prochainDecalage;

Défilement de la requête :


private E defiler() {
    Noeud<e> tete = this.tete;
    Noeud<e> premier = tete.suivant;
    tete.suivant = tete; // Aide au GC
    this.tete = premier;
    E element = premier.element;
    premier.element = null;
    return element;
}
</e></e>

Récupération du consommateur correspondant :


private void recupererMessage(RequeteRecuperation requete) {
    MQConsumerInner consommateur = this.mqClientFactory.selectionnerConsommateur(requete.getGroupeConsommation());
    
    if (consommateur != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consommateur;
        impl.recupererMessage(requete);
    } else {
        this.log.warn("Aucun consommateur correspondant pour la requête {}, abandon", requete);
    }
}

Utilisation d'un rappel asynchrone pour traiter les résultats de récupération :


RappelRecuperation rappel = new RappelRecuperation() {
    public void enSucces(ResultatRecuperation resultat) {
        switch (resultat.getStatut()) {
            case TROUVE:
                requete.setProchainDecalage(resultat.getProchainDecalage());
                // Soumettre la tâche de consommation
                break;
        }
    }
};

public class ResultatRecuperation {
    private final StatutRecuperation statut;
    private final long prochainDecalage;
    private final long decalageMin;
    private final long decalageMax;
    private List<messageext> messagesTrouves;
}
</messageext>

Service de consommation concurrente :


this.executeurConsommation = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getThreadsMinConsommation(),
    this.defaultMQPushConsumer.getThreadsMaxConsommation(),
    60000L, TimeUnit.MILLISECONDS,
    this.fileRequetesConsommation,
    new FabriqueThreadsImpl("ThreadConsommationMessage_")
);

Interface d'écoute concurrente :


public interface EcouteurMessagesConcurrent extends EcouteurMessages {
    StatutConsommationConcurrente consommerMessage(List<messageext> var1, ContexteConcurrent var2);
}
</messageext>

Service de consommation séquentielle :


RequeteConsommation requete = new RequeteConsommation(processusTraitement, fileAttenteMessages);
this.executeurConsommation.soumettre(requete);

Interface d'écoute séquentielle :


public interface EcouteurMessagesOrdonnes extends EcouteurMessages {
    StatutConsommationOrdonnee consommerMessage(List<messageext> var1, ContexteOrdonne var2);
}
</messageext>

Avant et après la consommation, les crochets (hooks) sont exécutés :


if (this.defaultMQPushConsumerImpl.aCrochet()) {
    contexteMessageConsommation.setStatut(statut.toString());
    contexteMessageConsommation.setSucces(StatutConsommationOrdonnee.SUCCES == statut || 
                                          StatutConsommationOrdonnee.VALIDE == statut);
    this.defaultMQPushConsumerImpl.executerCrochetApres(contexteMessageConsommation);
}

Démarrage du service de rééquilibrage

Le service de rééquilibrage s'exécute avec un entervalle d'attente configurable :


private static long intervalleAttente = Long.parseLong(
    System.getProperty("rocketmq.client.rebalance.waitInterval", "20000")
);

Boucle principale du rééquilibrage :


public void run() {
    this.log.info(this.getNomService() + " service démarré");
    
    while (!this.estArrete()) {
        this.attendreExecution(intervalleAttente);
        this.fabriqueClients.reequilibrer();
    }
    
    this.log.info(this.getNomService() + " service terminé");
}

Deux modes de fonctionnement : diffusion et clustering. En mode clustering, les files de messages sont réparties entre les consommateurs d'un même groupe :


try {
    resultatAllocation = strategie.allouer(
        this.groupeConsommation,
        this.mqClientFactory.getIdClient(),
        toutesFiles,
        tousIdsConsommateurs
    );
} catch (Throwable e) {
    log.error("Exception AllocationMessageQueueStrategy.allocate. strategie={}", strategie.getNom(), e);
    return;
}

Le producteur par défaut n'est pas démarré du côté consommateur (paramètre false). Sinon, il créerait une instance de producteur avec la configuration appropriée :


this.mqClientFactory = MQClientManager.getInstance().obtenirOuCreerInstance(this.defaultMQProducer, this.crochetRpc);
boolean enregistrementReussi = this.mqClientFactory.enregistrerProducteur(
    this.defaultMQProducer.getGroupeProducteur(), this
);

MapConcurrent<string mqproducerinner=""> tableProducteurs;
</string>

Étiquettes: RocketMQ Java consommation de messages longue interrogation Netty

Publié le 28 juin à 00h19