Ingestion des Données vers l'Entrepôt Hive (Couche ODS)

Introduction à l'Ingestion de Données Big Data

La mise en place d'une infrastructure Big Data robuste débute par une ingestion efficace des données. Dans notre architecture, la couche ODS (Operational Data Store) de notre entrepôt de données est la première étape où les informations sont structurées pour l'analyse. Cette phase de chargement des données vers Hive intervient une fois que toutes les sources de données brutes, qu'il s'agisse de données transactionnelles métier ou de journaux d'événements, ont été synchronisées avec succès sur le système de fichiers distribué HDFS.

Les données métier complètes (full load) sont généralement importées quotidiennement, transférant les enregistrements de la veille vers des répertoires HDFS datés correspondants. Les mises à jour incrémentielles des données métier, issues par exemple de journaux de transactions (binlogs), sont acheminées en temps réel vers HDFS, dans des dossiers structurés par date. De manière similaire, les données de log sont ingérées en continu vers HDFS, avec une structuration par date basée sur l'horodatage (champ ts) présent dans les messages JSON.

Chargement des Données (HDFS vers Hive)

Cette section détaille le processus de création des tables Hive externes et le mécanisme de chargement des données depuis HDFS vers la base de données ODS.

Définition des Tables Hive

Pour cette infrastructure, nous avons défini un total de 28 tables au sein de la base de données ODS : 16 tables pour les données métier complètes (chargement quotidien intégral), 11 tables pour les données métier incrémentielles (capture des changements de données) et 1 table pour les journaux d'événements de trafic.

Il est important de noter que les tables Hive utilisent par défaut le format TextFile. Les tables de dimension statiques, telles que celles des provinces et des régions, ne sont pas partitionnées par date, car leur contenu est statique ou mis à jour moins fréquemment. Les fichiers HDFS contenant les données métier complètes sont formatés avec des champs séparés par des tabulations (\t). En revanche, les données incrémentielles métier et les logs de trafic sont stockés sous forme de chaînes JSON, nécessitant un SerDe (Serializer/Deserializer) spécifique pour leur parsing.

Définitions des Tables Métier Complètes


DROP TABLE IF EXISTS ods.db.ods_produits_sku_complets;
CREATE EXTERNAL TABLE ods.db.ods_produits_sku_complets(
    `identifiant_sku` STRING COMMENT 'Identifiant unique du SKU',
    `identifiant_spu` STRING COMMENT 'Identifiant unique du SPU associé',
    `prix_unitaire` DECIMAL(16,2) COMMENT 'Prix du produit',
    `nom_produit` STRING COMMENT 'Désignation du produit',
    `description_produit` STRING COMMENT 'Description détaillée du SKU',
    `poids` DECIMAL(16,2) COMMENT 'Poids en unités standards',
    `identifiant_marque` STRING COMMENT 'Identifiant de la marque',
    `identifiant_categorie_3` STRING COMMENT 'ID de la catégorie de troisième niveau',
    `en_vente` STRING COMMENT 'Indicateur de disponibilité (oui/non)',
    `date_creation` STRING COMMENT 'Timestamp de création de l''enregistrement'
) COMMENT 'Table des informations complètes sur les SKU'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_produits_sku_complets/';

DROP TABLE IF EXISTS ods.db.ods_categories_niveau1_complet;
CREATE EXTERNAL TABLE ods.db.ods_categories_niveau1_complet(
    `identifiant_cat1` STRING COMMENT 'Identifiant de la catégorie de premier niveau',
    `nom_cat1` STRING COMMENT 'Nom de la catégorie de premier niveau'
) COMMENT 'Table des catégories de produits de niveau 1'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_categories_niveau1_complet/';

DROP TABLE IF EXISTS ods.db.ods_categories_niveau2_complet;
CREATE EXTERNAL TABLE ods.db.ods_categories_niveau2_complet(
    `identifiant_cat2` STRING COMMENT 'Identifiant de la catégorie de deuxième niveau',
    `nom_cat2` STRING COMMENT 'Nom de la catégorie de deuxième niveau',
    `identifiant_cat1` STRING COMMENT 'Identifiant de la catégorie de premier niveau parente'
) COMMENT 'Table des catégories de produits de niveau 2'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_categories_niveau2_complet/';

DROP TABLE IF EXISTS ods.db.ods_categories_niveau3_complet;
CREATE EXTERNAL TABLE ods.db.ods_categories_niveau3_complet(
    `identifiant_cat3` STRING COMMENT 'Identifiant de la catégorie de troisième niveau',
    `nom_cat3` STRING COMMENT 'Nom de la catégorie de troisième niveau',
    `identifiant_cat2` STRING COMMENT 'Identifiant de la catégorie de deuxième niveau parente'
) COMMENT 'Table des catégories de produits de niveau 3'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_categories_niveau3_complet/';

DROP TABLE IF EXISTS ods.db.ods_ref_provinces;
CREATE EXTERNAL TABLE ods.db.ods_ref_provinces (
    `id_province` STRING COMMENT 'Identifiant de la province',
    `nom_province` STRING COMMENT 'Nom de la province',
    `id_region` STRING COMMENT 'Identifiant de la région géographique',
    `code_zone` STRING COMMENT 'Code de zone administrative',
    `code_iso` STRING COMMENT 'Code ISO-3166 pour la visualisation',
    `code_iso_3166_2` STRING COMMENT 'Code ISO-3166-2 pour la visualisation'
)  COMMENT 'Table de référence des provinces'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_ref_provinces/';

DROP TABLE IF EXISTS ods.db.ods_marques_complet;
CREATE EXTERNAL TABLE ods.db.ods_marques_complet (
    `id_marque` STRING COMMENT 'Identifiant de la marque',
    `nom_marque` STRING COMMENT 'Nom de la marque'
)  COMMENT 'Table des marques de produits'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_marques_complet/';

DROP TABLE IF EXISTS ods.db.ods_produits_spu_complets;
CREATE EXTERNAL TABLE ods.db.ods_produits_spu_complets(
    `identifiant_spu` STRING COMMENT 'Identifiant unique du SPU',
    `nom_spu` STRING COMMENT 'Nom du SPU',
    `identifiant_categorie_3` STRING COMMENT 'ID de la catégorie de troisième niveau',
    `identifiant_marque` STRING COMMENT 'Identifiant de la marque'
) COMMENT 'Table des informations complètes sur les SPU'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_produits_spu_complets/';

DROP TABLE IF EXISTS ods.db.ods_favoris_complets;
CREATE EXTERNAL TABLE ods.db.ods_favoris_complets(
    `id_favori` STRING COMMENT 'Identifiant de l''enregistrement favori',
    `id_utilisateur` STRING COMMENT 'ID de l''utilisateur',
    `identifiant_sku` STRING COMMENT 'ID du SKU favori',
    `identifiant_spu` STRING COMMENT 'ID du SPU favori',
    `est_annule` STRING COMMENT 'Indique si le favori a été annulé',
    `date_ajout` STRING COMMENT 'Date d''ajout du favori',
    `date_annulation` STRING COMMENT 'Date d''annulation du favori'
) COMMENT 'Table des produits favoris'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_favoris_complets/';

