La transformation de flux de données massifs en informations exploitables constitue un avantage concurrentiel majeur. Les systèmes traditionnels par lots introduisent des délais critiques. Apache Flink permet de construire des pipelines de traitement en temps réel robustes.
Construire un système d'analyse en temps réel avec Flink
Notre objectif est de créer un pipeilne qui reçoit des événements d'interaction utilisateur (clics, vues) depuis une source de messages, applique des transformations en continu, calcule des agrégations sur des fenêtres temporelles et expose les métriques. Voici l'architecture générale :
Source événements (ex: Kafka) → Traitement Flink → Sink (ex: base de données, API)
1. Initialisation du projet et dépendances
Pour commencer, nous configurons un projet Maven avec les modules Flink nécessaires. Le connecteur pour Apache Kafka est essentiel pour consommer les événements.
Voici un exemple de configuration simplifiée du fichier pom.xml :
<properties>
<flink.version>1.17.1</flink.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.entreprise.flink.MainApp</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Cette configuration prépare un JAR autonome (uber-JAR) prêt pour le déploiement sur un cluster Flink.
2. Schéma d'événements et source Kafka
Nous définissons d'abord la structure des événements entrants. Par exemple, un événement de clic peut être représenté par un POJO Java.
public class ClickEvent {
private String userId;
private String pageUrl;
private long timestamp;
// Constructeurs, getters et setters
}
Ensuite, nous configurons un consommateur Kafka pour lire ces événements.
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092");
kafkaProps.setProperty("group.id", "flink-analytics-consumer");
DataStream<ClickEvent> clickStream = env
.addSource(new FlinkKafkaConsumer<>(
"clicks-topic",
new ClickEventSchema(), // Désérialiseur personnalisé
kafkaProps
));
3. Traitement et calculs en fenêtre
Le cœur du traitement réside dans l'application de fenêtres temporelles pour agréger les données. Nous calculons le nombre d'événements par URL sur une fenêtre glissante de 10 secondes, rafraîchie chaque seconde.
// KeyedStream par URL de la page
KeyedStream<ClickEvent, String> keyedByPage = clickStream
.keyBy(ClickEvent::getPageUrl);
// Fenêtre glissante de 10 secondes avec décalage de 1 seconde
DataStream<PageStats> statsPerWindow = keyedByPage
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
.aggregate(new PageViewAggregator());
// PageViewAggregator est une fonction qui accumule le compteur et un set d'IDs utilisateurs uniques
// Elle retourne un objet PageStats contenant (url, pvCount, uvCount, windowEnd)
La classe PageStats pourrait ressembler à ceci :
public class PageStats {
private String pageUrl;
private long pvCount;
private long uvCount;
private long windowEndTimestamp;
// ...
}
4. Sortie des résultats
Les statistiques calculées sont envoyées vers un sink. Pour une visualisation immédiate, nous pouvons les imprimer ou les écrire dans une base de données clé-valeur.
statsPerWindow.addSink(new SinkFunction<PageStats>() {
@Override
public void invoke(PageStats stats, Context context) {
// Ici, envoyer 'stats' à Redis, PostgreSQL, ou une API REST
System.out.println("Métriques: " + stats.getPageUrl() + " PV:" + stats.getPvCount() + " UV:" + stats.getUvCount());
}
});
Pour lancer l'application :
// Créer l'environnement d'exécution
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Construire le pipeline complet (source, transformations, sink)
// ...
// Exécuter le job Flink
env.execute("Analyse Comportement Utilisateur en Temps Réel");
Ce pipeline s'exécute en continu, produisant des métriques agrégées à chaque fin de fenêtre. L'état est automatiquement géré et tolérant aux pannes par Flink grâce aux points de contrôle.