Introduction aux RDD
Le concept de RDD (Resilient Distributed Dataset) constitue l'abstraction fondamentale de Spark. Il s'agit d'une collection d'objets immuable et distribuée à travers les différents nœuds d'un cluster.
- Résilience : Spark assure la tolérance aux pannes. Si une partition de données est perdue, elle peut être recalculée automatiquement grâce au lignage (lineage). Les données peuvent également basculer de la mémoire vers le disque en cas de saturation.
- Distribution : Les données sont segmentées en partitions réparties sur plusieurs machines, permettant un traietment parallèle efficace.
En tant que développeur, vous manipulez l'objet RDD comme une collection locale, tandis que Spark gère la complexité de l'exécution distribuée en arrière-plan.
Création d'un RDD
Il existe deux méthodes principales pour initialiser un RDD dans Spark.
À partir d'une collection locale
val configuration = new SparkConf().setAppName("ExempleRDD").setMaster("local")
val contexte = new SparkContext(configuration)
val nombres = Array(10, 20, 30, 40, 50)
val rddNombres = contexte.parallelize(nombres)
Depuis une source externe
Spark peut lire des données provenant de HDFS, S3 ou de systèmes de fichiers locaux.
val rddTexte = contexte.textFile("chemin/vers/donnees.csv")
Opérateurs de Transformation
Les transformations créent un nouveau RDD à partir d'un existant. Elles sont paresseuses (lazy), signifiant qu'elles ne s'exécutent qu'au moment d'une action.
1. map : Applique une fonction à chaque élément.
val rddInitial = contexte.parallelize(List(1, 2, 3))
val rddCarre = rddInitial.map(x => x * x) // Résultat: 1, 4, 9
2. flatMap : Similaire à map, mais peut retourner plusieurs éléments pour une seule entrée.
val phrases = contexte.parallelize(Seq("Hello Spark", "Scala est top"))
val mots = phrases.flatMap(ligne => ligne.split(" "))
3. filter : Sélectionne les éléments respectant un prédicat.
val rddFiltre = rddNombres.filter(_ > 25)
4. reduceByKey : Regroupe les valeurs par clé et applique une agrégation. Très efficace car il effectue une pré-agrégation côté map.
val ventes = contexte.parallelize(List(("Pomme", 5), ("Poire", 3), ("Pomme", 2)))
val totalVentes = ventes.reduceByKey((acc, valeur) => acc + valeur)
// Résultat: (Pomme, 7), (Poire, 3)
5. join : Fusionne deux RDD basés sur une clé commune (Jointure interne).
val rddA = contexte.parallelize(Seq(("ID1", "ClientA"), ("ID2", "ClientB")))
val rddB = contexte.parallelize(Seq(("ID1", 100.0), ("ID2", 150.0)))
val rddJoint = rddA.join(rddB)
Opérateurs d'Action
Les actions déclenchent le calcul effectif et renvoient un résultat au programme Driver ou écrivent des données sur le stockage.
val somme = rddInitial.reduce(_ + _)
val contenu = rddInitial.collect() // Récupère tout le RDD en mémoire sur le Driver
val compte = rddInitial.count()
Partitionnement des RDD
Le nombre de partitions détermine le degré de parallélisme. Idéalement, il doit correspondre au nombre de cœurs CPU disponibles dans le cluster.
- parallelize :
sc.parallelize(data, 8)crée 8 partitions. - Configuraton par défaut : Si non spécifié, Spark utilise
spark.default.parallelismou le nombre total de cœurs en mode local.
Dépendances et Stages
Le graphe d'exécution de Spark est un DAG (Directed Acyclic Graph). Il se divise en Stages basés sur les dépendances entre RDD :
- Dépendance étroite (Narrow) : Chaque partition du RDD parent est utilisée par au plus une partition du RDD enfant (ex: map, filter). Aucune réorganisation des données (Shuffle) n'est nécessaire.
- Dépendance large (Wide) : Une partition parente est partagée entre plusieurs partitions enfants (ex: groupByKey, join). Cela provoque un Shuffle, transfert de données coûteux via le réseau.
Persistance et Mise en cache
Pour éviter de recalculer un RDD utilisé plusieurs fois, on peut le mettre en mémoire.
val rddIntermediaire = rddInitial.map(transformationLente).cache()
// cache() est un raccourci pour persist(StorageLevel.MEMORY_ONLY)
Variables Partagées
Variables de Diffusion (Broadcast)
Permettent de garder une variable en lecture seule sur chaque machine plutôt que de l'envoyer avec chaque tâche.
val tableReference = Map("A" -> 1, "B" -> 2)
val diffusion = contexte.broadcast(tableReference)
rdd.map(x => diffusion.value.get(x))
Accumulateurs
Variables utilisées pour agréger des informations (compteurs, sommes) depuis les exécuteurs vers le Driver.
val compteurErreurs = contexte.longAccumulator("Erreurs")
rdd.foreach(x => if (x < 0) compteurErreurs.add(1))
Cas Pratique : Calcul de la moyenne par étudiant
val notesBrutes = List("Alice,Math,18", "Bob,Math,15", "Alice,Physique,14", "Bob,Physique,12")
val rddNotes = contexte.parallelize(notesBrutes)
val moyenne = rddNotes.map(ligne => {
val champs = ligne.split(",")
(champs(0), (champs(2).toDouble, 1)) // (Nom, (Note, 1))
})
.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
.mapValues(total => total._1 / total._2)
moyenne.collect().foreach(println)
Gestion du Déséquilibre des Données (Data Skew)
Le déséquilibre survient lorsqu'une partition contient beaucoup plus de données que les autres, ralentissant l'ensemble du job.
- Filtrage : Supprimer les clés nulles ou inutiles avant le Shuffle.
- Augmentation du parallélisme : Augmenter le nombre de partitions pour diviser les grosses clés.
- Salting (Ajout de sel) : Ajouter un préfixe aléatoire aux clés pour forcer une redistribution, effectuer une première agrégation, puis supprimer le préfixe pour l'agrégation finale.