Longue interrogation (utilisée par RocketMQ)
Consommateur -> Courtier : RocketMQ utilise la longue interrogation pour établir les connexions.
- Le courtier ignore la capacité de traitement du consommateur
- La poussage direct des messages surcharge le courtier
- Une connexion persistante risque d'empêcher le consommateur de traiter les données à temps
- Le consommateur garde le contrôle de la récupération (pull)
Interrogation courte
Le client envoie des requêtes continues au serveur, chaque requête nécessitant une nouvelle connexion.
Longue interrogation
Le client envoie une requête au serveur. Si des données sont disponibles, elles sont retournées ; sinon, la requête reste en attente sans interrompre la connexion.
Connexion persistante
Une fois établie, la connexion reste permanente, utilisée pour les notifications push.
Côté consommateur
Voici les 5 étapes principales d'analyse du code source :
1. Instanciation du DefaultMQPushConsumer
DefaultMQPushConsumer consommateur = new DefaultMQPushConsumer("groupeConsommation");
2. Configuration de l'adresse du serveur de noms
consommateur.setNamesrvAddr("192.168.88.134:9876");
3. Abonnement à un sujet avec filtrage
La classe DefaultMQPushConsumerImpl implémente réellement les méthodes de DefaultMQPushConsumer.
La méthode subscribe renvoei un SubscriptionData. L'expression de souscription peut être spécifiée ou "*" par défaut.
Après le démarrage du client, MQClientFactory envoie des paquets de heartbeat au courtier.
consommateur.subscribe("sujetTag", "ETIQUETTE-A");
public void subscribe(String sujet, String expression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(this.avecEspaceDeNoms(sujet), expression);
}
this.rebalanceImpl.getSubscriptionInner().put(sujet, subscriptionData);
if (this.mqClientFactory != null) {
this.mqClientFactory.sendHeartbeatToAllBrokerWithLock();
}
4. Enregistrement de l'écouteur de messages avec rappel
Par défaut, chaque message n'est consommé qu'une seule fois (modèle point à point). Le statut des messages est maintenu par le courtier.
L'accusé de réception (ACK) renvoie un état de consommation : CONSUME_SUCCESS (succès) ou RECONSUME_LATER (échec, reconsommation ultérieure).
Lorsque le statut RECONSUME_LATER est renvoyé :
- RocketMQ renvoie le lot de messages au courtier (sujet RETRY du groupe de consommation)
- Après un délai (10 secondes par défaut, configurable), le message est redistribué à un autre consommateur du même groupe
- En cas d'échecs répétés (16 par défaut), le message est envoyé dans la file d'attente des messages morts (DLQ) pour intervention manuelle
consommateur.registerMessageListener(...);
5. Démarrage du client de consommation
consommateur.demarrer();
this.defaultMQPushConsumerImpl.demarrer();
Opérations sur l'état du service :
enum EtatService {
CREE_RECENTEMENT,
EN_COURS_EXECUTION,
DEJA_FERME,
ECHEC_DEMARRAGE
}
Vérification de la configuration et récupération des abonnements :
this.verifierConfiguration();
this.copierAbonnements();
Obtention de l'instance MQClient :
this.mqClientFactory = MQClientManager.getInstance().obtenirOuCreerInstance(this.defaultMQPushConsumer, this.crochetRpc);
Enregistrement du consommateur et démarrage du client :
boolean enregistrementReussi = this.mqClientFactory.enregistrerConsommateur(this.defaultMQPushConsumer.getConsumerGroup(), this);
this.mqClientFactory.demarrer();
Démarrage du service de consommation des messages
this.serviceConsommationMessages.demarrer();
Processus de démarrage de MQClientInstance
this.mqClientAPIImpl.demarrer();
this.demarrerTachesPlanifiees();
this.serviceRecuperationMessages.demarrer();
this.serviceReequilibrage.demarrer();
this.defaultMQProducer.getDefaultMQProducerImpl().demarrer(false);
Démarrage de NettyRemotingClient
Le client Netty démarre avec 4 threads de travail. Il crée un groupe d'exécusion d'événements intégré dans le pipeline Netty.
NettyRemotingClient scanne périodiquement la table de réponses (ConcurrentHashMap de capacité initiale 256) et supprime les requêtes expirées.
this.mqClientAPIImpl.demarrer();
private int threadsTravailClient = 4;
pipeline.addFirst(this.groupeExecuteurEvenementsParDefaut,
"gestionnaireSSL",
this.contexteSSL.nouveauGestionnaire(canal.alloc()));
this.timer.scheduleAtFixedRate(new TacheTemporelle() {
public void executer() {
try {
NettyRemotingClient.this.scannerTableReponses();
} catch (Throwable e) {
NettyRemotingClient.log.error("Exception lors du scan de la table de réponses", e);
}
}
}, 3000L, 1000L);
protected final MapConcurrent<integer futurreponse=""> tableReponses = new ConcurrentHashMap(256);
if (rf.getHorodatageDebut() + rf.getDelaiExpiration() + 1000L <= System.currentTimeMillis()) {
rf.liberer();
iterateur.remove();
listeRf.add(rf);
log.warn("Requête expirée supprimée : " + rf);
}
</integer>
Si un écouteur d'événements de canal est défini, l'exécuteur d'événements Netty démarre (ServiceThread) :
if (this.ecouteurEvenementsCanal != null) {
this.executeurEvenementsNetty.demarrer();
}
Démarrage des tâches planifiées
Toutes les 120 secondes, vérifie l'adresse du serveur de noms et la met à jour si nécessaire :
private void demarrerTachesPlanifiees() {
if (null == this.configClient.getAddrSrvNoms()) {
this.servicePlanificateur.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
MQClientInstance.this.mqClientAPIImpl.recupererAdresseServeurNoms();
} catch (Exception e) {
MQClientInstance.this.log.error("Exception tâche planifiée : récupération adresse serveur noms", e);
}
}
}, 10000L, 120000L, TimeUnit.MILLISECONDS);
}
}
Démarrage du service de récupération de messages
Le service de récupération de messages utilise une file bloquante liée (LinkedBlockingQueue) pour les requêtes de récupération.
public void demarrer() {
log.info("Tentative de démarrage du service {}: démarré={}, dernierThread={}",
this.getNomService(), this.demarre.get(), this.thread);
if (this.demarre.compareAndSet(false, true)) {
this.arrete = false;
this.thread = new Thread(this, this.getNomService());
this.thread.setDaemon(this.estDaemon);
this.thread.start();
}
}
final FileBloquanteLiee<requeterecuperation> fileRequetesRecuperation = new LinkedBlockingQueue();
</requeterecuperation>
Boucle principale de récupération des messages :
public void run() {
this.log.info(this.getNomService() + " service démarré");
while (!this.estArrete()) {
try {
RequeteRecuperation requete = (RequeteRecuperation) this.fileRequetesRecuperation.take();
this.recupererMessage(requete);
} catch (InterruptedException e) {
// Ignoré
} catch (Exception e) {
this.log.error("Exception méthode run du service de récupération de messages", e);
}
}
this.log.info(this.getNomService() + " service terminé");
}
La méthode take() de LinkedBlockingQueue utilise un verrou réentrant interruptible :
public E prendre() throws InterruptedException {
E element;
int compteur = -1;
final AtomicInteger compteurAtomique = this.compteur;
final ReentrantLock verrouPrise = this.verrouPrise;
verrouPrise.lockInterruptibly();
try {
while (compteurAtomique.get() == 0) {
nonVide.attendre();
}
element = defiler();
compteur = compteurAtomique.getAndDecrement();
if (compteur > 1) {
nonVide.signaler();
}
} finally {
verrouPrise.unlock();
}
if (compteur == capacite) {
signalerNonPlein();
}
return element;
}
Structure d'une RequeteRecuperation :
private String groupeConsommation;
private FileAttenteMessages fileAttenteMessages;
private ProcessusTraitement processusTraitement;
private long prochainDecalage;
Défilement de la requête :
private E defiler() {
Noeud<e> tete = this.tete;
Noeud<e> premier = tete.suivant;
tete.suivant = tete; // Aide au GC
this.tete = premier;
E element = premier.element;
premier.element = null;
return element;
}
</e></e>
Récupération du consommateur correspondant :
private void recupererMessage(RequeteRecuperation requete) {
MQConsumerInner consommateur = this.mqClientFactory.selectionnerConsommateur(requete.getGroupeConsommation());
if (consommateur != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consommateur;
impl.recupererMessage(requete);
} else {
this.log.warn("Aucun consommateur correspondant pour la requête {}, abandon", requete);
}
}
Utilisation d'un rappel asynchrone pour traiter les résultats de récupération :
RappelRecuperation rappel = new RappelRecuperation() {
public void enSucces(ResultatRecuperation resultat) {
switch (resultat.getStatut()) {
case TROUVE:
requete.setProchainDecalage(resultat.getProchainDecalage());
// Soumettre la tâche de consommation
break;
}
}
};
public class ResultatRecuperation {
private final StatutRecuperation statut;
private final long prochainDecalage;
private final long decalageMin;
private final long decalageMax;
private List<messageext> messagesTrouves;
}
</messageext>
Service de consommation concurrente :
this.executeurConsommation = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getThreadsMinConsommation(),
this.defaultMQPushConsumer.getThreadsMaxConsommation(),
60000L, TimeUnit.MILLISECONDS,
this.fileRequetesConsommation,
new FabriqueThreadsImpl("ThreadConsommationMessage_")
);
Interface d'écoute concurrente :
public interface EcouteurMessagesConcurrent extends EcouteurMessages {
StatutConsommationConcurrente consommerMessage(List<messageext> var1, ContexteConcurrent var2);
}
</messageext>
Service de consommation séquentielle :
RequeteConsommation requete = new RequeteConsommation(processusTraitement, fileAttenteMessages);
this.executeurConsommation.soumettre(requete);
Interface d'écoute séquentielle :
public interface EcouteurMessagesOrdonnes extends EcouteurMessages {
StatutConsommationOrdonnee consommerMessage(List<messageext> var1, ContexteOrdonne var2);
}
</messageext>
Avant et après la consommation, les crochets (hooks) sont exécutés :
if (this.defaultMQPushConsumerImpl.aCrochet()) {
contexteMessageConsommation.setStatut(statut.toString());
contexteMessageConsommation.setSucces(StatutConsommationOrdonnee.SUCCES == statut ||
StatutConsommationOrdonnee.VALIDE == statut);
this.defaultMQPushConsumerImpl.executerCrochetApres(contexteMessageConsommation);
}
Démarrage du service de rééquilibrage
Le service de rééquilibrage s'exécute avec un entervalle d'attente configurable :
private static long intervalleAttente = Long.parseLong(
System.getProperty("rocketmq.client.rebalance.waitInterval", "20000")
);
Boucle principale du rééquilibrage :
public void run() {
this.log.info(this.getNomService() + " service démarré");
while (!this.estArrete()) {
this.attendreExecution(intervalleAttente);
this.fabriqueClients.reequilibrer();
}
this.log.info(this.getNomService() + " service terminé");
}
Deux modes de fonctionnement : diffusion et clustering. En mode clustering, les files de messages sont réparties entre les consommateurs d'un même groupe :
try {
resultatAllocation = strategie.allouer(
this.groupeConsommation,
this.mqClientFactory.getIdClient(),
toutesFiles,
tousIdsConsommateurs
);
} catch (Throwable e) {
log.error("Exception AllocationMessageQueueStrategy.allocate. strategie={}", strategie.getNom(), e);
return;
}
Le producteur par défaut n'est pas démarré du côté consommateur (paramètre false). Sinon, il créerait une instance de producteur avec la configuration appropriée :
this.mqClientFactory = MQClientManager.getInstance().obtenirOuCreerInstance(this.defaultMQProducer, this.crochetRpc);
boolean enregistrementReussi = this.mqClientFactory.enregistrerProducteur(
this.defaultMQProducer.getGroupeProducteur(), this
);
MapConcurrent<string mqproducerinner=""> tableProducteurs;
</string>