Groupes de Consommateurs et Mécanisme de Rééquilibrage
Les consommateurs Kafka sont structurés au sein de groupes logiques. Au sein d'un même groupe, les instances clientes souscrivent au même flux de données (topic). Le cluster distribue les partitions de manière à ce que chaque partition soit traitée par une seule instance du groupe. Si le nombre de partitions excède celui des consommateurs, certaines instances traiteront plusieurs partitions. Inversement, si les consommateurs sont plus nombreux que les partitions, les instances excédentaires resteront inactives.
Coordination et Pulsations (Heartbeats)
Le rééquilibrage est le processus par lequel la propriété des partitions est transférée d'un consommateur à un autre. Ce mécanisme est essentiel pour absorber les changements dynamiques, tels que l'ajout de nouvelles instances, la création de partitions ou la défaillance d'un nœud. Pendant cette phase, la consommation est suspandue, ce qui peut entraîner une perte de l'état de lecture local et nécessiter un rechargement de cache.
Pour maintenir leur appartenance au groupe, les consommateurs envoient des pulsations (heartbeats) au coordinateur du groupe (un broker spécifique). Si le coordinateur ne reçoit plus de pulsations dans le délai imparti, il considère le consommateur comme défaillant et initie un rééquilibrage. Lors d'un arrêt volontaire, le consommateur notifie le coordinateur pour déclencher immédiatement la redistribution et minimiser les temps d'arrêt.
Initialisation d'un Client Consommateur
La mise en place d'un consommateur nécessite une configuration précise des désérialiseurs et des paramètres de groupe. Voici une implémentation modulaire en Scala :
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, ConsumerRecords}
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success, Failure}
object StreamEventProcessor {
def main(args: Array[String]): Unit = {
val clientProperties = buildClientConfiguration()
val messageSubscriber = new KafkaConsumer[String, String](clientProperties)
Try {
messageSubscriber.subscribe(Seq("telemetry_stream").asJava)
executePollingLoop(messageSubscriber)
} match {
case Success(_) => println("Traitement terminé avec succès.")
case Failure(exception) => System.err.println(s"Erreur fatale lors de la consommation : ${exception.getMessage}")
} finally {
messageSubscriber.close()
}
}
private def buildClientConfiguration(): Properties = {
val config = new Properties()
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(ConsumerConfig.GROUP_ID_CONFIG, "telemetry_processing_cluster")
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
config
}
private def executePollingLoop(subscriber: KafkaConsumer[String, String]): Unit = {
var isRunning = true
while (isRunning) {
val polledEvents: ConsumerRecords[String, String] = subscriber.poll(Duration.ofMillis(100))
if (!polledEvents.isEmpty) {
polledEvents.asScala.foreach { event =>
println(s"Reçu => Flux: ${event.topic()}, Partition: ${event.partition()}, Clé: ${event.key()}")
}
}
}
}
}
Sécurité des Threads
L'API du consommateur Kafka n'est pas thread-safe. Il est impératif d'associer une seule instance de consommateur à un thread dédié pour éviter les conditions de course sur l'état interne du client.
Paramètres de Configuration Clés
| Paramètre | Description |
|---|---|
group.id |
Identifiant unique du groupe de consommateurs. |
key.deserializer |
Classe responsable de la désérialisation des clés des messages. |
value.deserializer |
Classe responsable de la désérialisation des valeurs des messages. |
fetch.min.bytes |
Volume minimal de données (en octets) que le serveur doit accumuler avant de répondre à une requête. |
fetch.max.wait.ms |
Délai maximal d'attente pour atteindre le volume minimal de données avant de renvoyer ce qui est disponible. |
max.partition.fetch.bytes |
Volume maximal de données renvoyées par le serveur pour une seule partition lors d'une requête. |
session.timeout.ms |
Durée maximale d'inactivité (absence de pulsations) avant qu'un consommateur ne soit déclaré mort par le coordinateur. |
Gestion des Offsets et Validation
La validation (commit) d'un offset consiste à mettre à jour la position de lecture courante d'une partition. Kafka offre plusieurs stratégies pour gérer cette opération.
Stratégies de Validation (Commit)
Validation Automatique : Si enable.auto.commit est activé, le client valide périodiquement l'offset le plus récent. Cela peut entraîner un traitement en double en cas de rééquilibrage entre deux validations automatiques.
Validation Synchrone : La méthode commitSync() bloque le thread jusqu'à ce que le broker confirme la validation. Elle garantit la cohérence mais impacte les performances en cas de latence réseau.
Validation Asynchrone : commitAsync() n'attend pas la réponse du broker. Elle accepte un callback pour gérer les erreurs. Elle ne tente pas de retry automatiquement pour éviter les doublons dus au réordonnancement des requêtes.
Approche Hybride : Il est recommandé d'utiliser commitAsync() pendant le cycle de vie normal pour la performance, et commitSync() juste avant la fermeture du consommateur pour garantir qu'aucun offset n'est perdu.
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import scala.collection.mutable
object AdvancedOffsetManager {
def main(args: Array[String]): Unit = {
val config = new Properties()
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(ConsumerConfig.GROUP_ID_CONFIG, "hybrid_commit_group")
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val subscriber = new KafkaConsumer[String, String](config)
val pendingOffsets = mutable.Map[TopicPartition, OffsetAndMetadata]()
var processedCount = 0
try {
subscriber.subscribe(Seq("analytics_events").asJava)
while (true) {
val batch = subscriber.poll(Duration.ofMillis(100))
batch.asScala.foreach { record =>
// Traitement métier simulé
println(s"Traitement de l'événement : ${record.value()}")
val partitionKey = new TopicPartition(record.topic(), record.partition())
pendingOffsets.put(partitionKey, new OffsetAndMetadata(record.offset() + 1))
processedCount += 1
if (processedCount % 500 == 0) {
// Validation asynchrone par lots pour la performance
subscriber.commitAsync(pendingOffsets.toMap.asJava, null)
}
}
}
} catch {
case e: Exception => System.err.println(s"Erreur de traitement : ${e.getMessage}")
} finally {
try {
// Validation synchrone finale pour garantir la persistance avant fermeture
subscriber.commitSync()
} finally {
subscriber.close()
}
}
}
}
Interception des Rééquilibrages
Pour exécuter des opérations de nettoyage ou de sauvegarde d'état avant qu'une partition ne soit réassignée, il faut implémenter l'interface ConsumerRebalanceListener. Les méthodes onPartitionsRevoked et onPartitionsAssigned permettent de contrôler précisément le cycle de vie des partitions.
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import java.util
import java.util.Properties
import scala.jdk.CollectionConverters._
object PartitionLifecycleManager {
def main(args: Array[String]): Unit = {
val config = new Properties()
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(ConsumerConfig.GROUP_ID_CONFIG, "lifecycle_group")
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
val subscriber = new KafkaConsumer[String, String](config)
val rebalanceHandler = new ConsumerRebalanceListener {
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
println(s"Révocation des partitions : ${partitions.asScala.mkString(", ")}")
// Vider les tampons locaux et valider les offsets finaux
flushLocalBuffers()
subscriber.commitSync()
}
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
println(s"Assignation des partitions : ${partitions.asScala.mkString(", ")}")
// Initialiser l'état local ou charger les offsets depuis un store externe
initializePartitionState(partitions.asScala)
}
}
subscriber.subscribe(Seq("user_interactions").asJava, rebalanceHandler)
try {
while (true) {
val events = subscriber.poll(Duration.ofMillis(100))
events.asScala.foreach(event => println(s"Interaction : ${event.value()}"))
}
} finally {
subscriber.close()
}
}
private def flushLocalBuffers(): Unit = println("Tampons locaux vidés.")
private def initializePartitionState(partitions: Iterable[TopicPartition]): Unit =
println(s"État initialisé pour : ${partitions.mkString(", ")}")
}
Contrôle Granulaire de la Position de Lecture
L'API seek permet de déplacer manuellement le curseur de lecture vers un offset spécifique. Cela est particulièrement utile pour rejouer des données, ignorer des messages corrompus, ou restaurer un état depuis un système de stockage externe (comme une base de données).
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
object CustomOffsetSeeker {
def main(args: Array[String]): Unit = {
val config = new Properties()
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(ConsumerConfig.GROUP_ID_CONFIG, "seeker_group")
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val subscriber = new KafkaConsumer[String, String](config)
subscriber.subscribe(Seq("audit_logs").asJava)
// Poll initial pour déclencher l'assignation des partitions
subscriber.poll(Duration.ofMillis(0))
val targetPartition = new TopicPartition("audit_logs", 0)
val targetOffset = 1500L
// Déplacement forcé du curseur
subscriber.seek(targetPartition, targetOffset)
try {
while (true) {
val logs = subscriber.poll(Duration.ofMillis(100))
logs.asScala.foreach(log => println(s"Log traité depuis l'offset ${log.offset()} : ${log.value()}"))
}
} finally {
subscriber.close()
}
}
}
En combinant le ConsumerRebalanceListener avec la méthode seek, il est possible de garantir une reprise sur erreur précise. Lors de la révocation, les offsets sont persistés dans une base de données transactionnelle. Lors de la réassignation, les offsets sont extraits de cette base et appliqués via seek, assurant ainsi une sémantique de traitement exacte (exact-once) au niveau de l'application.
Arrêt Propre et Gestion du Cycle de Vie
Lors de l'utilisation d'une boucle infinie pour la scrutation, l'arrêt du consommateur nécessite une approche spécifique pour éviter les blocages. La méthode consumer.wakeup() est la seule opération thread-safe de l'API consommateur. Elle permet à un thread externe (comme un hook d'arrêt ou un thread de supervision) d'interrompre la boucle de scrutation en levant une WakeupException.
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
object GracefulShutdownDemo {
def main(args: Array[String]): Unit = {
val config = new Properties()
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(ConsumerConfig.GROUP_ID_CONFIG, "shutdown_group")
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
val subscriber = new KafkaConsumer[String, String](config)
subscriber.subscribe(Seq("system_metrics").asJava)
// Thread de supervision pour l'arrêt
val shutdownThread = new Thread(() => {
Thread.sleep(5000) // Simulation d'un signal d'arrêt après 5 secondes
println("Signal d'arrêt reçu. Déclenchement du wakeup...")
subscriber.wakeup()
})
shutdownThread.start()
try {
while (true) {
val metrics = subscriber.poll(Duration.ofMillis(100))
metrics.asScala.foreach(metric => println(s"Métrique : ${metric.value()}"))
}
} catch {
case _: WakeupException =>
println("WakeupException interceptée. Préparation à la fermeture.")
case e: Exception =>
System.err.println(s"Erreur inattendue : ${e.getMessage}")
} finally {
subscriber.commitSync() // Validation finale sécurisée
subscriber.close()
println("Consommateur fermé proprement.")
}
shutdownThread.join()
}
}