DROP TABLE IF EXISTS ods.db.ods_paniers_complets;
CREATE EXTERNAL TABLE ods.db.ods_paniers_complets(
    `id_panier` STRING COMMENT 'Identifiant de l''article dans le panier',
    `id_utilisateur` STRING COMMENT 'ID de l''utilisateur',
    `identifiant_sku` STRING COMMENT 'ID du SKU ajouté au panier',
    `prix_panier` DECIMAL(16,2)  COMMENT 'Prix du SKU au moment de l''ajout',
    `quantite_sku` BIGINT COMMENT 'Quantité de SKU',
    `nom_sku` STRING COMMENT 'Nom du SKU (redondant)',
    `date_creation` STRING COMMENT 'Date d''ajout au panier',
    `date_modification` STRING COMMENT 'Date de dernière modification',
    `est_commande` STRING COMMENT 'Indique si l''article a été commandé',
    `date_commande` STRING COMMENT 'Date de la commande',
    `type_source` STRING COMMENT 'Type de source de l''ajout (ex: recherche, recommandation)',
    `id_source` STRING COMMENT 'ID de la source'
) COMMENT 'Table des articles dans le panier'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_paniers_complets/';

DROP TABLE IF EXISTS ods.db.ods_coupons_info_complets;
CREATE EXTERNAL TABLE ods.db.ods_coupons_info_complets(
    `id_coupon` STRING COMMENT 'Identifiant du coupon',
    `nom_coupon` STRING COMMENT 'Nom du coupon',
    `type_coupon` STRING COMMENT 'Type de coupon (1: cash, 2: réduction, 3: seuil-réduction)',
    `montant_condition` DECIMAL(16,2) COMMENT 'Montant minimum pour application',
    `quantite_condition` BIGINT COMMENT 'Nombre d''articles minimum pour application',
    `id_activite` STRING COMMENT 'Identifiant de l''activité associée',
    `montant_avantage` DECIMAL(16,2) COMMENT 'Montant de la réduction',
    `pourcentage_reduction` DECIMAL(16,2) COMMENT 'Pourcentage de réduction',
    `date_creation` STRING COMMENT 'Date de création du coupon',
    `type_portee` STRING COMMENT 'Portée du coupon (1: produit, 2: catégorie, 3: marque)',
    `limite_utilisation` BIGINT COMMENT 'Nombre maximal d''utilisations par utilisateur',
    `comptage_utilisations` BIGINT COMMENT 'Nombre total d''utilisations du coupon',
    `date_debut_validite` STRING COMMENT 'Date de début de validité',
    `date_fin_validite` STRING COMMENT 'Date de fin de validité',
    `date_modification` STRING COMMENT 'Date de dernière modification',
    `date_expiration` STRING COMMENT 'Date d''expiration du coupon'
) COMMENT 'Table des informations sur les coupons'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_coupons_info_complets/';

DROP TABLE IF EXISTS ods.db.ods_activites_info_complets;
CREATE EXTERNAL TABLE ods.db.ods_activites_info_complets(
    `id_activite` STRING COMMENT 'Identifiant de l''activité',
    `nom_activite` STRING  COMMENT 'Nom de l''activité promotionnelle',
    `type_activite` STRING  COMMENT 'Type de l''activité',
    `date_debut` STRING  COMMENT 'Date de début de l''activité',
    `date_fin` STRING  COMMENT 'Date de fin de l''activité',
    `date_creation` STRING  COMMENT 'Date de création de l''enregistrement d''activité'
) COMMENT 'Table des informations sur les activités promotionnelles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_activites_info_complets/';

DROP TABLE IF EXISTS ods.db.ods_regles_activites_complets;
CREATE EXTERNAL TABLE ods.db.ods_regles_activites_complets(
    `id_regle` STRING COMMENT 'Identifiant de la règle',
    `id_activite` STRING  COMMENT 'ID de l''activité associée',
    `type_activite` STRING COMMENT 'Type de l''activité (redondant)',
    `montant_condition` DECIMAL(16,2) COMMENT 'Montant conditionnel pour la réduction',
    `quantite_condition` BIGINT COMMENT 'Quantité conditionnelle pour la réduction',
    `montant_avantage` DECIMAL(16,2) COMMENT 'Montant de l''avantage offert',
    `pourcentage_avantage` DECIMAL(16,2) COMMENT 'Pourcentage de l''avantage offert',
    `niveau_avantage` STRING COMMENT 'Niveau ou priorité de l''avantage'
) COMMENT 'Table des règles d''activités promotionnelles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_regles_activites_complets/';

DROP TABLE IF EXISTS ods.db.ods_dictionnaire_base_complet;
CREATE EXTERNAL TABLE ods.db.ods_dictionnaire_base_complet(
    `code_dictionnaire` STRING COMMENT 'Code de l''entrée du dictionnaire',
    `nom_dictionnaire` STRING COMMENT 'Nom de l''entrée du dictionnaire',
    `code_parent` STRING COMMENT 'Code parent de l''entrée',
    `date_creation` STRING COMMENT 'Date de création de l''entrée',
    `date_modification` STRING COMMENT 'Date de dernière modification'
) COMMENT 'Table du dictionnaire de codes de base'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_dictionnaire_base_complet/';

DROP TABLE IF EXISTS ods.db.ods_attributs_sku_complets;
CREATE EXTERNAL TABLE ods.db.ods_attributs_sku_complets(
    `id_attribut_sku` STRING COMMENT 'Identifiant de la valeur d''attribut SKU',
    `id_attribut_plateforme` STRING COMMENT 'ID de l''attribut de plateforme',
    `id_valeur_attribut` STRING COMMENT 'ID de la valeur d''attribut de plateforme',
    `identifiant_sku` STRING COMMENT 'ID du SKU',
    `nom_attribut` STRING COMMENT 'Nom de l''attribut de plateforme',
    `nom_valeur_attribut` STRING COMMENT 'Nom de la valeur d''attribut de plateforme'
) COMMENT 'Table des attributs de plateforme des SKU'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_attributs_sku_complets/';

DROP TABLE IF EXISTS ods.db.ods_attributs_vente_sku_complets;
CREATE EXTERNAL TABLE ods.db.ods_attributs_vente_sku_complets(
    `id_attribut_vente_sku` STRING COMMENT 'Identifiant de l''attribut de vente du SKU',
    `identifiant_sku` STRING COMMENT 'ID du SKU',
    `identifiant_spu` STRING COMMENT 'ID du SPU',
    `id_valeur_attribut_vente` STRING COMMENT 'ID de la valeur de l''attribut de vente',
    `id_attribut_vente` STRING COMMENT 'ID de l''attribut de vente',
    `nom_attribut_vente` STRING COMMENT 'Nom de l''attribut de vente',
    `nom_valeur_attribut_vente` STRING COMMENT 'Nom de la valeur de l''attribut de vente'
) COMMENT 'Table des attributs de vente des SKU'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_attributs_vente_sku_complets/';

