Importation incrémentale de données fondée sur des instantanés d'ID complets

Contexte du problème

Dans les pipelines de données hors ligne, synchroniser efficacement un nouveau lot d'enregistrements vers une base cible en distinguant les INSERT des UPDATE représente un défi courant. Cette difficulté s'accentue lorsque la source ne fournit pas d'horodatage, de numéro de version ou de journal de modificaitons (CDC). Une approche éprouvée consiste à employer des instantanés d'ID complets pour identifier les opérations.

Considérons une base cible contenant actuellement les enregistrements avec les clés primaires 1, 2 et 3. L'instantané d'ID complet précédent (sauvegardé après la dernière importation) est donc {1, 2, 3}. Le nouveau lot à importer ne comprend que deux enregistrements : la clé 3 et 4. L'objectif est de :

  • Détecter que la clé 3 existe déjà → exécuter un UPDATE
  • Détecter que la clé 4 est nouvelle → exécuter un INSERT

Il est crucial de noter que les données sources n'ont pas besoin de représenter l'état complet actuel (par exemple, les clés 1 et 2 peuvent être omises). Ce scénario est fréquent avec les API, les flux d'événements ou les rapports de logs qui ne transmettent souvent que les changements.

Principe du mécanisme

L'idée fondamentale est d'utiliser l'instantané historique d'ID comme un cache de présence. Pour chaque enregistrement du lot courant, on vérifie si son ID figure dans cet instantané pour déterminer le type d'opération.

Le flux de traitement se déroule comme suit :

  1. Entrées :
    • Données sources : liste des enregistrements à importer, au format clé\tvaleur
    • Instantané historique : ensemble de toutes les clés primaires valides, au format clé\tEXISTANT
  2. Traitement :
    • Pour chaque enregistrement source, contrôler la présence de sa clé dans l'instantané
      • Si présent → marquer comme UPDATE
      • Si absent → marquer comme INSERT
  3. Sorties :
    • Résultat des opérations : type d'opération et données pour chaque enregistrement
    • Nouvel instantané : ensemble d'ID pour la prochaine comparaison

Remarque : Ce mécanisme ne gère pas nativement les suppressions. Pour supporter les DELETE, il faudrait comparer les différences entre l'ancien et le nouvel instantané, ou utiliser un processus séparé.

Concernant la génération du nouvel instantané, la stratégie dépend de la sémantique métier :

  • Si les enregistrements ne sont jamais supprimés (ex. : inscriptions utilisateurs, création de commandes), le nouvel instantané = ancien instantané ∪ ID du lot source.
  • En cas de suppressions possibles, une gestion minutieuse est requise pour éviter l'explosion de l'instantané ou des incohérences d'état.

Pour la majorité des scénarios de type "upsert", une stratégie d'instantané "seulement en ajout" est sûre et performante.

Structure des données d'entrée

Voici un exemple concret des fichiers d'entrée :

  • Répertoire des données sources (/input/source) : ``` 3 {"nom":"Alice","age":30} 4 {"nom":"Bob","age":25}

  • Répertoire de l'instantané historique (/input/snapshot) : ``` 1 EXISTANT 2 EXISTANT 3 EXISTANT

    
    

Implémentation MapReduce en Python

Script Mapper (mappeur.py)

#!/usr/bin/env python3
import sys

for ligne in sys.stdin:
    ligne = ligne.strip()
    if not ligne:
        continue
    segments = ligne.split('\t', 1)
    if len(segments) < 2:
        continue
    identifiant, contenu = segments[0], segments[1]
    # Les lignes de l'instantané ont "EXISTANT" comme second champ
    if contenu == "EXISTANT":
        print(f"{identifiant}\tMETA\tEXISTANT")
    else:
        print(f"{identifiant}\tDONNEES\t{contenu}")

Le Mapper convertit toutes les entrées en un triplet (identifiant, étiquette, charge_utile). L'étiquette distingue l'origine : META pour l'instantané, DONNEES pour les données sources.

Script Reducer (reducteur.py)

#!/usr/bin/env python3
import sys

