Introduction à Apache Kafka
Apache Kafka se définit comme une plateforme distribuée de gestion de flux d'événements. Ses capacités fondamentales reposent sur trois piliers :
- La publication et l'abonnement en temps réel à des flux de données, facilitant l'intégration continue entre systèmes hétérogènes.
- Le stockage persistant et tolérant aux pannes des enregistrements.
- Le traitement des flux à la volée ou de manière rétrospective.
Architecture Fondamentale
L'écosystème Kafka s'articule autour de plusieurs composants clés :
- Producteur (Producer) : Entité responsable de la génération et de l'envoi des événements.
- Consommateur (Consumer) : Entité qui lit et traite les événements.
- Courtier (Broker) : Nœud serveur exécutant une instance Kafka. Un ensemble de brokers forme un cluster distribué.
Topics et Partitions
Les événements sont catégorisés dans des topics. Pour garnatir l'évolutivité, chaque topic est divisé en plusieurs partitions. Une partition agit comme un journal (log) immuable et ordonné. Chaque message ajouté reçoit un identifiant séquentiel appelé offset. Il est crucial de noter que l'ordre est strictement garanti au sein d'une même partition, et non à l'échelle du topic entier. La distribution des partitions sur plusieurs brokers permet de paralléliser les opérations de lecture et d'écriture, contournant ainsi les limites matérielles d'une seule machine.
Mécanisme de Réplictaion
Pour assurer la haute disponibilité, Kafka réplique chaque partition sur plusieurs nœuds. Ce modèle suit une architecture maître-esclave :
- Leader : Gère toutes les requêtes de lecture et d'écriture pour la partition.
- Follower : Synchronise passivement les données depuis le leader.
En cas de défaillance du broker hébergeant le leader, un follower est automatiquement promu, assurant la continuité du service sans intervention manuelle.
Implémentation : Producteur et Consommateur
Voici des implémentations remaniées utilisant les API Java modernes pour interagir avec le cluster.
Producteur d'Événements
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class EventPublisher {
private static final String CLUSTER_NODES = "localhost:9092";
private static final String TARGET_TOPIC = "system-events";
public static void main(String[] args) {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER_NODES);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> publisher = new KafkaProducer<>(config)) {
ProducerRecord<String, String> event = new ProducerRecord<>(TARGET_TOPIC, "event-key", "Payload data");
publisher.send(event);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
Consommateur d'Événements
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class EventSubscriber {
public static void main(String[] args) {
Map<String, Object> settings = new HashMap<>();
settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-group");
settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (KafkaConsumer<String, String> subscriber = new KafkaConsumer<>(settings)) {
subscriber.subscribe(Collections.singletonList("system-events"));
while (true) {
ConsumerRecords<String, String> batch = subscriber.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : batch) {
System.out.printf("Partition: %d | Offset: %d | Value: %s%n",
record.partition(), record.offset(), record.value());
}
subscriber.commitSync();
}
}
}
}
Paramètres Cruciaux du Serveur
- zookeeper.connect : (Pour les anciennes versions) Chaîne de connexion au cluster ZooKeeper.
- listeners : Interfaces réseau et ports sur lesquels le broker acceptte les connexions clients.
- broker.id : Identifiant numérique unique du nœud au sein du cluster.
- log.dirs : Répertoires du système de fichiers où les segments de log sont persistés.
- message.max.bytes : Taille maximale absolue qu'un broker peut accepter pour un message unique.
Approfondissement du Producteur
La classe ProducerRecord encapsule les métadonnées de l'événement :
- topic : Destination logique.
- partition : Cible physique spécifique (optionnelle).
- key : Utilisé par le partitionneur pour router les messages de manière déterministe.
- value : Le contenu sérialisé.
- headers : Paires clé-valeur pour le routage ou le traçage applicatif.
- timestamp : Horodatage de création ou d'ajout au log.
Note : L'instance de KafkaProducer est thread-safe. Il est fortement recommandé de la partager entre plusieurs threads plutôt que d'en créer une par thread.
Mise en Pool du Producteur
Bien que le producteur soit thread-safe, certaines architectures legacy ou spécifiques nécessitent un pool d'objets. Voici une implémentation basée sur Apache Commons Pool2.
1. Fabrique d'objets
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
public class PublisherFactory extends BasePooledObjectFactory<KafkaProducer<String, String>> {
private final Properties envProps;
public PublisherFactory(Properties envProps) {
this.envProps = envProps;
}
@Override
public KafkaProducer<String, String> create() {
return new KafkaProducer<>(envProps);
}
@Override
public PooledObject<KafkaProducer<String, String>> wrap(KafkaProducer<String, String> obj) {
return new DefaultPooledObject<>(obj);
}
@Override
public void destroyObject(PooledObject<KafkaProducer<String, String>> p) {
p.getObject().close();
}
}
2. Gestionnaire de Pool
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
public class PublisherPoolManager {
private final GenericObjectPool<KafkaProducer<String, String>> pool;
public PublisherPoolManager(Properties props) {
GenericObjectPoolConfig<KafkaProducer<String, String>> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(10);
this.pool = new GenericObjectPool<>(new PublisherFactory(props), config);
}
public KafkaProducer<String, String> acquire() throws Exception {
return pool.borrowObject();
}
public void release(KafkaProducer<String, String> instance) {
pool.returnObject(instance);
}
}
Modes d'Expédition des Messages
Expédition Synchrone
Le producteur bloque le thread appelant jusqu'à la réception de l'accusé de réception du broker. Cela garantit la livraison mais dégrade le débit.
try {
RecordMetadata metadata = publisher.send(event).get(5, TimeUnit.SECONDS);
System.out.println("Écrit dans la partition " + metadata.partition());
} catch (TimeoutException | ExecutionException | InterruptedException e) {
// Gérer l'échec de l'écriture
}
Gestion des erreurs : Les exceptions comme NetworkException sont retraitées automatiquement si retries est configuré. Les erreurs fatales comme RecordTooLargeException nécessitent une intervention au niveau du code métier.
Expédition Asynchrone
L'approche asynchrone utilise une fonction de rappel (Callback) pour traiter la réponse sans bloquer le thread principal. C'est le mode privilégié pour les systèmes à haut débit.
publisher.send(event, (metadata, exception) -> {
if (exception != null) {
System.err.println("Échec de l'envoi : " + exception.getMessage());
} else {
System.out.println("Succès : offset " + metadata.offset());
}
});
Mécanismes Internes et Paramètres Clients
Avant l'envoi réseau, le message traverse des intercepteurs, un sérialiseur et un partitionneur. Il est ensuite stocké dans le RecordAccumulator, une mémoire tampon qui regroupe les messages par partition pour optimiser les requêtes réseau (batching).
Paramètres d'acknowledgement (acks)
- acks=0 : "Fire-and-forget". Débit maximal, risque de perte de données.
- acks=1 : Confirmation dès l'écriture sur le leader. Compromis standard.
- acks=all : Confirmation après écriture sur tous les réplicas ISR (In-Sync Replicas). Fiabilité maximale, latence accrue.
Autres configurations notables
- max.request.size : Limite la taille globale d'une requête batchée envoyée au broker.
- compression.type : Active la compression (snappy, lz4, zstd) pour réduire la bande passante réseau.
- buffer.memory : Taille totale de la mémoire allouée au
RecordAccumulator.