Comprendre les RDD dans Apache Spark : Fondamentaux et Pratiques

Introduction aux RDD

Le concept de RDD (Resilient Distributed Dataset) constitue l'abstraction fondamentale de Spark. Il s'agit d'une collection d'objets immuable et distribuée à travers les différents nœuds d'un cluster.

  • Résilience : Spark assure la tolérance aux pannes. Si une partition de données est perdue, elle peut être recalculée automatiquement grâce au lignage (lineage). Les données peuvent également basculer de la mémoire vers le disque en cas de saturation.
  • Distribution : Les données sont segmentées en partitions réparties sur plusieurs machines, permettant un traietment parallèle efficace.

En tant que développeur, vous manipulez l'objet RDD comme une collection locale, tandis que Spark gère la complexité de l'exécution distribuée en arrière-plan.

Création d'un RDD

Il existe deux méthodes principales pour initialiser un RDD dans Spark.

À partir d'une collection locale

val configuration = new SparkConf().setAppName("ExempleRDD").setMaster("local")
val contexte = new SparkContext(configuration)
val nombres = Array(10, 20, 30, 40, 50)
val rddNombres = contexte.parallelize(nombres)

Depuis une source externe

Spark peut lire des données provenant de HDFS, S3 ou de systèmes de fichiers locaux.

val rddTexte = contexte.textFile("chemin/vers/donnees.csv")

Opérateurs de Transformation

Les transformations créent un nouveau RDD à partir d'un existant. Elles sont paresseuses (lazy), signifiant qu'elles ne s'exécutent qu'au moment d'une action.

1. map : Applique une fonction à chaque élément.

val rddInitial = contexte.parallelize(List(1, 2, 3))
val rddCarre = rddInitial.map(x => x * x) // Résultat: 1, 4, 9

2. flatMap : Similaire à map, mais peut retourner plusieurs éléments pour une seule entrée.

val phrases = contexte.parallelize(Seq("Hello Spark", "Scala est top"))
val mots = phrases.flatMap(ligne => ligne.split(" "))

3. filter : Sélectionne les éléments respectant un prédicat.

val rddFiltre = rddNombres.filter(_ > 25)

4. reduceByKey : Regroupe les valeurs par clé et applique une agrégation. Très efficace car il effectue une pré-agrégation côté map.

val ventes = contexte.parallelize(List(("Pomme", 5), ("Poire", 3), ("Pomme", 2)))
val totalVentes = ventes.reduceByKey((acc, valeur) => acc + valeur) 
// Résultat: (Pomme, 7), (Poire, 3)

5. join : Fusionne deux RDD basés sur une clé commune (Jointure interne).

val rddA = contexte.parallelize(Seq(("ID1", "ClientA"), ("ID2", "ClientB")))
val rddB = contexte.parallelize(Seq(("ID1", 100.0), ("ID2", 150.0)))
val rddJoint = rddA.join(rddB)

Opérateurs d'Action

Les actions déclenchent le calcul effectif et renvoient un résultat au programme Driver ou écrivent des données sur le stockage.

val somme = rddInitial.reduce(_ + _)
val contenu = rddInitial.collect() // Récupère tout le RDD en mémoire sur le Driver
val compte = rddInitial.count()

Partitionnement des RDD

Le nombre de partitions détermine le degré de parallélisme. Idéalement, il doit correspondre au nombre de cœurs CPU disponibles dans le cluster.

  • parallelize : sc.parallelize(data, 8) crée 8 partitions.
  • Configuraton par défaut : Si non spécifié, Spark utilise spark.default.parallelism ou le nombre total de cœurs en mode local.

Dépendances et Stages

Le graphe d'exécution de Spark est un DAG (Directed Acyclic Graph). Il se divise en Stages basés sur les dépendances entre RDD :

  • Dépendance étroite (Narrow) : Chaque partition du RDD parent est utilisée par au plus une partition du RDD enfant (ex: map, filter). Aucune réorganisation des données (Shuffle) n'est nécessaire.
  • Dépendance large (Wide) : Une partition parente est partagée entre plusieurs partitions enfants (ex: groupByKey, join). Cela provoque un Shuffle, transfert de données coûteux via le réseau.

Persistance et Mise en cache

Pour éviter de recalculer un RDD utilisé plusieurs fois, on peut le mettre en mémoire.

val rddIntermediaire = rddInitial.map(transformationLente).cache()
// cache() est un raccourci pour persist(StorageLevel.MEMORY_ONLY)

Variables Partagées

Variables de Diffusion (Broadcast)

Permettent de garder une variable en lecture seule sur chaque machine plutôt que de l'envoyer avec chaque tâche.

val tableReference = Map("A" -> 1, "B" -> 2)
val diffusion = contexte.broadcast(tableReference)
rdd.map(x => diffusion.value.get(x))

Accumulateurs

Variables utilisées pour agréger des informations (compteurs, sommes) depuis les exécuteurs vers le Driver.

val compteurErreurs = contexte.longAccumulator("Erreurs")
rdd.foreach(x => if (x < 0) compteurErreurs.add(1))

Cas Pratique : Calcul de la moyenne par étudiant

val notesBrutes = List("Alice,Math,18", "Bob,Math,15", "Alice,Physique,14", "Bob,Physique,12")
val rddNotes = contexte.parallelize(notesBrutes)

val moyenne = rddNotes.map(ligne => {
    val champs = ligne.split(",")
    (champs(0), (champs(2).toDouble, 1)) // (Nom, (Note, 1))
})
.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
.mapValues(total => total._1 / total._2)

moyenne.collect().foreach(println)

Gestion du Déséquilibre des Données (Data Skew)

Le déséquilibre survient lorsqu'une partition contient beaucoup plus de données que les autres, ralentissant l'ensemble du job.

  • Filtrage : Supprimer les clés nulles ou inutiles avant le Shuffle.
  • Augmentation du parallélisme : Augmenter le nombre de partitions pour diviser les grosses clés.
  • Salting (Ajout de sel) : Ajouter un préfixe aléatoire aux clés pour forcer une redistribution, effectuer une première agrégation, puis supprimer le préfixe pour l'agrégation finale.

Étiquettes: Apache Spark Scala Big Data Distributed Computing RDD

Publié le 6 juin à 01h23