def executer():
    identifiant_courant = None
    valeurs_donnees = []
    est_dans_base = False

    for ligne in sys.stdin:
        ligne = ligne.strip()
        if not ligne:
            continue
        elements = ligne.split('\t')
        if len(elements) < 3:
            continue

        cle = elements[0]
        etiquette = elements[1]
        charge = elements[2]

        if identifiant_courant != cle:
            if identifiant_courant is not None:
                # Générer les résultats d'opération
                for val in set(valeurs_donnees):
                    operation = "UPDATE" if est_dans_base else "INSERT"
                    print(f"{identifiant_courant}\t{operation}\t{val}")
                # Écrire la ligne de l'instantané (avec préfixe pour séparation)
                print(f"INSTANTANÉ\t{identifiant_courant}\tEXISTANT")
            # Réinitialiser l'état
            identifiant_courant = cle
            valeurs_donnees = []
            est_dans_base = False

        if etiquette == "META" and charge == "EXISTANT":
            est_dans_base = True
        elif etiquette == "DONNEES":
            valeurs_donnees.append(charge)

    # Traiter le dernier groupe
    if identifiant_courant is not None:
        for val in set(valeurs_donnees):
            operation = "UPDATE" if est_dans_base else "INSERT"
            print(f"{identifiant_courant}\t{operation}\t{val}")
        print(f"INSTANTANÉ\t{identifiant_courant}\tEXISTANT")

if __name__ == "__main__":
    executer()

Le Reducer agrège les enregistrements par identifiant. Si un ID apparaît à la fois dans META et DONNEES, l'enregistrement existant est mis à jour. S'il n'apparaît que dans DONNEES, c'est un nouvel insertion. Tous les ID présents dans l'instantané ou les données sources sont écrits dans le nouvel instantané pour assurer la continuité.

Soumission du job Hadoop Streaming

Pour exécuter le traitement sur un cluster Hadoop :

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -files mappeur.py,reducteur.py \
  -input /input/source \
  -input /input/snapshot \
  -output /output/resultat_brut \
  -mapper "python3 mappeur.py" \
  -reducer "python3 reducteur.py" \
  -numReduceTasks 10

Points notables :

  • -files distribue les scripts locaux sur tous les nœuds de travail
  • Deux directives -input spécifient les chemins des données sources et de l'instantané
  • La sortie est écrite dans /output/resultat_brut, contenant un mélange de résultats

Traitement des sorties

Après l'exécution du job, les fichiers de sortie contiennent deux types de contenus : des instructions d'opération et des lignes d'instantané. Comme le Reducer de Hadoop Streaming produit un seul flux de sortie, nous utilsions un préfixe INSTANTANÉ\t pour marquer les lignes d'instantané, facilitant leur séparation ultérieure.

Extraction des résultats d'opération (pour injection en base)

hdfs dfs -cat /output/resultat_brut/part-* | grep -v "^INSTANTANÉ" > actions.txt

Contenu de actions.txt :

3	UPDATE	{"nom":"Alice","age":30}
4	INSERT	{"nom":"Bob","age":25}

Ces enregistrements peuvent être lus par un programme en aval pour exécuter les INSERT ou UDPATE correspondants.

Génération du nouvel instantané (pour la prochaine comparaison)

hdfs dfs -cat /output/resultat_brut/part-* | grep "^INSTANTANÉ" | cut -f2- > nouvel_instantane.txt
hdfs dfs -put nouvel_instantane.txt /output/nouvel_instantane/part-00000

Contenu de nouvel_instantane.txt :

1	EXISTANT
2	EXISTANT
3	EXISTANT
4	EXISTANT

Ce fichier deviendra l'entrée de l'instantané historique pour le prochain travail d'importation.

Domaines d'application et limitations

Scénarios adaptés

  • La source ne fournit qu'un sous-ensemble d'enregistrements (pas la totalité)
  • La table cible supporte les opérations de type upsert
  • Les suppressions n'existent pas ou sont gérées par un autre mécanisme (ex. : TTL, flux de suppression dédié)
  • La volumétrie des données convient au traitement par lots

Limites

  • Impossible de détecter automatiquement les suppressions : si un enregistrement disparaît définitivement de la source, aucun DELETE n'est généré. Pour le supporter, une comparaison supplémentaire entre instantanés est nécessaire.
  • L'instantané peut croître indéfiniment au fil du temps. Des reconstructions périodiques en réconciliation avec la base cible, ou l'utilisation de filtres de Bloom, peuvent optimiser le stockage.
  • Dépendance à la stabilité des clés primaires : elles doivent être uniques et immuables globalement.
  • Fonctionnalités limitées de Hadoop Streaming : les sorties multiples complexes nécessitent des préfixes ou des jobs secondaires. Pour les environnements de production, Spark ou Hive offrent des alternatives plus robustes.

Étiquettes: Hadoop Streaming Python mapreduce Importation Incrémentale Synchronisation de Données

Publié le 2 juillet à 05h20