DROP TABLE IF EXISTS ods.db.ods_ref_regions;
CREATE EXTERNAL TABLE ods.db.ods_ref_regions (
    `id_region` STRING COMMENT 'Identifiant de la région',
    `nom_region` STRING COMMENT 'Nom de la région'
)  COMMENT 'Table de référence des régions'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_ref_regions/';

DROP TABLE IF EXISTS ods.db.ods_utilisateurs_complets;
CREATE EXTERNAL TABLE ods.db.ods_utilisateurs_complets(
    `id_utilisateur` STRING COMMENT 'Identifiant unique de l''utilisateur',
    `nom_connexion` STRING COMMENT 'Nom de connexion de l''utilisateur',
    `surnom` STRING COMMENT 'Surnom de l''utilisateur',
    `mot_de_passe` STRING COMMENT 'Mot de passe haché de l''utilisateur',
    `nom_complet` STRING COMMENT 'Nom complet de l''utilisateur',
    `numero_telephone` STRING COMMENT 'Numéro de téléphone',
    `email` STRING COMMENT 'Adresse email',
    `url_avatar` STRING COMMENT 'URL de l''image d''avatar',
    `niveau_utilisateur` STRING COMMENT 'Niveau de l''utilisateur',
    `date_naissance` STRING COMMENT 'Date de naissance',
    `genre` STRING COMMENT 'Genre (M: masculin, F: féminin)',
    `date_creation` STRING COMMENT 'Date de création du compte',
    `date_modification` STRING COMMENT 'Date de dernière modification du compte',
    `statut_compte` STRING COMMENT 'Statut du compte (actif, inactif)'
) COMMENT 'Table des informations complètes sur les utilisateurs'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_utilisateurs_complets/';

DROP TABLE IF EXISTS ods.db.ods_commandes_complets;
CREATE EXTERNAL TABLE ods.db.ods_commandes_complets(
    `id_commande` BIGINT COMMENT 'Identifiant unique de la commande',
    `destinataire` STRING COMMENT 'Nom du destinataire',
    `telephone_destinataire` STRING COMMENT 'Numéro de téléphone du destinataire',
    `montant_total` DECIMAL(16,2) COMMENT 'Montant total de la commande',
    `statut_commande` STRING COMMENT 'Statut actuel de la commande',
    `id_utilisateur` BIGINT COMMENT 'ID de l''utilisateur ayant passé la commande',
    `mode_paiement` STRING COMMENT 'Méthode de paiement utilisée',
    `adresse_livraison` STRING COMMENT 'Adresse complète de livraison',
    `commentaire_commande` STRING COMMENT 'Commentaire laissé sur la commande',
    `numero_transaction_externe` STRING COMMENT 'Numéro de transaction pour paiement tiers',
    `description_transaction` STRING COMMENT 'Description de la transaction pour paiement tiers',
    `date_creation` STRING COMMENT 'Date de création de la commande',
    `date_modification` STRING COMMENT 'Date de dernière modification de la commande',
    `date_expiration` STRING COMMENT 'Date d''expiration de la commande',
    `statut_traitement` STRING COMMENT 'Statut du processus de commande',
    `numero_suivi` STRING COMMENT 'Numéro de suivi logistique',
    `id_commande_parente` BIGINT COMMENT 'ID de la commande parente (pour commandes divisées)',
    `url_image` STRING COMMENT 'URL de l''image principale de la commande',
    `id_province` INT COMMENT 'ID de la province de livraison',
    `montant_reduction_activite` DECIMAL(16,2) COMMENT 'Montant réduit par activité promotionnelle',
    `montant_reduction_coupon` DECIMAL(16,2) COMMENT 'Montant réduit par coupon',
    `montant_total_initial` DECIMAL(16,2) COMMENT 'Montant total avant réductions',
    `frais_port` DECIMAL(16,2) COMMENT 'Frais de livraison',
    `reduction_frais_port` DECIMAL(16,2) COMMENT 'Réduction sur les frais de livraison',
    `date_remboursable` STRING COMMENT 'Date à partir de laquelle un remboursement est possible'
) COMMENT 'Table des informations complètes sur les commandes'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_commandes_complets/';

DROP TABLE IF EXISTS ods.db.ods_utilisation_coupons_complets;
CREATE EXTERNAL TABLE ods.db.ods_utilisation_coupons_complets(
    `id_utilisation_coupon` BIGINT COMMENT 'Identifiant d''utilisation du coupon',
    `id_coupon` BIGINT COMMENT 'ID du coupon utilisé',
    `id_utilisateur` BIGINT COMMENT 'ID de l''utilisateur',
    `id_commande` BIGINT COMMENT 'ID de la commande où le coupon a été utilisé',
    `statut_coupon` STRING COMMENT 'Statut du coupon (1: non utilisé, 2: utilisé)',
    `date_obtention` STRING COMMENT 'Date d''obtention du coupon',
    `date_utilisation_tentative` STRING COMMENT 'Date de tentative d''utilisation',
    `date_utilisation_effective` STRING COMMENT 'Date de paiement après utilisation',
    `date_expiration` STRING COMMENT 'Date d''expiration du coupon'
) COMMENT 'Table d''historique d''utilisation des coupons'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_utilisation_coupons_complets/';

DROP TABLE IF EXISTS ods.db.ods_historique_statut_commande_complets;
CREATE EXTERNAL TABLE ods.db.ods_historique_statut_commande_complets(
    `id_log_statut` BIGINT COMMENT 'Identifiant du log de statut',
    `id_commande` BIGINT COMMENT 'ID de la commande',
    `statut_commande` STRING COMMENT 'Nouveau statut de la commande',
    `date_modification` STRING COMMENT 'Date et heure de la modification du statut'
) COMMENT 'Table d''historique des statuts de commande'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_historique_statut_commande_complets/';

