Le zero-copy dans Kafka est un concept clé pour optimiser les performances, notamment lors de la consommation des messages. Kafka, en tant que système de messageire mature, offre de nombreuses fonctionnalités avancées, mais nous nous concentrons ici sur les aspects fondamentaux de la production, du stockage et de la consommation des messages. Le zero-copy n'intervient que dans la phase de consommation ; en production, les messages sont placés en mémoire heap pour vérification, ce qui ne permet pas ce mécanisme.
Le protocole de message dans Kafka a évolué à travers trois versions. La version 2, largement adoptée, introduit le concept de Record Batch, où chaque batch peut contenir de un à plusieurs messages. Par défaut, la taille maximale d'un batch est de 16 Ko. Lors de la production, le paramètre linger.ms influence la formation des batches : une valeur plus élevée favorise le remplissage complet, tandis qu'une valeur par défaut (0) entraîne un message par batch. Ainsi, le Record Batch constitue l'unité minimale pour l'échange de données.
Pour le stockage, Kafka s'appuie sur deux types de fichiers par segment : un fichier .log pour les messages (ajoutés en mode append) et un fichier .index pour les index clairsemés. Ces index sont construits tous les 4 Ko, alignés sur les caractéristiques matérielles pour optimiser la mise en cache par le système d'exploitation. En version 2, les requêtes de messages se basent sur des Record Batches, et l'intervalle d'index dépend de la taille des batches (souvent 16 Ko avec une configuration par défaut).
Lors de la consommation, deux paramètres sont essentiels : fetch.min.bytes (taille minimale de récupération, par défaut 1 octet) et fetch.max.bytes (taille maximale, par défaut 50 Mo). Kafka garantit qu'au moins un message est renvoyé, même s'il dépasse fetch.max.bytes. Pour localiser un message spécifique, la recherche se fait en deux étapes via les index clairsemés et les fichiers .log.
La localisation grossière utilise une recherche binaire dans le fichier .index pour déterminer une position approximative dans le fichier .log. Par exemple, si l'index contient les entrées (offset 110, position 8200) et (offset 120, position 13000), une recherche pour l'offset 115 débutera à la position 8200. Cette opération est implémentée dans des classes comme AbstractIndex.scala.
Ensuite, la localisation précise scanne les Record Batches à partir de cette position. Un batch contient des métadonnées telles que le premier et le dernier offset, ce qui permet de vérifier si l'offset cible s'y trouve. Voici un exemple de code modifié pour cette recherche :
public RecordBatchLocation locateRecordBatchByOffset(long desiredOffset, int fileOffset) {
for (FileChannelRecordBatch currentBatch : batchesStartingFrom(fileOffset)) {
long lastOffset = currentBatch.lastOffset();
if (lastOffset >= desiredOffset) {
return new RecordBatchLocation(lastOffset, currentBatch.position(), currentBatch.sizeInBytes());
}
}
return null;
}
Une fois la positoin du batch déterminée, le transfert de données s'effectue via le zero-copy avec FileChannel.transferTo(long position, long count). La longueur de copie correspond à fetch.max.bytes, mais le dernier batch peut être incomplet. Le client Kafka gère cela en tronquant les batches partiels, comme illustré dans le code ci-dessous (modifié pour la clarté) :
public MutableRecordBatch readNextBatch() {
int availableBytes = buffer.remaining();
Integer size = determineNextBatchSize();
if (size == null || availableBytes < size) {
return null;
}
byte magicByte = buffer.get(buffer.position() + MAGIC_OFFSET);
ByteBuffer batchData = buffer.slice();
batchData.limit(size);
buffer.position(buffer.position() + size);
if (magicByte > RecordBatch.MAGIC_VALUE_V1) {
return new DefaultRecordBatch(batchData);
} else {
return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchData);
}
}
Les données en cache dans le Page Cache réduisent les accès disque, ce qui rend le scan efficace même avec des batches multiples. La stratégie de Kafka consiste à renvoyer des batches complets lorsque possible, mais les lectures froides depuis le disque peuvent entraîner une légère redondance de données, considérée comme acceptable au vu des gains de performence.
Examinons quelques scénarios concrets. Pour un topic avec des batches à message unique et fetch.max.bytes=3500, le consommateur peut récupérer trois batches complets et un demi-batch, puis compléter lors des requêtes suivantes. Dans un scénario de batches à messages multiples, avec fetch.max.bytes=10, le client renvoie le batch entier, et le filtrage est effectué côté client. Voici un exemple de code modifié pour la consommation :
private void retrieveMessages() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(loadConfiguration());
consumer.commitAsync();
consumer.assign(Collections.singletonList(new TopicPartition("myTopic", 0)));
TopicPartition partition = new TopicPartition("myTopic", 0);
consumer.seek(partition, 1);
while (true) {
ConsumerRecords<String, String> fetchedRecords = consumer.poll(Duration.ofSeconds(1));
int count = fetchedRecords.count();
System.out.println("Nombre de messages récupérés : " + count);
if (count == 0) {
System.out.println("Aucun message disponible");
}
}
}
Cette approche montre que le client Kafka, bien que complexe, optimise les transferts via le zero-copy et la gestion des batches, assurant une haute performance même dans des conditions variées.