DROP TABLE IF EXISTS ods.db.ods_details_commande_complets;
CREATE EXTERNAL TABLE ods.db.ods_details_commande_complets(
    `id_detail_commande` BIGINT COMMENT 'Identifiant du détail de la commande',
    `id_commande` BIGINT COMMENT 'ID de la commande parente',
    `identifiant_sku` BIGINT COMMENT 'ID du SKU',
    `nom_sku` STRING COMMENT 'Nom du SKU',
    `url_image` STRING COMMENT 'URL de l''image du produit (redondant)',
    `prix_commande` DECIMAL(16,2) COMMENT 'Prix d''achat du SKU au moment de la commande',
    `quantite_sku` STRING COMMENT 'Quantité de SKU achetée',
    `date_creation` STRING COMMENT 'Date de création du détail de commande',
    `type_source` STRING COMMENT 'Type de source (ex: promotion, normal)',
    `id_source` BIGINT COMMENT 'ID de la source',
    `montant_total_split` DECIMAL(16,2) COMMENT 'Montant final réparti',
    `montant_reduction_activite_split` DECIMAL(16,2) COMMENT 'Réduction d''activité répartie',
    `montant_reduction_coupon_split` DECIMAL(16,2) COMMENT 'Réduction de coupon répartie'
) COMMENT 'Table des détails de commande'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_details_commande_complets/';

DROP TABLE IF EXISTS ods.db.ods_paiements_info_complets;
CREATE EXTERNAL TABLE ods.db.ods_paiements_info_complets(
    `id_paiement` BIGINT COMMENT 'Identifiant du paiement',
    `numero_transaction_externe` STRING COMMENT 'Numéro de transaction externe',
    `id_commande` BIGINT COMMENT 'ID de la commande associée',
    `id_utilisateur` BIGINT COMMENT 'ID de l''utilisateur',
    `type_paiement` STRING COMMENT 'Type de paiement (ex: WeChat, Alipay)',
    `numero_transaction_interne` STRING COMMENT 'Numéro de transaction interne',
    `montant_total` DECIMAL(16,2) COMMENT 'Montant total payé',
    `sujet_transaction` STRING COMMENT 'Description du sujet de la transaction',
    `statut_paiement` STRING COMMENT 'Statut du paiement',
    `date_creation` STRING COMMENT 'Date de création de l''enregistrement de paiement',
    `date_rappel` STRING COMMENT 'Date de rappel (callback)',
    `contenu_rappel` STRING COMMENT 'Contenu du rappel (callback)'
) COMMENT 'Table des informations de paiement'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_paiements_info_complets/';

DROP TABLE IF EXISTS ods.db.ods_commentaires_info_complets;
CREATE EXTERNAL TABLE ods.db.ods_commentaires_info_complets(
    `id_commentaire` BIGINT COMMENT 'Identifiant du commentaire',
    `id_utilisateur` BIGINT COMMENT 'ID de l''utilisateur',
    `surnom_utilisateur` STRING COMMENT 'Surnom de l''utilisateur',
    `url_avatar_utilisateur` STRING COMMENT 'URL de l''avatar de l''utilisateur',
    `identifiant_sku` BIGINT COMMENT 'ID du SKU commenté',
    `identifiant_spu` BIGINT COMMENT 'ID du SPU commenté',
    `id_commande` BIGINT COMMENT 'ID de la commande associée',
    `evaluation` STRING COMMENT 'Évaluation (1: bon, 2: moyen, 3: mauvais)',
    `texte_commentaire` STRING COMMENT 'Contenu textuel du commentaire',
    `date_creation` STRING COMMENT 'Date de création du commentaire',
    `date_modification` STRING COMMENT 'Date de dernière modification du commentaire'
) COMMENT 'Table des commentaires de produits'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_commentaires_info_complets/';

DROP TABLE IF EXISTS ods.db.ods_remboursements_info_complets;
CREATE EXTERNAL TABLE ods.db.ods_remboursements_info_complets(
    `id_remboursement` BIGINT COMMENT 'Identifiant du remboursement',
    `id_utilisateur` BIGINT COMMENT 'ID de l''utilisateur',
    `id_commande` BIGINT COMMENT 'ID de la commande',
    `identifiant_sku` BIGINT COMMENT 'ID du SKU remboursé',
    `type_remboursement` STRING COMMENT 'Type de demande de remboursement',
    `quantite_remboursee` BIGINT COMMENT 'Quantité d''articles remboursés',
    `montant_rembourse` DECIMAL(16,2) COMMENT 'Montant total remboursé',
    `type_raison_remboursement` STRING COMMENT 'Type de raison du remboursement',
    `texte_raison_remboursement` STRING COMMENT 'Contenu textuel de la raison',
    `statut_remboursement` STRING COMMENT 'Statut du remboursement (0: en attente, 1: remboursé)',
    `date_creation` STRING COMMENT 'Date de création de la demande'
) COMMENT 'Table des demandes de remboursement de commande'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_remboursements_info_complets/';

DROP TABLE IF EXISTS ods.db.ods_details_commande_activite_complets;
CREATE EXTERNAL TABLE ods.db.ods_details_commande_activite_complets(
    `id_detail_activite` BIGINT COMMENT 'Identifiant du lien détail-activité',
    `id_commande` BIGINT COMMENT 'ID de la commande',
    `id_detail_commande` BIGINT COMMENT 'ID du détail de commande',
    `id_activite` BIGINT COMMENT 'ID de l''activité promotionnelle',
    `id_regle_activite` BIGINT COMMENT 'ID de la règle d''activité appliquée',
    `identifiant_sku` BIGINT COMMENT 'ID du SKU',
    `date_creation` STRING COMMENT 'Date de création du lien'
) COMMENT 'Table des associations détails commande-activité'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_details_commande_activite_complets/';

DROP TABLE IF EXISTS ods.db.ods_details_commande_coupon_complets;
CREATE EXTERNAL TABLE ods.db.ods_details_commande_coupon_complets(
    `id_detail_coupon` BIGINT COMMENT 'Identifiant du lien détail-coupon',
    `id_commande` BIGINT COMMENT 'ID de la commande',
    `id_detail_commande` BIGINT COMMENT 'ID du détail de commande',
    `id_coupon` BIGINT COMMENT 'ID du coupon',
    `id_utilisation_coupon` BIGINT COMMENT 'ID de l''utilisation du coupon',
    `identifiant_sku` BIGINT COMMENT 'ID du SKU',
    `date_creation` STRING COMMENT 'Date de création du lien'
) COMMENT 'Table des associations détails commande-coupon'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_details_commande_coupon_complets/';

DROP TABLE IF EXISTS ods.db.ods_paiements_remboursement_complets;
CREATE EXTERNAL TABLE ods.db.ods_paiements_remboursement_complets(
    `id_paiement_remboursement` BIGINT COMMENT 'Identifiant du paiement de remboursement',
    `numero_transaction_externe` STRING COMMENT 'Numéro de transaction externe',
    `id_commande` BIGINT COMMENT 'ID de la commande',
    `identifiant_sku` BIGINT COMMENT 'ID du SKU',
    `type_paiement` STRING COMMENT 'Type de paiement (ex: WeChat, Alipay)',
    `numero_transaction_interne` STRING COMMENT 'Numéro de transaction interne',
    `montant_total` DECIMAL(16,2) COMMENT 'Montant total remboursé',
    `sujet_transaction` STRING COMMENT 'Description du sujet de la transaction',
    `statut_remboursement` STRING COMMENT 'Statut du remboursement',
    `date_creation` STRING COMMENT 'Date de création de l''enregistrement de remboursement',
    `date_rappel` STRING COMMENT 'Date de rappel (callback)',
    `contenu_rappel` STRING COMMENT 'Contenu du rappel (callback)'
) COMMENT 'Table des informations de paiement de remboursement'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_paiements_remboursement_complets/';

Définitions des Tables Métier Incrémentielles


DROP TABLE IF EXISTS ods.db.ods_commandes_inc;
CREATE EXTERNAL TABLE ods.db.ods_commandes_inc (
    `operation_type` STRING COMMENT 'Type d''opération (INSERT, UPDATE, DELETE)',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification (epoch milliseconds)',
    `nouvelles_donnees` STRUCT<
        id_commande :STRING,
        destinataire :STRING,
        telephone_destinataire :STRING,
        montant_total :DECIMAL(16, 2),
        statut_commande :STRING,
        id_utilisateur:STRING,
        mode_paiement :STRING,
        adresse_livraison :STRING,
        commentaire_commande :STRING,
        numero_transaction_externe :STRING,
        description_transaction:STRING,
        date_creation :STRING,
        date_modification :STRING,
        date_expiration :STRING,
        statut_traitement :STRING,
        numero_suivi:STRING,
        id_commande_parente :STRING,
        url_image :STRING,
        id_province :STRING,
        montant_reduction_activite:DECIMAL(16, 2),
        montant_reduction_coupon :DECIMAL(16, 2),
        montant_total_initial :DECIMAL(16, 2),
        frais_port:DECIMAL(16, 2),
        reduction_frais_port :DECIMAL(16, 2),
        date_remboursable :DECIMAL(16, 2)
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
) COMMENT 'Table des commandes - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_commandes_inc/';

DROP TABLE IF EXISTS ods.db.ods_utilisation_coupons_inc;
CREATE EXTERNAL TABLE ods.db.ods_utilisation_coupons_inc(
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_utilisation_coupon :STRING, 
        id_coupon :STRING,
        id_utilisateur :STRING,
        id_commande :STRING,
        statut_coupon :STRING,
        date_obtention :STRING,
        date_utilisation_tentative:STRING,
        date_utilisation_effective :STRING,
        date_expiration :STRING, 
        date_creation :STRING,
        date_modification :STRING
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
) COMMENT 'Table d''utilisation des coupons - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_utilisation_coupons_inc/';

DROP TABLE IF EXISTS ods.db.ods_historique_statut_commande_inc;
CREATE EXTERNAL TABLE ods.db.ods_historique_statut_commande_inc (
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_log_statut :STRING,
        id_commande :STRING,
        statut_commande :STRING,
        date_creation :STRING,
        date_modification :STRING
    >  COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
)  COMMENT 'Table d''historique des statuts de commande - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_historique_statut_commande_inc/';

DROP TABLE IF EXISTS ods.db.ods_utilisateurs_inc;
CREATE EXTERNAL TABLE ods.db.ods_utilisateurs_inc(
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_utilisateur :STRING,
        nom_connexion :STRING,
        surnom :STRING,
        mot_de_passe :STRING,
        nom_complet :STRING,
        numero_telephone :STRING,
        email:STRING,
        url_avatar :STRING,
        niveau_utilisateur :STRING,
        date_naissance :STRING,
        genre :STRING,
        date_creation :STRING,
        date_modification:STRING,
        statut_compte :STRING
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
) COMMENT 'Table des utilisateurs - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_utilisateurs_inc/';

DROP TABLE IF EXISTS ods.db.ods_details_commande_inc;
CREATE EXTERNAL TABLE ods.db.ods_details_commande_inc(
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_detail_commande :STRING,
        id_commande :STRING,
        identifiant_sku :STRING,
        nom_sku :STRING,
        url_image :STRING,
        prix_commande:DECIMAL(16, 2),
        quantite_sku :BIGINT,
        date_creation :STRING,
        type_source :STRING,
        id_source :STRING,
        montant_total_split:DECIMAL(16, 2),
        montant_reduction_activite_split :DECIMAL(16, 2),
        montant_reduction_coupon_split:DECIMAL(16, 2),
        date_modification :STRING
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
) COMMENT 'Table des détails de commande - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_details_commande_inc/';

DROP TABLE IF EXISTS ods.db.ods_paiements_info_inc;
CREATE EXTERNAL TABLE ods.db.ods_paiements_info_inc(
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_paiement :STRING,
        numero_transaction_externe :STRING,
        id_commande :STRING,
        id_utilisateur :STRING,
        type_paiement :STRING,
        numero_transaction_interne:STRING,
        montant_total :DECIMAL(16, 2),
        sujet_transaction :STRING,
        statut_paiement :STRING,
        date_creation :STRING,
        date_rappel:STRING,
        contenu_rappel :STRING,
        date_modification :STRING
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
)  COMMENT 'Table des paiements - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_paiements_info_inc/';

DROP TABLE IF EXISTS ods.db.ods_commentaires_info_inc;
CREATE EXTERNAL TABLE ods.db.ods_commentaires_info_inc(
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_commentaire :STRING,
        id_utilisateur :STRING,
        surnom_utilisateur :STRING,
        url_avatar_utilisateur :STRING,
        identifiant_sku :STRING,
        identifiant_spu :STRING,
        id_commande :STRING,
        evaluation :STRING,
        texte_commentaire :STRING,
        date_creation :STRING,
        date_modification :STRING
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
) COMMENT 'Table des commentaires de produits - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_commentaires_info_inc/';

DROP TABLE IF EXISTS ods.db.ods_remboursements_info_inc;
CREATE EXTERNAL TABLE ods.db.ods_remboursements_info_inc(
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_remboursement :STRING,
        id_utilisateur :STRING,
        id_commande :STRING,
        identifiant_sku :STRING,
        type_remboursement :STRING,
        quantite_remboursee :BIGINT,
        montant_rembourse:DECIMAL(16, 2),
        type_raison_remboursement :STRING,
        texte_raison_remboursement :STRING,
        statut_remboursement :STRING,
        date_creation:STRING,
        date_modification :STRING
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
) COMMENT 'Table des demandes de remboursement - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_remboursements_info_inc/';

DROP TABLE IF EXISTS ods.db.ods_details_commande_activite_inc;
CREATE EXTERNAL TABLE ods.db.ods_details_commande_activite_inc(
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_detail_activite :STRING,
        id_commande :STRING,
        id_detail_commande :STRING,
        id_activite :STRING,
        id_regle_activite :STRING,
        identifiant_sku:STRING,
        date_creation :STRING, 
        date_modification :STRING
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
) COMMENT 'Table des associations détails commande-activité - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_details_commande_activite_inc/';

DROP TABLE IF EXISTS ods.db.ods_details_commande_coupon_inc;
CREATE EXTERNAL TABLE ods.db.ods_details_commande_coupon_inc(
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_detail_coupon :STRING,
        id_commande :STRING,
        id_detail_commande :STRING,
        id_coupon :STRING,
        id_utilisation_coupon :STRING,
        identifiant_sku:STRING,
        date_creation :STRING, 
        date_modification :STRING
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
) COMMENT 'Table des associations détails commande-coupon - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_details_commande_coupon_inc/';

DROP TABLE IF EXISTS ods.db.ods_paiements_remboursement_inc;
CREATE EXTERNAL TABLE ods.db.ods_paiements_remboursement_inc(
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_paiement_remboursement :STRING,
        numero_transaction_externe :STRING,
        id_commande :STRING,
        identifiant_sku :STRING,
        type_paiement :STRING,
        numero_transaction_interne :STRING,
        montant_total:DECIMAL(16, 2),
        sujet_transaction :STRING,
        statut_remboursement :STRING,
        date_creation :STRING,
        date_rappel :STRING,
        contenu_rappel:STRING,
        date_modification :STRING
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
)  COMMENT 'Table des paiements de remboursement - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_paiements_remboursement_inc/';

CREATE EXTERNAL TABLE IF NOT EXISTS ods.db.ods_paniers_inc (
    `operation_type` STRING COMMENT 'Type d''opération',
    `timestamp_chg`   BIGINT COMMENT 'Horodatage de la modification',
    `nouvelles_donnees` STRUCT<
        id_panier :STRING,
        id_utilisateur :STRING,
        identifiant_sku :STRING,
        prix_panier :DECIMAL(16, 2),
        quantite_sku :BIGINT,
        url_image :STRING,
        nom_sku :STRING,
        est_selectionne :STRING,
        date_creation :STRING,
        date_modification :STRING,
        est_commande :STRING,
        date_commande:STRING,
        type_source:STRING,
        id_source:BIGINT
    > COMMENT 'Nouvelles valeurs des champs',
    `anciennes_donnees`  MAP<STRING,STRING> COMMENT 'Anciennes valeurs des champs modifiés'
) COMMENT 'Table du panier - Données incrémentielles'
PARTITIONED BY (`jour_chargement` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_paniers_inc/';

Définition de la Table des Logs de Trafic


DROP TABLE IF EXISTS ods.db.ods_logs_trafic_inc;
CREATE EXTERNAL TABLE ods.db.ods_logs_trafic_inc (
    `contenu_journal` STRING COMMENT 'Ligne brute du journal d''événement'
) COMMENT 'Table des journaux de trafic - Données incrémentielles'
PARTITIONED BY (`date_journal` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/data_warehouse/ods.db/ods_logs_trafic_inc/';

Processus de Chargement des Données

Le chargement des données depuis HDFS vers les tables Hive ODS est une opération régulière, à l'exception des tables de dimension statiques telles que les provinces et les régions, qui ne nécessitent qu'une importation initiale unique. Toutes les autres tables métier, qu'elles soient complètes ou incrémentielles, sont mises à jour quotidiennement.

Chargement Initial des Tables de Référence (Provinces et Régions)

Ces tables, dont le contenu est généralement stable, sont chargées une seule fois après leur création.


hive -e "LOAD DATA INPATH '/data_warehouse/sources_brutes/regions_administratives/2023-12-02' OVERWRITE INTO TABLE ods.db.ods_ref_provinces;"
hive -e "LOAD DATA INPATH '/data_warehouse/sources_brutes/zones_geographiques/2023-12-02' OVERWRITE INTO TABLE ods.db.ods_ref_regions;"

Chargement Quotidien des Données Métier

Il est crucial que cette étape s'exécute uniquement après la finalisation réussie de la synchronisation des données depuis les bases de données opérationnelles vers HDFS, généralement effectuée par des outils comme Sqoop. L'ordonnancement avec crontab ne gère pas nativement les dépendances de tâches, ce qui nécessite des mécanismes supplémentaires pour assurer l'intégrité du processus, comme la vérification de l'existence de fichiers marqueurs ou l'intégration dans un orchestrateur de workflows comme Apache Airflow ou Oozie.

(1) Développement du script d'ingestion

Création du script shell charger_donnees_ods.sh pour automatiser le chargement des données quotidiennes dans Hive.


#!/bin/bash

BASE_ODS="ods.db" # Base de données ODS cible

# Détermine la date de traitement. Utilise le premier argument ou la veille par défaut.
if [ -n "$1" ] ;then
    DATE_DE_TRAITEMENT=$1
else 
    DATE_DE_TRAITEMENT=$(date -d "-1 day" +%F)
fi

echo "================== Démarrage du chargement pour la date: ${DATE_DE_TRAITEMENT} =================="

# Fonctions de chargement pour les tables "full"
charger_table_complete() {
    NOM_TABLE_HIVE="$1"
    CHEMIN_HDFS_SOURCE="$2"
    echo "Chargement de la table complète: ${NOM_TABLE_HIVE} pour la date ${DATE_DE_TRAITEMENT}"
    hive -e "LOAD DATA INPATH '${CHEMIN_HDFS_SOURCE}/${DATE_DE_TRAITEMENT}' OVERWRITE INTO TABLE ${BASE_ODS}.${NOM_TABLE_HIVE} PARTITION(jour_chargement='${DATE_DE_TRAITEMENT}');"
    if [ $? -ne 0 ]; then
        echo "ERREUR: Échec du chargement de ${NOM_TABLE_HIVE}"
        exit 1
    fi
}

# Fonctions de chargement pour les tables "incremental"
charger_table_incrementale() {
    NOM_TABLE_HIVE="$1"
    CHEMIN_HDFS_SOURCE="$2"
    echo "Chargement de la table incrémentielle: ${NOM_TABLE_HIVE} pour la date ${DATE_DE_TRAITEMENT}"
    hive -e "LOAD DATA INPATH '${CHEMIN_HDFS_SOURCE}/${DATE_DE_TRAITEMENT}' OVERWRITE INTO TABLE ${BASE_ODS}.${NOM_TABLE_HIVE} PARTITION(jour_chargement='${DATE_DE_TRAITEMENT}');"
    if [ $? -ne 0 ]; then
        echo "ERREUR: Échec du chargement de ${NOM_TABLE_HIVE}"
        exit 1
    fi
}

# --- Définition des chemins HDFS sources pour les tables complètes ---
declare -A TABLES_COMPLETES
TABLES_COMPLETES["ods_produits_sku_complets"]="/data_warehouse/sources_brutes/sku_complets"
TABLES_COMPLETES["ods_categories_niveau1_complet"]="/data_warehouse/sources_brutes/cat1_complet"
TABLES_COMPLETES["ods_categories_niveau2_complet"]="/data_warehouse/sources_brutes/cat2_complet"
TABLES_COMPLETES["ods_categories_niveau3_complet"]="/data_warehouse/sources_brutes/cat3_complet"
TABLES_COMPLETES["ods_marques_complet"]="/data_warehouse/sources_brutes/marques_complet"
TABLES_COMPLETES["ods_produits_spu_complets"]="/data_warehouse/sources_brutes/spu_complets"
TABLES_COMPLETES["ods_favoris_complets"]="/data_warehouse/sources_brutes/favoris_complets"
TABLES_COMPLETES["ods_paniers_complets"]="/data_warehouse/sources_brutes/paniers_complets"
TABLES_COMPLETES["ods_coupons_info_complets"]="/data_warehouse/sources_brutes/coupons_info_complets"
TABLES_COMPLETES["ods_activites_info_complets"]="/data_warehouse/sources_brutes/activites_info_complets"
TABLES_COMPLETES["ods_regles_activites_complets"]="/data_warehouse/sources_brutes/regles_activites_complets"
TABLES_COMPLETES["ods_dictionnaire_base_complet"]="/data_warehouse/sources_brutes/dictionnaire_base_complet"
TABLES_COMPLETES["ods_attributs_sku_complets"]="/data_warehouse/sources_brutes/attributs_sku_complets"
TABLES_COMPLETES["ods_attributs_vente_sku_complets"]="/data_warehouse/sources_brutes/attributs_vente_sku_complets"
TABLES_COMPLETES["ods_utilisateurs_complets"]="/data_warehouse/sources_brutes/utilisateurs_complets"
TABLES_COMPLETES["ods_commandes_complets"]="/data_warehouse/sources_brutes/commandes_complets"
TABLES_COMPLETES["ods_utilisation_coupons_complets"]="/data_warehouse/sources_brutes/utilisation_coupons_complets"
TABLES_COMPLETES["ods_historique_statut_commande_complets"]="/data_warehouse/sources_brutes/historique_statut_commande_complets"
TABLES_COMPLETES["ods_details_commande_complets"]="/data_warehouse/sources_brutes/details_commande_complets"
TABLES_COMPLETES["ods_paiements_info_complets"]="/data_warehouse/sources_brutes/paiements_info_complets"
TABLES_COMPLETES["ods_commentaires_info_complets"]="/data_warehouse/sources_brutes/commentaires_info_complets"
TABLES_COMPLETES["ods_remboursements_info_complets"]="/data_warehouse/sources_brutes/remboursements_info_complets"
TABLES_COMPLETES["ods_details_commande_activite_complets"]="/data_warehouse/sources_brutes/details_commande_activite_complets"
TABLES_COMPLETES["ods_details_commande_coupon_complets"]="/data_warehouse/sources_brutes/details_commande_coupon_complets"
TABLES_COMPLETES["ods_paiements_remboursement_complets"]="/data_warehouse/sources_brutes/paiements_remboursement_complets"


# --- Définition des chemins HDFS sources pour les tables incrémentielles ---
declare -A TABLES_INCREMENTIELLES
TABLES_INCREMENTIELLES["ods_commandes_inc"]="/data_warehouse/sources_brutes/commandes_inc"
TABLES_INCREMENTIELLES["ods_utilisation_coupons_inc"]="/data_warehouse/sources_brutes/utilisation_coupons_inc"
TABLES_INCREMENTIELLES["ods_historique_statut_commande_inc"]="/data_warehouse/sources_brutes/historique_statut_commande_inc"
TABLES_INCREMENTIELLES["ods_utilisateurs_inc"]="/data_warehouse/sources_brutes/utilisateurs_inc"
TABLES_INCREMENTIELLES["ods_details_commande_inc"]="/data_warehouse/sources_brutes/details_commande_inc"
TABLES_INCREMENTIELLES["ods_paiements_info_inc"]="/data_warehouse/sources_brutes/paiements_info_inc"
TABLES_INCREMENTIELLES["ods_commentaires_info_inc"]="/data_warehouse/sources_brutes/commentaires_info_inc"
TABLES_INCREMENTIELLES["ods_remboursements_info_inc"]="/data_warehouse/sources_brutes/remboursements_info_inc"
TABLES_INCREMENTIELLES["ods_details_commande_activite_inc"]="/data_warehouse/sources_brutes/details_commande_activite_inc"
TABLES_INCREMENTIELLES["ods_details_commande_coupon_inc"]="/data_warehouse/sources_brutes/details_commande_coupon_inc"
TABLES_INCREMENTIELLES["ods_paiements_remboursement_inc"]="/data_warehouse/sources_brutes/paiements_remboursement_inc"
TABLES_INCREMENTIELLES["ods_paniers_inc"]="/data_warehouse/sources_brutes/paniers_inc"


case "$2" in # Utilisation du deuxième argument pour le type de chargement
    "full_sku_info") charger_table_complete "ods_produits_sku_complets" "${TABLES_COMPLETES[ods_produits_sku_complets]}";;
    "full_cat1") charger_table_complete "ods_categories_niveau1_complet" "${TABLES_COMPLETES[ods_categories_niveau1_complet]}";;
    "full_cat2") charger_table_complete "ods_categories_niveau2_complet" "${TABLES_COMPLETES[ods_categories_niveau2_complet]}";;
    "full_cat3") charger_table_complete "ods_categories_niveau3_complet" "${TABLES_COMPLETES[ods_categories_niveau3_complet]}";;
    "full_trademark") charger_table_complete "ods_marques_complet" "${TABLES_COMPLETES[ods_marques_complet]}";;
    "full_spu_info") charger_table_complete "ods_produits_spu_complets" "${TABLES_COMPLETES[ods_produits_spu_complets]}";;
    "full_favor_info") charger_table_complete "ods_favoris_complets" "${TABLES_COMPLETES[ods_favoris_complets]}";;
    "full_cart_info") charger_table_complete "ods_paniers_complets" "${TABLES_COMPLETES[ods_paniers_complets]}";;
    "full_coupon_info") charger_table_complete "ods_coupons_info_complets" "${TABLES_COMPLETES[ods_coupons_info_complets]}";;
    "full_activity_info") charger_table_complete "ods_activites_info_complets" "${TABLES_COMPLETES[ods_activites_info_complets]}";;
    "full_activity_rule") charger_table_complete "ods_regles_activites_complets" "${TABLES_COMPLETES[ods_regles_activites_complets]}";;
    "full_base_dic") charger_table_complete "ods_dictionnaire_base_complet" "${TABLES_COMPLETES[ods_dictionnaire_base_complet]}";;
    "full_sku_attr") charger_table_complete "ods_attributs_sku_complets" "${TABLES_COMPLETES[ods_attributs_sku_complets]}";;
    "full_sku_sale_attr") charger_table_complete "ods_attributs_vente_sku_complets" "${TABLES_COMPLETES[ods_attributs_vente_sku_complets]}";;
    "full_user_info") charger_table_complete "ods_utilisateurs_complets" "${TABLES_COMPLETES[ods_utilisateurs_complets]}";;
    "full_order_info") charger_table_complete "ods_commandes_complets" "${TABLES_COMPLETES[ods_commandes_complets]}";;
    "full_coupon_use") charger_table_complete "ods_utilisation_coupons_complets" "${TABLES_COMPLETES[ods_utilisation_coupons_complets]}";;
    "full_order_status_log") charger_table_complete "ods_historique_statut_commande_complets" "${TABLES_COMPLETES[ods_historique_statut_commande_complets]}";;
    "full_order_detail") charger_table_complete "ods_details_commande_complets" "${TABLES_COMPLETES[ods_details_commande_complets]}";;
    "full_payment_info") charger_table_complete "ods_paiements_info_complets" "${TABLES_COMPLETES[ods_paiements_info_complets]}";;
    "full_comment_info") charger_table_complete "ods_commentaires_info_complets" "${TABLES_COMPLETES[ods_commentaires_info_complets]}";;
    "full_order_refund_info") charger_table_complete "ods_remboursements_info_complets" "${TABLES_COMPLETES[ods_remboursements_info_complets]}";;
    "full_order_detail_activity") charger_table_complete "ods_details_commande_activite_complets" "${TABLES_COMPLETES[ods_details_commande_activite_complets]}";;
    "full_order_detail_coupon") charger_table_complete "ods_details_commande_coupon_complets" "${TABLES_COMPLETES[ods_details_commande_coupon_complets]}";;
    "full_refund_payment") charger_table_complete "ods_paiements_remboursement_complets" "${TABLES_COMPLETES[ods_paiements_remboursement_complets]}";;
    
    "inc_order_info") charger_table_incrementale "ods_commandes_inc" "${TABLES_INCREMENTIELLES[ods_commandes_inc]}";;
    "inc_coupon_use") charger_table_incrementale "ods_utilisation_coupons_inc" "${TABLES_INCREMENTIELLES[ods_utilisation_coupons_inc]}";;
    "inc_order_status_log") charger_table_incrementale "ods_historique_statut_commande_inc" "${TABLES_INCREMENTIELLES[ods_historique_statut_commande_inc]}";;
    "inc_user_info") charger_table_incrementale "ods_utilisateurs_inc" "${TABLES_INCREMENTIELLES[ods_utilisateurs_inc]}";;
    "inc_order_detail") charger_table_incrementale "ods_details_commande_inc" "${TABLES_INCREMENTIELLES[ods_details_commande_inc]}";;
    "inc_payment_info") charger_table_incrementale "ods_paiements_info_inc" "${TABLES_INCREMENTIELLES[ods_paiements_info_inc]}";;
    "inc_comment_info") charger_table_incrementale "ods_commentaires_info_inc" "${TABLES_INCREMENTIELLES[ods_commentaires_info_inc]}";;
    "inc_order_refund_info") charger_table_incrementale "ods_remboursements_info_inc" "${TABLES_INCREMENTIELLES[ods_remboursements_info_inc]}";;
    "inc_order_detail_activity") charger_table_incrementale "ods_details_commande_activite_inc" "${TABLES_INCREMENTIELLES[ods_details_commande_activite_inc]}";;
    "inc_order_detail_coupon") charger_table_incrementale "ods_details_commande_coupon_inc" "${TABLES_INCREMENTIELLES[ods_details_commande_coupon_inc]}";;
    "inc_refund_payment") charger_table_incrementale "ods_paiements_remboursement_inc" "${TABLES_INCREMENTIELLES[ods_paiements_remboursement_inc]}";;
    "inc_cart_info") charger_table_incrementale "ods_paniers_inc" "${TABLES_INCREMENTIELLES[ods_paniers_inc]}";;

    "all"){
        echo "Chargement de toutes les tables pour la date ${DATE_DE_TRAITEMENT}"
        for table_name in "${!TABLES_COMPLETES[@]}"; do
            charger_table_complete "${table_name}" "${TABLES_COMPLETES[${table_name}]}"
        done
        for table_name in "${!TABLES_INCREMENTIELLES[@]}"; do
            charger_table_incrementale "${table_name}" "${TABLES_INCREMENTIELLES[${table_name}]}"
        done
    };;
    *)
        echo "Usage: $0 [date YYYY-MM-DD] {<type_chargement>|all}"
        echo "Exemples: $0 2023-01-01 full_sku_info"
        echo "          $0 all"
        exit 1
    ;;
esac

(2) Attribution des permissions d'exécution

chmod +x /home/user/bin/charger_donnees_ods.sh

(3) Planification de la tâche via Crontab

Pour assurer le chargement quotidien des données métier, une tâche est planifiée pour s'exécuter chaque jour à 3h00 du matin.


# Chargement quotidien des données métier dans ODS à 3h00 du matin
0 3 * * * /home/user/bin/charger_donnees_ods.sh all

Chargement des Données de Log de Trafic

Le chargement des logs est planifié pour s'exécuter à 3h00 du matin, afin de s'assurer que toutes les données de log de la veille ont été entièrement collectées et transférées vers HDFS. Cette synchronisation matinale garantit que les données partitionnées du jour précédent sont disponibles pour l'analyse dans l'entrepôt dès le début de la journée ouvrable.

(1) Développement du script d'ingestion de logs

Création du script shell charger_logs_trafic_ods.sh.


#!/bin/bash

# Détermine la date de traitement. Utilise le premier argument ou la veille par défaut.
if [ -n "$1" ] ;then
   DATE_DE_TRAITEMENT=$1
else 
   DATE_DE_TRAITEMENT=$(date -d "-1 day" +%F)
fi 

echo "================== Chargement des logs pour la date: ${DATE_DE_TRAITEMENT} =================="

SQL_LOAD_LOG="
LOAD DATA INPATH '/data_warehouse/logs_applicatifs/trafic_web/${DATE_DE_TRAITEMENT}' INTO TABLE ods.db.ods_logs_trafic_inc PARTITION(date_journal='${DATE_DE_TRAITEMENT}');
"

hive -e "${SQL_LOAD_LOG}"

if [ $? -ne 0 ]; then
    echo "ERREUR: Échec du chargement des logs pour ${DATE_DE_TRAITEMENT}"
    exit 1
fi

(2) Attribution des permissions d'exécution

chmod +x /home/user/bin/charger_logs_trafic_ods.sh

(3) Planification de la tâche via Crontab

Le script de chargement des logs est également planifié pour s'exécuter quotidiennement à 3h00 du matin.


# Chargement quotidien des logs de trafic dans ODS à 3h00 du matin
0 3 * * * /home/user/bin/charger_logs_trafic_ods.sh

Étiquettes: Hive hdfs ODS Ingestion de données ETL

Publié le 4 juillet à 16h54