Traitement par Lot Avancé des Métadonnées

Gestion Efficace des Métadonnées par Traitement de Masse

La gestion des actifs numériques, qu'il s'agisse d'images, de vidéos ou de documents, exige souvent des opérations répétitives sur de grandes quantités de fichiers. L'automatisation de ces tâches via des traitements par lot est cruciale pour l'efficacité, la cohérence et la scalabilité. Cet article explore la conception et l'implémentation de systèmes de traitement par lot robustes, capables de manipuler les métadonnées de manière performante et fiable.

Principes Fondamentaux et Architectures

Un système de traitement par lot efficace doit être conçu avec des principes clairs pour la gestion des tâches, l'optimisation des performances et la résilience face aux erreurs. Voici les aspects clés :

  1. Design du Traitement de Masse :
    • Définir des principes de conception clairs pour les opérations groupées.
    • Établir des stratégies do'ptimisation des performances (parallélisation, gestion mémoire).
    • Intégrer des mécanismes de gestion des erreurs et de récupération.
    • Mettre en place un suivi de l'avancement et des retours utilisateur.
  2. Optimisation des Performances :
    • Utiliser des techniques de traitement parallèle.
    • Optimiser la gestion de la mémoire.
    • Améliorer l'accès au système de fichiers.
    • Appliquer des stratégies de cache pertinentes.
  3. Capacités de Niveau Entreprise :
    • Gérer le traitement de données à grande échelle.
    • Mettre en œuvre des architectures de traitement distribué.
    • Assurer l'ordonnancement et la gestion des tâches.
    • Déployer des systèmes de surveillance et de journalisation.
  4. Applications Pratiques :
    • Gestion automatisée des actifs numériques.
    • Organisation de médiathèques volumineuses.
    • Projets de migration de données.
    • Intégration dans des flux de travail automatisés.

Conception de l'Architecture de Traitmeent par Lot

L'architecture d'un système de traitement par lot se compose généralement de plusieurs modules interdépendants. Voici une vue d'ensemble des composants centraux nécessaires à la gestion, l'exécution et le suivi des tâches.

Le code Python suivant implémente une structure de base pour un gestionnaire de traitement par lot, incluant la définition des tâches, des résultats, des modes de traitement et un mécanisme de surveillance.


import asyncio
import concurrent.futures
import multiprocessing
import threading
import time
import json
import logging
from typing import Dict, List, Any, Optional, Callable, Union, Tuple
from pathlib import Path
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import queue
import psutil
import hashlib

# Définition des états possibles pour une tâche
class StatutTache(Enum):
    ATTENTE = "en_attente"
    EN_COURS = "en_cours"
    TERMINE = "termine"
    ECHEC = "echec"
    ANNULE = "annule"
    REESSAI = "reessai"

# Définition des modes d'exécution du traitement
class ModeExecution(Enum):
    SEQUENTIEL = "sequentiel"     # Exécution une par une
    PARALLELE = "parallele"       # Exécution concurrente sur plusieurs threads
    ASYNC = "asynchrone"          # Exécution asynchrone (non bloquante)
    DISTRIBUE = "distribue"       # Exécution sur plusieurs nœuds

# Configuration d'une tâche individuelle
@dataclass
class ConfigurationTache:
    identifiant: str
    operation: str
    parametres: Dict[str, Any]
    priorite: int = 0
    tentatives_restantes: int = 3
    delai_expiration: int = 300
    dependances: List[str] = field(default_factory=list)
    metadonnees: Dict[str, Any] = field(default_factory=dict)

# Résultat d'une tâche traitée
@dataclass
class ResultatTraitement:
    identifiant: str
    statut: StatutTache
    heure_debut: datetime
    heure_fin: Optional[datetime] = None
    donnees_resultat: Optional[Dict[str, Any]] = None
    message_erreur: Optional[str] = None
    fichiers_traites: List[str] = field(default_factory=list)
    fichiers_echoues: List[str] = field(default_factory=list)
    metriques_performance: Dict[str, Any] = field(default_factory=dict)

# Classe de base pour un gestionnaire de traitement par lots
class GestionnaireLots:
    def __init__(self,
                 nombre_max_ouvriers: int = None,
                 mode_execution: ModeExecution = ModeExecution.PARALLELE,
                 taille_segment: int = 100,
                 limite_memoire_mo: int = 1024,  # MB
                 activer_surveillance: bool = True):

        self.nombre_max_ouvriers = nombre_max_ouvriers or multiprocessing.cpu_count()
        self.mode_execution = mode_execution
        self.taille_segment = taille_segment
        self.limite_memoire_octets = limite_memoire_mo * 1024 * 1024  # Convertir en octets
        self.activer_surveillance = activer_surveillance

        # Gestion des tâches
        self.file_attente_taches = queue.PriorityQueue()
        self.taches_actives = {}
        self.taches_terminees = {}
        self.taches_echec = {}

        # Suivi des performances
        self.statistiques_globales = {
            'total_taches': 0,
            'taches_terminees': 0,
            'taches_echec': 0,
            'temps_total_traitement': 0,
            'temps_moyen_traitement': 0,
            'debit_taches_par_sec': 0,
            'utilisation_memoire': 0,
            'utilisation_cpu': 0
        }

        # Fonctions de rappel
        self.callback_progression: Optional[Callable] = None
        self.callback_completion: Optional[Callable] = None
        self.callback_erreur: Optional[Callable] = None

        # Flags de contrôle
        self._evenement_arret = threading.Event()
        self._evenement_pause = threading.Event()
        self._evenement_pause.set()  # Initialement non en pause

        # Configuration du journal
        self.journal = self._initialiser_journaliseur()

        # Démarrage du thread de surveillance
        if self.activer_surveillance:
            self.thread_surveillance = threading.Thread(
                target=self._surveiller_performances, daemon=True
            )
            self.thread_surveillance.start()

    def _initialiser_journaliseur(self) -> logging.Logger:
        """Configure le journaliseur pour la classe."""
        journal = logging.getLogger(f'GestionnaireLots_{id(self)}')
        journal.setLevel(logging.INFO)

        if not journal.handlers:
            gestionnaire = logging.StreamHandler()
            formateur = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            gestionnaire.setFormatter(formateur)
            journal.addHandler(gestionnaire)
        return journal

    def ajouter_tache(self, config_tache: ConfigurationTache) -> bool:
        """Ajoute une tâche à la file d'attente prioritaire."""
        try:
            # File prioritaire: les nombres les plus bas ont la plus haute priorité
            priorite_ajustee = config_tache.priorite
            self.file_attente_taches.put((priorite_ajustee, time.time(), config_tache))
            self.statistiques_globales['total_taches'] += 1
            self.journal.info(f"Tâche ajoutée: {config_tache.identifiant}")
            return True
        except Exception as e:
            self.journal.error(f"Échec de l'ajout de tâche: {e}")
            return False

    def ajouter_plusieurs_taches(self, configurations: List[ConfigurationTache]) -> Dict[str, bool]:
        """Ajoute une liste de tâches au gestionnaire."""
        resultats_ajout = {}
        for config_tache in configurations:
            resultats_ajout[config_tache.identifiant] = self.ajouter_tache(config_tache)
        return resultats_ajout

    def demarrer_traitement(self) -> bool:
        """Démarre le processus de traitement des tâches."""
        self.journal.info(f"Démarrage du traitement par lot en mode: {self.mode_execution.value}")
        try:
            if self.mode_execution == ModeExecution.SEQUENTIEL:
                return self._traiter_sequentiellement()
            elif self.mode_execution == ModeExecution.PARALLELE:
                return self._traiter_en_parallele()
            elif self.mode_execution == ModeExecution.ASYNC:
                return asyncio.run(self._traiter_en_asynchrone())
            elif self.mode_execution == ModeExecution.DISTRIBUE:
                self.journal.error("Le mode distribué nécessite un implémentation spécifique (ex: Redis).")
                return False
            else:
                raise ValueError(f"Mode d'exécution non supporté: {self.mode_execution}")
        except Exception as e:
            self.journal.error(f"Échec du démarrage du traitement: {e}")
            return False

    def _traiter_sequentiellement(self) -> bool:
        """Exécute les tâches une par une."""
        try:
            while not self.file_attente_taches.empty() and not self._evenement_arret.is_set():
                self._evenement_pause.wait()  # Attendre si en pause
                try:
                    _, _, config_tache = self.file_attente_taches.get(timeout=1)
                    resultat = self._executer_tache(config_tache)
                    self._gerer_resultat_tache(resultat)
                except queue.Empty:
                    continue
                except Exception as e:
                    self.journal.error(f"Erreur lors de l'exécution séquentielle d'une tâche: {e}")
            return True
        except Exception as e:
            self.journal.error(f"Échec du traitement séquentiel: {e}")
            return False

    def _traiter_en_parallele(self) -> bool:
        """Exécute les tâches en utilisant un pool de threads."""
        try:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=self.nombre_max_ouvriers
            ) as executeur:
                futures_actifs = []
                while not self.file_attente_taches.empty() or futures_actifs:
                    self._evenement_pause.wait()
                    # Vérifier l'utilisation de la mémoire
                    if self._verifier_utilisation_memoire():
                        time.sleep(0.1)  # Attendre si la mémoire est pleine
                        continue

                    # Soumettre de nouvelles tâches si possible
                    while len(futures_actifs) < self.nombre_max_ouvriers and not self.file_attente_taches.empty():
                        try:
                            _, _, config_tache = self.file_attente_taches.get_nowait()
                            future = executeur.submit(self._executer_tache, config_tache)
                            futures_actifs.append(future)
                        except queue.Empty:
                            break
                        except Exception as e:
                            self.journal.error(f"Échec de la soumission d'une tâche parallèle: {e}")

                    # Traiter les tâches complétées
                    taches_terminees_dans_cycle = []
                    for future in concurrent.futures.as_completed(futures_actifs, timeout=1):
                        try:
                            resultat = future.result()
                            self._gerer_resultat_tache(resultat)
                            taches_terminees_dans_cycle.append(future)
                        except concurrent.futures.TimeoutError:
                            continue # Aucun futur terminé dans le délai
                        except Exception as e:
                            self.journal.error(f"Erreur lors de l'exécution d'une tâche parallèle: {e}")
                    
                    for future in taches_terminees_dans_cycle:
                        futures_actifs.remove(future)

                    if self._evenement_arret.is_set():
                        break # Arrêter si demandé
                    
                    time.sleep(0.01) # Petit délai pour éviter de boucler à vide
                
            return True
        except Exception as e:
            self.journal.error(f"Échec du traitement parallèle: {e}")
            return False

    async def _traiter_en_asynchrone(self) -> bool:
        """Exécute les tâches en mode asynchrone."""
        try:
            semaphore = asyncio.Semaphore(self.nombre_max_ouvriers)
            taches_asynchrones = []

            while not self.file_attente_taches.empty() and not self._evenement_arret.is_set():
                try:
                    _, _, config_tache = self.file_attente_taches.get_nowait()
                    tache_asynchrone = asyncio.create_task(
                        self._executer_tache_asynchrone(config_tache, semaphore)
                    )
                    taches_asynchrones.append(tache_asynchrone)
                except queue.Empty:
                    break
                except Exception as e:
                    self.journal.error(f"Échec de la création d'une tâche asynchrone: {e}")

            if taches_asynchrones:
                resultats = await asyncio.gather(*taches_asynchrones, return_exceptions=True)
                for res in resultats:
                    if isinstance(res, Exception):
                        self.journal.error(f"Exception dans une tâche asynchrone: {res}")
                    else:
                        self._gerer_resultat_tache(res)
            return True
        except Exception as e:
            self.journal.error(f"Échec du traitement asynchrone: {e}")
            return False

    async def _executer_tache_asynchrone(self,
                                         config_tache: ConfigurationTache,
                                         semaphore: asyncio.Semaphore) -> ResultatTraitement:
        """Exécute une tâche de manière asynchrone, en gérant le parallélisme."""
        async with semaphore:
            boucle = asyncio.get_event_loop()
            # Les tâches CPU-intensives devraient être exécutées dans un pool de threads
            return await boucle.run_in_executor(
                None, self._executer_tache, config_tache
            )

    def _executer_tache(self, config_tache: ConfigurationTache) -> ResultatTraitement:
        """Exécute une seule tâche de traitement."""
        debut_execution = datetime.now()
        resultat = ResultatTraitement(
            identifiant=config_tache.identifiant,
            statut=StatutTache.EN_COURS,
            heure_debut=debut_execution
        )
        self.taches_actives[config_tache.identifiant] = resultat
        self.journal.info(f"Exécution de la tâche: {config_tache.identifiant}")

        try:
            donnees_tache = self._processus_operation_tache(
                config_tache.operation,
                config_tache.parametres
            )
            resultat.statut = StatutTache.TERMINE
            resultat.heure_fin = datetime.now()
            resultat.donnees_resultat = donnees_tache
            resultat.metriques_performance = self._calculer_metriques_performance(
                debut_execution, resultat.heure_fin
            )
            self.journal.info(f"Tâche complétée: {config_tache.identifiant}")
        except Exception as e:
            resultat.statut = StatutTache.ECHEC
            resultat.heure_fin = datetime.now()
            resultat.message_erreur = str(e)
            self.journal.error(f"Tâche en échec: {config_tache.identifiant}, Erreur: {e}")

            # Logique de réessai
            if config_tache.tentatives_restantes > 0:
                config_tache.tentatives_restantes -= 1
                resultat.statut = StatutTache.REESSAI
                self.ajouter_tache(config_tache)  # Réajouter à la file d'attente
                self.journal.info(f"Réessai de la tâche: {config_tache.identifiant}, tentatives restantes: {config_tache.tentatives_restantes}")
        finally:
            if config_tache.identifiant in self.taches_actives:
                del self.taches_actives[config_tache.identifiant]
        return resultat

    def _processus_operation_tache(self, operation: str, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Méthode abstraite pour la logique de traitement d'une tâche, à implémenter par les sous-classes."""
        raise NotImplementedError("Les sous-classes doivent implémenter cette méthode.")

    def _gerer_resultat_tache(self, resultat: ResultatTraitement):
        """Traite le résultat d'une tâche, met à jour les statistiques et déclenche les callbacks."""
        if resultat.statut == StatutTache.TERMINE:
            self.taches_terminees[resultat.identifiant] = resultat
            self.statistiques_globales['taches_terminees'] += 1
            if self.callback_completion:
                self.callback_completion(resultat)
        elif resultat.statut == StatutTache.ECHEC:
            self.taches_echec[resultat.identifiant] = resultat
            self.statistiques_globales['taches_echec'] += 1
            if self.callback_erreur:
                self.callback_erreur(resultat)
        
        self._mettre_a_jour_statistiques_globales(resultat)
        if self.callback_progression:
            progression_actuelle = self._calculer_progression()
            self.callback_progression(progression_actuelle)

    def _calculer_metriques_performance(self,
                                         debut_execution: datetime,
                                         fin_execution: datetime) -> Dict[str, Any]:
        """Calcule les métriques de performance pour une tâche spécifique."""
        temps_traitement = (fin_execution - debut_execution).total_seconds()
        return {
            'temps_traitement_sec': temps_traitement,
            'utilisation_memoire_processus_octets': psutil.Process().memory_info().rss,
            'pourcentage_cpu_processus': psutil.cpu_percent(),
            'horodatage': fin_execution.isoformat()
        }

    def _mettre_a_jour_statistiques_globales(self, resultat: ResultatTraitement):
        """Met à jour les statistiques globales du gestionnaire."""
        if resultat.metriques_performance:
            temps_traitement = resultat.metriques_performance.get('temps_traitement_sec', 0)
            self.statistiques_globales['temps_total_traitement'] += temps_traitement
            completed = self.statistiques_globales['taches_terminees']
            if completed > 0:
                self.statistiques_globales['temps_moyen_traitement'] = (
                    self.statistiques_globales['temps_total_traitement'] / completed
                )
                total_time = self.statistiques_globales['temps_total_traitement']
                if total_time > 0:
                    self.statistiques_globales['debit_taches_par_sec'] = completed / total_time

    def _calculer_progression(self) -> Dict[str, Any]:
        """Calcule l'état d'avancement du traitement."""
        total = self.statistiques_globales['total_taches']
        terminees = self.statistiques_globales['taches_terminees']
        echec = self.statistiques_globales['taches_echec']
        actives = len(self.taches_actives)
        en_attente = self.file_attente_taches.qsize()
        pourcentage_progression = (terminees + echec) / total * 100 if total > 0 else 0
        return {
            'total_taches': total,
            'taches_terminees': terminees,
            'taches_echec': echec,
            'taches_actives': actives,
            'taches_en_attente': en_attente,
            'pourcentage_progression': pourcentage_progression,
            'statistiques_performance': self.statistiques_globales.copy()
        }

    def _verifier_utilisation_memoire(self) -> bool:
        """Vérifie si l'utilisation de la mémoire dépasse la limite configurée."""
        memoire_actuelle = psutil.Process().memory_info().rss
        return memoire_actuelle > self.limite_memoire_octets

    def _surveiller_performances(self):
        """Thread de surveillance pour collecter les métriques système et de traitement."""
        while not self._evenement_arret.is_set():
            try:
                self.statistiques_globales['utilisation_memoire'] = psutil.Process().memory_info().rss
                self.statistiques_globales['utilisation_cpu'] = psutil.cpu_percent()
                if self.statistiques_globales['total_taches'] > 0:
                    progression = self._calculer_progression()
                    self.journal.debug(f"Surveillance: {progression}")
                time.sleep(5)  # Surveiller toutes les 5 secondes
            except Exception as e:
                self.journal.error(f"Erreur dans le thread de surveillance: {e}")

    def suspendre_traitement(self):
        """Met en pause le traitement des tâches."""
        self._evenement_pause.clear()
        self.journal.info("Traitement suspendu.")

    def reprendre_traitement(self):
        """Reprend le traitement des tâches."""
        self._evenement_pause.set()
        self.journal.info("Traitement repris.")

    def arreter_traitement(self):
        """Arrête le traitement des tâches en cours et en attente."""
        self._evenement_arret.set()
        self._evenement_pause.set()  # Assurer que le traitement n'est pas bloqué si en pause
        self.journal.info("Traitement arrêté.")

    def obtenir_etat(self) -> Dict[str, Any]:
        """Retourne l'état actuel du gestionnaire de lots."""
        return {
            'en_cours': not self._evenement_arret.is_set(),
            'en_pause': not self._evenement_pause.is_set(),
            'progression': self._calculer_progression(),
            'taches_actives_ids': list(self.taches_actives.keys()),
            'statistiques_performance_generales': self.statistiques_globales.copy()
        }

    def obtenir_resultat_tache(self, identifiant_tache: str) -> Optional[ResultatTraitement]:
        """Récupère le résultat d'une tâche spécifique."""
        if identifiant_tache in self.taches_terminees:
            return self.taches_terminees[identifiant_tache]
        elif identifiant_tache in self.taches_echec:
            return self.taches_echec[identifiant_tache]
        elif identifiant_tache in self.taches_actives:
            return self.taches_actives[identifiant_tache]
        else:
            return None

    def effacer_taches_terminees(self):
        """Efface les tâches complétées et en échec de l'historique."""
        self.taches_terminees.clear()
        self.taches_echec.clear()
        self.journal.info("Historique des tâches terminées effacé.")

    def exporter_resultats(self, chemin_fichier: str) -> bool:
        """Exporte les résultats du traitement vers un fichier JSON."""
        try:
            resultats_exportables = {
                'taches_terminees': {
                    t_id: {
                        'id': res.identifiant,
                        'statut': res.statut.value,
                        'debut': res.heure_debut.isoformat(),
                        'fin': res.heure_fin.isoformat() if res.heure_fin else None,
                        'donnees': res.donnees_resultat,
                        'fichiers_traites': res.fichiers_traites,
                        'metriques': res.metriques_performance
                    }
                    for t_id, res in self.taches_terminees.items()
                },
                'taches_echec': {
                    t_id: {
                        'id': res.identifiant,
                        'statut': res.statut.value,
                        'debut': res.heure_debut.isoformat(),
                        'fin': res.heure_fin.isoformat() if res.heure_fin else None,
                        'erreur': res.message_erreur,
                        'fichiers_echoues': res.fichiers_echoues
                    }
                    for t_id, res in self.taches_echec.items()
                },
                'statistiques_generales': self.statistiques_globales,
                'heure_export': datetime.now().isoformat()
            }
            with open(chemin_fichier, 'w', encoding='utf-8') as f:
                json.dump(resultats_exportables, f, indent=2, ensure_ascii=False)
            self.journal.info(f"Résultats exportés vers: {chemin_fichier}")
            return True
        except Exception as e:
            self.journal.error(f"Échec de l'exportation des résultats: {e}")
            return False

Gestionnaire Spécialisé ExifTool

Pour des opérations spécifiques sur les métadonnées de fichiers multimédias, l'outil en ligne de commande ExifTool est un choix puissant. Le gestionnaire de lots peut être étendu pour intégrer cette fonctionnalité, permettant des actions telles que la lecture, l'écriture, la copie et la suppression de métadonnées, ainsi que des opérations de renommage et d'organisation de fichiers basées sur ces informations.


import os
import shutil
import subprocess
from typing import Dict, List, Any, Optional, Set
from pathlib import Path
import tempfile
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from dataclasses import dataclass

# Extension du gestionnaire de lots pour ExifTool
class ProcesseurExifToolEnLot(GestionnaireLots):
    """Processeur de lots dédié à l'utilisation d'ExifTool."""
    def __init__(self,
                 chemin_exiftool: str = 'exiftool',
                 **kwargs):
        super().__init__(**kwargs)
        self.chemin_exiftool = chemin_exiftool
        self.formats_supportes = {
            '.jpg', '.jpeg', '.png', '.tiff', '.tif', '.bmp', '.gif',
            '.raw', '.cr2', '.nef', '.arw', '.dng', '.orf', '.rw2',
            '.mp4', '.mov', '.avi', '.mkv', '.wmv', '.flv',
            '.pdf', '.eps', '.ai', '.psd'
        }
        self._verifier_exiftool()
        self.repertoire_temporaire = tempfile.mkdtemp(prefix='exiftool_batch_')
        self.fichiers_temporaires = set()
        self.verrou_temporaire = threading.Lock()

    def _verifier_exiftool(self):
        """Vérifie la disponibilité et la version d'ExifTool."""
        try:
            resultat = subprocess.run(
                [self.chemin_exiftool, '-ver'],
                capture_output=True,
                text=True,
                timeout=10,
                check=True # Lève une CalledProcessError si le code de retour est non nul
            )
            version = resultat.stdout.strip()
            self.journal.info(f"ExifTool version: {version}")
        except FileNotFoundError:
            raise Exception(f"ExifTool introuvable à {self.chemin_exiftool}. Assurez-vous qu'il est installé et dans le PATH.")
        except subprocess.CalledProcessError as e:
            raise Exception(f"Échec de l'exécution d'ExifTool: {e.stderr}")
        except Exception as e:
            raise Exception(f"Échec de la vérification d'ExifTool: {e}")

    def _processus_operation_tache(self, operation: str, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Dirige l'opération spécifique d'ExifTool."""
        operations_disponibles = {
            'lire_metadonnees': self._lire_metadonnees_lot,
            'ecrire_metadonnees': self._ecrire_metadonnees_lot,
            'copier_metadonnees': self._copier_metadonnees_lot,
            'supprimer_metadonnees': self._supprimer_metadonnees_lot,
            'renommer_fichiers': self._renommer_fichiers_lot,
            'organiser_fichiers': self._organiser_fichiers_lot,
            'convertir_format': self._convertir_format_lot,
            'valider_fichiers': self._valider_fichiers_lot,
            'extraire_vignettes': self._extraire_vignettes_lot,
            'generer_rapports': self._generer_apports_lot
        }
        if operation not in operations_disponibles:
            raise ValueError(f"Opération ExifTool non supportée: {operation}")
        return operations_disponibles[operation](parametres)

    def _lire_metadonnees_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Lit les métadonnées d'un ensemble de fichiers."""
        fichiers = parametres.get('fichiers', [])
        balises = parametres.get('balises', [])
        format_sortie = parametres.get('format_sortie', 'json')

        if not fichiers:
            raise ValueError("Aucun fichier spécifié pour la lecture.")
        fichiers_valides = self._filtrer_fichiers_supportes(fichiers)
        if not fichiers_valides:
            return {'succes': False, 'erreur': 'Aucun format de fichier supporté trouvé.', 'traites': [], 'echoues': fichiers}

        try:
            commande = [self.chemin_exiftool]
            commande.extend([f'-{b}' for b in balises]) if balises else commande.append('-all')
            commande.append(f'-{format_sortie}') if format_sortie in ['json', 'csv'] else None
            commande.extend(fichiers_valides)

            resultat = subprocess.run(commande, capture_output=True, text=True, timeout=parametres.get('delai_expiration', 300), check=True)
            metadonnees = json.loads(resultat.stdout) if format_sortie == 'json' else resultat.stdout
            return {'succes': True, 'metadonnees': metadonnees, 'traites': fichiers_valides, 'echoues': [], 'commande_executee': ' '.join(commande)}
        except subprocess.CalledProcessError as e:
            return {'succes': False, 'erreur': e.stderr, 'traites': [], 'echoues': fichiers_valides}
        except Exception as e:
            return {'succes': False, 'erreur': str(e), 'traites': [], 'echoues': fichiers_valides}

    def _ecrire_metadonnees_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Écrit des métadonnées dans un ensemble de fichiers."""
        fichiers = parametres.get('fichiers', [])
        metadonnees_a_ecrire = parametres.get('metadonnees', {})
        conserver_original = parametres.get('conserver_original', True) # Si False, utilise -overwrite_original
        remplacer_sur_place = parametres.get('remplacer_sur_place', False) # Si True, utilise -overwrite_original_in_place

        if not fichiers or not metadonnees_a_ecrire:
            raise ValueError("Fichiers ou métadonnées à écrire manquants.")
        fichiers_valides = self._filtrer_fichiers_supportes(fichiers)
        fichiers_traites, fichiers_echec = [], []

        try:
            commande_base = [self.chemin_exiftool]
            if not conserver_original:
                commande_base.append('-overwrite_original')
            if remplacer_sur_place:
                commande_base.append('-overwrite_original_in_place')
            
            for balise, valeur in metadonnees_a_ecrire.items():
                if isinstance(valeur, (list, tuple)):
                    for v in valeur:
                        commande_base.extend([f'-{balise}={v}'])
                else:
                    commande_base.extend([f'-{balise}={valeur}'])
            
            for segment in self._segmenter_fichiers(fichiers_valides, self.taille_segment):
                commande_segment = commande_base + segment
                resultat = subprocess.run(commande_segment, capture_output=True, text=True, timeout=parametres.get('delai_expiration', 300))
                if resultat.returncode == 0:
                    fichiers_traites.extend(segment)
                else:
                    fichiers_echec.extend(segment)
                    self.journal.error(f"Échec de l'écriture en lot: {resultat.stderr}")
            return {'succes': len(fichiers_traites) > 0, 'traites': fichiers_traites, 'echoues': fichiers_echec, 'total_traites': len(fichiers_traites)}
        except Exception as e:
            return {'succes': False, 'erreur': str(e), 'traites': fichiers_traites, 'echoues': fichiers_valides}

    def _copier_metadonnees_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Copie des métadonnées entre fichiers."""
        fichiers_source = parametres.get('fichiers_source', [])
        fichiers_cible = parametres.get('fichiers_cible', [])
        balises = parametres.get('balises', ['all'])
        conserver_original = parametres.get('conserver_original', True)

        if len(fichiers_source) != len(fichiers_cible):
            raise ValueError("Les listes de fichiers source et cible doivent avoir la même taille.")

        paires_traitees, paires_echec = [], []
        try:
            for source, cible in zip(fichiers_source, fichiers_cible):
                if not Path(source).exists() or not Path(cible).exists():
                    paires_echec.append((source, cible))
                    continue
                commande = [self.chemin_exiftool]
                if not conserver_original:
                    commande.append('-overwrite_original')
                for balise in balises:
                    commande.extend(['-tagsFromFile', source, f'-{balise}'])
                commande.append(cible)

                resultat = subprocess.run(commande, capture_output=True, text=True, timeout=parametres.get('delai_expiration', 60))
                if resultat.returncode == 0:
                    paires_traitees.append((source, cible))
                else:
                    paires_echec.append((source, cible))
                    self.journal.error(f"Échec de la copie de métadonnées: {resultat.stderr}")
            return {'succes': len(paires_traitees) > 0, 'traites': paires_traitees, 'echoues': paires_echec}
        except Exception as e:
            return {'succes': False, 'erreur': str(e), 'traites': paires_traitees, 'echoues': paires_echec}

    def _supprimer_metadonnees_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Supprime des métadonnées d'un ensemble de fichiers."""
        fichiers = parametres.get('fichiers', [])
        balises = parametres.get('balises', ['all'])
        conserver_original = parametres.get('conserver_original', True)
        
        fichiers_valides = self._filtrer_fichiers_supportes(fichiers)
        fichiers_traites, fichiers_echec = [], []

        try:
            commande_base = [self.chemin_exiftool]
            if not conserver_original:
                commande_base.append('-overwrite_original')
            
            for balise in balises:
                commande_base.append('-all=') if balise == 'all' else commande_base.append(f'-{balise}=')
            
            for segment in self._segmenter_fichiers(fichiers_valides, self.taille_segment):
                commande_segment = commande_base + segment
                resultat = subprocess.run(commande_segment, capture_output=True, text=True, timeout=parametres.get('delai_expiration', 300))
                if resultat.returncode == 0:
                    fichiers_traites.extend(segment)
                else:
                    fichiers_echec.extend(segment)
                    self.journal.error(f"Échec de la suppression en lot: {resultat.stderr}")
            return {'succes': len(fichiers_traites) > 0, 'traites': fichiers_traites, 'echoues': fichiers_echec, 'total_traites': len(fichiers_traites)}
        except Exception as e:
            return {'succes': False, 'erreur': str(e), 'traites': fichiers_traites, 'echoues': fichiers_valides}

    def _renommer_fichiers_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Renomme des fichiers en lot en utilisant des métadonnées."""
        fichiers = parametres.get('fichiers', [])
        modele_nom = parametres.get('modele', '%f') # %f pour nom de fichier original, %d pour date, etc.
        mode_test = parametres.get('mode_test', False) # Si True, ne renomme pas, juste affiche le nouveau nom

        fichiers_valides = self._filtrer_fichiers_supportes(fichiers)
        fichiers_traites, fichiers_echec = [], []
        carte_renommage = {}

        try:
            commande = [self.chemin_exiftool, '-filename<' + modele_nom]
            if mode_test:
                commande.append('-testname')
            commande.extend(fichiers_valides)
            
            resultat = subprocess.run(commande, capture_output=True, text=True, timeout=parametres.get('delai_expiration', 300))
            if resultat.returncode == 0:
                for ligne in resultat.stdout.strip().split('\n'):
                    if '-->' in ligne:
                        ancien_nom, nouveau_nom = ligne.split(' --> ')
                        ancien_nom = ancien_nom.strip().strip("'")
                        nouveau_nom = nouveau_nom.strip().strip("'")
                        carte_renommage[ancien_nom] = nouveau_nom
                        fichiers_traites.append(ancien_nom)
                return {'succes': True, 'traites': fichiers_traites, 'echoues': [], 'carte_renommage': carte_renommage, 'mode_test': mode_test}
            else:
                return {'succes': False, 'erreur': resultat.stderr, 'traites': [], 'echoues': fichiers_valides}
        except Exception as e:
            return {'succes': False, 'erreur': str(e), 'traites': fichiers_traites, 'echoues': fichiers_valides}

    def _organiser_fichiers_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Organise les fichiers en les déplaçant vers des répertoires basés sur leurs métadonnées."""
        repertoire_source = parametres.get('repertoire_source')
        repertoire_cible = parametres.get('repertoire_cible')
        modele_organisation = parametres.get('modele', '%Y/%m') # Ex: Année/Mois
        mode_copie = parametres.get('mode_copie', True) # True pour copier, False pour déplacer

        if not repertoire_source or not repertoire_cible:
            raise ValueError("Répertoire source ou cible non spécifié.")
        
        chemin_source = Path(repertoire_source)
        chemin_cible = Path(repertoire_cible)

        if not chemin_source.exists():
            raise ValueError(f"Le répertoire source n'existe pas: {repertoire_source}")
        chemin_cible.mkdir(parents=True, exist_ok=True)

        tous_fichiers_supportes = []
        for ext in self.formats_supportes:
            tous_fichiers_supportes.extend(chemin_source.rglob(f'*{ext}'))
            tous_fichiers_supportes.extend(chemin_source.rglob(f'*{ext.upper()}'))
        
        fichiers_traites, fichiers_echec = [], []
        carte_organisation = {}

        for fichier_path in tous_fichiers_supportes:
            try:
                commande_date = [self.chemin_exiftool, '-json', '-DateTimeOriginal', '-CreateDate', '-ModifyDate', str(fichier_path)]
                resultat_date = subprocess.run(commande_date, capture_output=True, text=True, timeout=30, check=True)
                metadonnees = json.loads(resultat_date.stdout)[0]
                
                date_str = metadonnees.get('DateTimeOriginal') or metadonnees.get('CreateDate') or metadonnees.get('ModifyDate')
                
                if date_str:
                    try:
                        date_obj = datetime.strptime(date_str.split()[0], '%Y:%m:%d')
                    except ValueError:
                        date_obj = datetime.fromtimestamp(fichier_path.stat().st_mtime)
                else:
                    date_obj = datetime.fromtimestamp(fichier_path.stat().st_mtime)
                
                chemin_relatif = date_obj.strftime(modele_organisation)
                repertoire_cible_fichier = chemin_cible / chemin_relatif
                repertoire_cible_fichier.mkdir(parents=True, exist_ok=True)
                
                chemin_cible_final = repertoire_cible_fichier / fichier_path.name
                compteur = 1
                original_cible = chemin_cible_final
                while chemin_cible_final.exists(): # Gérer les doublons de noms
                    stem = original_cible.stem
                    suffixe = original_cible.suffix
                    chemin_cible_final = original_cible.parent / f"{stem}_{compteur}{suffixe}"
                    compteur += 1
                
                if mode_copie:
                    shutil.copy2(fichier_path, chemin_cible_final)
                else:
                    shutil.move(str(fichier_path), str(chemin_cible_final))
                
                fichiers_traites.append(str(fichier_path))
                carte_organisation[str(fichier_path)] = str(chemin_cible_final)
            except Exception as e:
                fichiers_echec.append(str(fichier_path))
                self.journal.error(f"Échec de l'organisation du fichier {fichier_path}: {e}")
        
        return {'succes': len(fichiers_traites) > 0, 'traites': fichiers_traites, 'echoues': fichiers_echec, 'carte_organisation': carte_organisation, 'mode_copie': mode_copie}

    def _convertir_format_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Convertit le format d'un ensemble de fichiers."""
        fichiers = parametres.get('fichiers', [])
        format_cible = parametres.get('format_cible', 'jpg')
        qualite = parametres.get('qualite', 90) # Pour JPEG
        repertoire_sortie = parametres.get('repertoire_sortie')

        fichiers_valides = self._filtrer_fichiers_supportes(fichiers)
        fichiers_traites, fichiers_echec = [], []
        carte_conversion = {}

        try:
            if repertoire_sortie:
                Path(repertoire_sortie).mkdir(parents=True, exist_ok=True)
            
            for fichier_path in fichiers_valides:
                try:
                    chemin_source = Path(fichier_path)
                    chemin_cible = Path(repertoire_sortie or chemin_source.parent) / f"{chemin_source.stem}.{format_cible}"
                    
                    commande = [self.chemin_exiftool, '-o', str(chemin_cible)]
                    if format_cible.lower() == 'jpg':
                        commande.append(f'-jpegquality={qualite}')
                    commande.append(str(chemin_source))
                    
                    resultat = subprocess.run(commande, capture_output=True, text=True, timeout=60)
                    if resultat.returncode == 0 and chemin_cible.exists():
                        fichiers_traites.append(fichier_path)
                        carte_conversion[fichier_path] = str(chemin_cible)
                    else:
                        fichiers_echec.append(fichier_path)
                        self.journal.error(f"Échec de la conversion de format pour {fichier_path}: {resultat.stderr}")
                except Exception as e:
                    fichiers_echec.append(fichier_path)
                    self.journal.error(f"Erreur lors de la conversion de {fichier_path}: {e}")
            return {'succes': len(fichiers_traites) > 0, 'traites': fichiers_traites, 'echoues': fichiers_echec, 'carte_conversion': carte_conversion}
        except Exception as e:
            return {'succes': False, 'erreur': str(e), 'traites': fichiers_traites, 'echoues': fichiers_valides}
            
    def _valider_fichiers_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Valide l'intégrité et les métadonnées d'un ensemble de fichiers."""
        fichiers = parametres.get('fichiers', [])
        verifier_corruption = parametres.get('verifier_corruption', True)
        verifier_metadonnees = parametres.get('verifier_metadonnees', True)

        fichiers_valides = self._filtrer_fichiers_supportes(fichiers)
        resultats_validation_fichiers = []
        fichiers_corrompus = []
        problemes_metadonnees = []

        for fichier_path in fichiers_valides:
            try:
                problemes = []
                if verifier_corruption:
                    commande_corruption = [self.chemin_exiftool, '-validate', str(fichier_path)]
                    resultat_corruption = subprocess.run(commande_corruption, capture_output=True, text=True, timeout=30)
                    if resultat_corruption.returncode != 0 or 'error' in resultat_corruption.stderr.lower():
                        fichiers_corrompus.append(fichier_path)
                        problemes.append('Fichier corrompu ou invalide')
                
                if verifier_metadonnees:
                    commande_meta = [self.chemin_exiftool, '-json', '-validate', str(fichier_path)]
                    resultat_meta = subprocess.run(commande_meta, capture_output=True, text=True, timeout=30)
                    if resultat_meta.returncode == 0:
                        try:
                            meta = json.loads(resultat_meta.stdout)[0]
                            champs_requis = ['FileSize', 'FileType', 'MIMEType']
                            champs_manquants = [f for f in champs_requis if f not in meta]
                            if champs_manquants:
                                problèmes_metadonnees.append({'fichier': fichier_path, 'champs_manquants': champs_manquants})
                                problèmes.append(f"Métadonnées manquantes: {', '.join(champs_manquants)}")
                        except json.JSONDecodeError:
                            problemes_metadonnees.append({'fichier': fichier_path, 'erreur': 'Échec de l\'analyse des métadonnées JSON'})
                            problemes.append('Échec de l\'analyse des métadonnées')
                    else:
                        problemes_metadonnees.append({'fichier': fichier_path, 'erreur': resultat_meta.stderr})
                        problemes.append(f'Échec de la validation des métadonnées: {resultat_meta.stderr}')
                
                if not problemes:
                    resultats_validation_fichiers.append(fichier_path)
            except Exception as e:
                fichiers_corrompus.append(fichier_path)
                self.journal.error(f"Erreur lors de la validation de {fichier_path}: {e}")
        
        return {'succes': True, 'valides': resultats_validation_fichiers, 'corrompus': fichiers_corrompus, 'problemes_metadonnees': problemes_metadonnees}

    def _extraire_vignettes_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Extrait les vignettes (miniatures) d'un ensemble de fichiers."""
        fichiers = parametres.get('fichiers', [])
        repertoire_sortie = parametres.get('repertoire_sortie')
        taille_vignette = parametres.get('taille_vignette', 'ThumbnailImage') # ExifTool tag for thumbnail

        if not repertoire_sortie:
            raise ValueError("Répertoire de sortie non spécifié.")
        
        chemin_sortie = Path(repertoire_sortie)
        chemin_sortie.mkdir(parents=True, exist_ok=True)

        fichiers_valides = self._filtrer_fichiers_supportes(fichiers)
        fichiers_traites, fichiers_echec = [], []
        carte_vignettes = {}

        for fichier_path in fichiers_valides:
            try:
                chemin_source = Path(fichier_path)
                chemin_vignette = chemin_sortie / f"{chemin_source.stem}_vignette.jpg"
                
                commande = [self.chemin_exiftool, f'-{taille_vignette}', '-b', str(chemin_source)]
                resultat = subprocess.run(commande, capture_output=True, timeout=30)
                
                if resultat.returncode == 0 and resultat.stdout:
                    with open(chemin_vignette, 'wb') as f:
                        f.write(resultat.stdout)
                    fichiers_traites.append(fichier_path)
                    carte_vignettes[fichier_path] = str(chemin_vignette)
                else:
                    fichiers_echec.append(fichier_path)
                    self.journal.error(f"Échec de l'extraction de vignette pour {fichier_path}: {resultat.stderr.decode('utf-8') if resultat.stderr else 'Pas de sortie'}")
            except Exception as e:
                fichiers_echec.append(fichier_path)
                self.journal.error(f"Erreur lors de l'extraction de vignette de {fichier_path}: {e}")
        
        return {'succes': len(fichiers_traites) > 0, 'traites': fichiers_traites, 'echoues': fichiers_echec, 'carte_vignettes': carte_vignettes}

    def _generer_apports_lot(self, parametres: Dict[str, Any]) -> Dict[str, Any]:
        """Génère des rapports sur les métadonnées de fichiers."""
        fichiers = parametres.get('fichiers', [])
        type_rapport = parametres.get('type_rapport', 'summary')
        fichier_sortie = parametres.get('fichier_sortie')
        inclure_vignettes = parametres.get('inclure_vignettes', False)

        fichiers_valides = self._filtrer_fichiers_supportes(fichiers)

        try:
            commande = [self.chemin_exiftool]
            if type_rapport == 'html':
                commande.extend(['-htmldump', '-w', 'html'])
            elif type_rapport == 'csv':
                commande.append('-csv')
            elif type_rapport == 'xml':
                commande.append('-X')
            else: # default to json
                commande.append('-json')
            
            if inclure_vignettes:
                commande.extend(['-b', '-ThumbnailImage'])
            
            commande.extend(fichiers_valides)
            
            resultat = subprocess.run(commande, capture_output=True, text=True, timeout=600)
            if resultat.returncode == 0:
                contenu_rapport = resultat.stdout
                if fichier_sortie:
                    with open(fichier_sortie, 'w', encoding='utf-8') as f:
                        f.write(contenu_rapport)
                return {'succes': True, 'contenu_rapport': contenu_rapport, 'fichier_sortie': fichier_sortie, 'traites': fichiers_valides, 'type_rapport': type_rapport}
            else:
                return {'succes': False, 'erreur': resultat.stderr, 'traites': [], 'echoues': fichiers_valides}
        except Exception as e:
            return {'succes': False, 'erreur': str(e), 'traites': [], 'echoues': fichiers_valides}

    def _filtrer_fichiers_supportes(self, fichiers: List[str]) -> List[str]:
        """Filtre les fichiers pour ne conserver que ceux dont le format est supporté."""
        fichiers_valides = [
            str(Path(fp)) for fp in fichiers
            if Path(fp).exists() and Path(fp).is_file() and Path(fp).suffix.lower() in self.formats_supportes
        ]
        return fichiers_valides

    def _segmenter_fichiers(self, fichiers: List[str], taille_segment: int) -> List[List[str]]:
        """Divise une liste de fichiers en segments de taille fixe."""
        return [fichiers[i:i + taille_segment] for i in range(0, len(fichiers), taille_segment)]

    def __del__(self):
        """Nettoie les fichiers temporaires lors de la destruction de l'objet."""
        try:
            if hasattr(self, 'repertoire_temporaire') and os.path.exists(self.repertoire_temporaire):
                shutil.rmtree(self.repertoire_temporaire)
        except Exception as e:
            if hasattr(self, 'journal'):
                self.journal.error(f"Échec du nettoyage des fichiers temporaires: {e}")

Fonctionnalités Avancées de Traitement par Lot

Planificateur de Tâches Intelligent

Au-delà de l'exécution immédiate, les systèmes d'entreprise nécessitent souvent une planification flexible. Un planificateur intelligent permet de déclencher des tâches à des moments précis, de les répéter à intervalles réguliers ou de les soumettre uniquement lorsque certaines conditions sont remplies.


import heapq
import time
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import threading
import json

# Types de planification pour les tâches
class TypePlanification(Enum):
    IMMEDIATE = "immediate"
    DIFFERE = "differe"
    RECURRENT = "recurrent"
    CONDITIONNEL = "conditionnel"

# Représentation d'une tâche planifiée
@dataclass
class TachePlanifiee:
    config_tache: ConfigurationTache
    type_planification: TypePlanification
    heure_planifiee: datetime
    intervalle_recurrent: Optional[timedelta] = None
    fonction_condition: Optional[Callable] = None
    max_tentatives_planification: int = 3
    intervalle_reessai_planification: timedelta = field(default_factory=lambda: timedelta(minutes=5))
    boost_priorite: int = 0  # Augmentation de la priorité en cas de réessai

class PlanificateurIntelligentTaches:
    """Gère l'ordonnancement de tâches basées sur le temps ou des conditions."""
    def __init__(self, gestionnaire_lots: GestionnaireLots):
        self.gestionnaire_lots = gestionnaire_lots
        self.file_attente_planifiee = []  # File prioritaire pour les tâches temporelles
        self.taches_recurrentes = {}      # Tâches à répéter
        self.taches_conditionnelles = {}  # Tâches avec des conditions
        
        self.thread_planificateur = None
        self.en_cours = False
        self.verrou_acces = threading.Lock()
        
        self.statistiques_planificateur = {
            'total_planifiees': 0,
            'planifiees_terminees': 0,
            'planifiees_echec': 0,
            'delai_moyen_sec': 0,
            'delai_max_sec': 0
        }
    
    def planifier_tache(self, 
                       config_tache: ConfigurationTache,
                       type_planification: TypePlanification = TypePlanification.IMMEDIATE,
                       heure_planifiee: Optional[datetime] = None,
                       intervalle_recurrent: Optional[timedelta] = None,
                       fonction_condition: Optional[Callable] = None) -> str:
        """Planifie une tâche pour exécution future."""
        if heure_planifiee is None:
            heure_planifiee = datetime.now()
        
        tache_planifiee = TachePlanifiee(
            config_tache=config_tache,
            type_planification=type_planification,
            heure_planifiee=heure_planifiee,
            intervalle_recurrent=intervalle_recurrent,
            fonction_condition=fonction_condition
        )
        
        with self.verrou_acces:
            if type_planification == TypePlanification.RECURRENT:
                self.taches_recurrentes[config_tache.identifiant] = tache_planifiee
            elif type_planification == TypePlanification.CONDITIONNEL:
                self.taches_conditionnelles[config_tache.identifiant] = tache_planifiee
            
            priorite_file = (heure_planifiee.timestamp(), config_tache.priorite)
            heapq.heappush(self.file_attente_planifiee, (priorite_file, tache_planifiee))
            self.statistiques_planificateur['total_planifiees'] += 1
        
        return config_tache.identifiant
    
    def demarrer_planificateur(self):
        """Démarre le thread principal du planificateur."""
        if self.en_cours:
            return
        self.en_cours = True
        self.thread_planificateur = threading.Thread(
            target=self._boucle_planification, daemon=True
        )
        self.thread_planificateur.start()
        self.gestionnaire_lots.journal.info("Planificateur de tâches démarré.")
    
    def arreter_planificateur(self):
        """Arrête le thread du planificateur."""
        self.en_cours = False
        if self.thread_planificateur:
            self.thread_planificateur.join(timeout=5)
        self.gestionnaire_lots.journal.info("Planificateur de tâches arrêté.")
    
    def _boucle_planification(self):
        """Boucle principale du planificateur, vérifiant et soumettant les tâches."""
        while self.en_cours:
            try:
                heure_actuelle = datetime.now()
                self._traiter_taches_echeance(heure_actuelle)
                self._verifier_taches_conditionnelles(heure_actuelle)
                self._traiter_taches_recurrentes(heure_actuelle)
                time.sleep(1)  # Vérifier chaque seconde
            except Exception as e:
                self.gestionnaire_lots.journal.error(f"Erreur dans la boucle du planificateur: {e}")
                time.sleep(5) # Attendre un peu en cas d'erreur grave
    
    def _traiter_taches_echeance(self, heure_actuelle: datetime):
        """Extrait et soumet les tâches dont l'heure est venue."""
        taches_a_soumettre = []
        with self.verrou_acces:
            while (self.file_attente_planifiee and 
                   self.file_attente_planifiee[0][0][0] <= heure_actuelle.timestamp()):
                _, tache_planifiee = heapq.heappop(self.file_attente_planifiee)
                taches_a_soumettre.append(tache_planifiee)
            
            for tache_planifiee in taches_a_soumettre:
                try:
                    delai = (heure_actuelle - tache_planifiee.heure_planifiee).total_seconds()
                    self._mettre_a_jour_statistiques_delai(delai)
                    
                    succes = self.gestionnaire_lots.ajouter_tache(tache_planifiee.config_tache)
                    
                    if succes:
                        self.statistiques_planificateur['planifiees_terminees'] += 1
                        self.gestionnaire_lots.journal.info(f"Tâche planifiée soumise: {tache_planifiee.config_tache.identifiant}")
                    else:
                        self.statistiques_planificateur['planifiees_echec'] += 1
                        # Logique de réessai pour la planification
                        if tache_planifiee.max_tentatives_planification > 0:
                            tache_planifiee.max_tentatives_planification -= 1
                            tache_planifiee.heure_planifiee = (heure_actuelle + tache_planifiee.intervalle_reessai_planification)
                            tache_planifiee.config_tache.priorite += tache_planifiee.boost_priorite
                            
                            priorite_nouvelle = (tache_planifiee.heure_planifiee.timestamp(), tache_planifiee.config_tache.priorite)
                            heapq.heappush(self.file_attente_planifiee, (priorite_nouvelle, tache_planifiee))
                            self.gestionnaire_lots.journal.warning(f"Réessai de planification pour {tache_planifiee.config_tache.identifiant}. Tentatives restantes: {tache_planifiee.max_tentatives_planification}")
                except Exception as e:
                    self.gestionnaire_lots.journal.error(f"Échec de traitement d'une tâche planifiée: {e}")
                    self.statistiques_planificateur['planifiees_echec'] += 1
    
    def _verifier_taches_conditionnelles(self, heure_actuelle: datetime):
        """Vérifie les conditions des tâches et les soumet si elles sont remplies."""
        taches_terminees = []
        for identifiant_tache, tache_planifiee in list(self.taches_conditionnelles.items()): # Utiliser list() pour itérer sur une copie
            try:
                if tache_planifiee.fonction_condition and tache_planifiee.fonction_condition():
                    tache_planifiee.heure_planifiee = heure_actuelle # Marquer comme due maintenant
                    priorite = (heure_actuelle.timestamp(), tache_planifiee.config_tache.priorite)
                    heapq.heappush(self.file_attente_planifiee, (priorite, tache_planifiee))
                    taches_terminees.append(identifiant_tache)
                    self.gestionnaire_lots.journal.info(f"Condition remplie pour la tâche: {identifiant_tache}. Soumise au gestionnaire.")
            except Exception as e:
                self.gestionnaire_lots.journal.error(f"Erreur lors de la vérification de la condition pour {identifiant_tache}: {e}")
        
        for identifiant_tache in taches_terminees:
            del self.taches_conditionnelles[identifiant_tache]
    
    def _traiter_taches_recurrentes(self, heure_actuelle: datetime):
        """Gère la soumission des tâches récurrentes."""
        for identifiant_tache, tache_planifiee in self.taches_recurrentes.items():
            try:
                if (heure_actuelle >= tache_planifiee.heure_planifiee and
                    tache_planifiee.intervalle_recurrent):
                    
                    # Créer une nouvelle instance de tâche pour cette occurrence
                    nouvelle_config_tache = ConfigurationTache(
                        identifiant=f"{identifiant_tache}_{int(heure_actuelle.timestamp())}",
                        operation=tache_planifiee.config_tache.operation,
                        parametres=tache_planifiee.config_tache.parametres.copy(),
                        priorite=tache_planifiee.config_tache.priorite
                    )
                    
                    priorite = (heure_actuelle.timestamp(), nouvelle_config_tache.priorite)
                    nouvelle_tache_planifiee = TachePlanifiee(
                        config_tache=nouvelle_config_tache,
                        type_planification=TypePlanification.IMMEDIATE,
                        heure_planifiee=heure_actuelle
                    )
                    
                    heapq.heappush(self.file_attente_planifiee, (priorite, nouvelle_tache_planifiee))
                    
                    # Mettre à jour l'heure de la prochaine exécution récurrente
                    tache_planifiee.heure_planifiee += tache_planifiee.intervalle_recurrent
                    self.gestionnaire_lots.journal.info(f"Tâche récurrente {identifiant_tache} soumise. Prochaine exécution à {tache_planifiee.heure_planifiee}.")
            except Exception as e:
                self.gestionnaire_lots.journal.error(f"Erreur lors du traitement de la tâche récurrente {identifiant_tache}: {e}")
    
    def _mettre_a_jour_statistiques_delai(self, delai: float):
        """Met à jour les statistiques de délai d'exécution."""
        if delai > self.statistiques_planificateur['delai_max_sec']:
            self.statistiques_planificateur['delai_max_sec'] = delai
        
        # Calcul de la moyenne mobile simple ou pondérée
        total_terminees = self.statistiques_planificateur['planifiees_terminees']
        if total_terminees > 0:
            moyenne_actuelle = self.statistiques_planificateur['delai_moyen_sec']
            self.statistiques_planificateur['delai_moyen_sec'] = (
                (moyenne_actuelle * (total_terminees - 1) + delai) / total_terminees
            )
    
    def obtenir_statut_planificateur(self) -> Dict[str, Any]:
        """Retourne l'état actuel du planificateur."""
        with self.verrou_acces:
            return {
                'en_cours': self.en_cours,
                'taches_en_attente_file': len(self.file_attente_planifiee),
                'taches_recurrentes_actives': len(self.taches_recurrentes),
                'taches_conditionnelles_actives': len(self.taches_conditionnelles),
                'statistiques': self.statistiques_planificateur.copy()
            }
    
    def annuler_tache_planifiee(self, identifiant_tache: str) -> bool:
        """Annule une tâche précédemment planifiée."""
        with self.verrou_acces:
            if identifiant_tache in self.taches_recurrentes:
                del self.taches_recurrentes[identifiant_tache]
                self.gestionnaire_lots.journal.info(f"Tâche récurrente {identifiant_tache} annulée.")
                return True
            
            if identifiant_tache in self.taches_conditionnelles:
                del self.taches_conditionnelles[identifiant_tache]
                self.gestionnaire_lots.journal.info(f"Tâche conditionnelle {identifiant_tache} annulée.")
                return True
            
            # Annuler une tâche dans la file prioritaire (nécessite une reconstruction)
            nouvelle_file = []
            tache_trouvee = False
            while self.file_attente_planifiee:
                priorite, tache_planifiee = heapq.heappop(self.file_attente_planifiee)
                if tache_planifiee.config_tache.identifiant != identifiant_tache:
                    nouvelle_file.append((priorite, tache_planifiee))
                else:
                    tache_trouvee = True
                    self.gestionnaire_lots.journal.info(f"Tâche planifiée {identifiant_tache} annulée de la file d'attente.")
            
            self.file_attente_planifiee = nouvelle_file
            heapq.heapify(self.file_attente_planifiee) # Reconstruire le tas
            
            return tache_trouvee

Prise en Charge du Traitement Distribué

Pour des volumes de données encore plus importants ou pour une meilleure résilience, le traitement distribué permet de répartir la charge sur plusieurs machines (nœuds de travail). Une architecture basée sur une file de messages (comme Redis) est souvent employée pour découpler les soumissions de tâches de leur exécution et pour agréger les résultats.


import redis
import pickle
import uuid
from typing import Dict, List, Any, Optional
import json
import time
from datetime import datetime

class GestionnaireLotsDistribue:
    """Implémente un système de traitement par lot distribué via Redis."""
    def __init__(self, 
                 hote_redis: str = 'localhost',
                 port_redis: int = 6379,
                 db_redis: int = 0,
                 identifiant_ouvrier: Optional[str] = None,
                 nom_file_taches: str = 'file_exiftool_taches',
                 nom_file_resultats: str = 'file_exiftool_resultats'):
        
        self.client_redis = redis.Redis(
            host=hote_redis, 
            port=port_redis, 
            db=db_redis,
            decode_responses=False # Conserver les données binaires pour pickle
        )
        
        self.identifiant_ouvrier = identifiant_ouvrier or f"ouvrier_{uuid.uuid4().hex[:8]}"
        self.nom_file_taches = nom_file_taches
        self.nom_file_resultats = nom_file_resultats
        
        # Le processeur local pour exécuter les tâches sur cet ouvrier
        self.processeur_local = ProcesseurExifToolEnLot() 
        
        self.ouvrier_en_cours = False
        self.thread_ouvrier = None
        
        self.statistiques_distribuees = {
            'taches_soumises': 0,
            'taches_terminees': 0,
            'taches_echec': 0,
            'temps_activite_ouvrier_sec': 0
        }
        self.journal = self.processeur_local.journal # Réutiliser le journal du processeur local

    def soumettre_tache_distribuee(self, config_tache: ConfigurationTache) -> str:
        """Soumet une tâche au système distribué via Redis."""
        try:
            donnees_tache = {
                'id_tache': config_tache.identifiant,
                'config': pickle.dumps(config_tache),
                'heure_soumission': datetime.now().isoformat(),
                'soumis_par': self.identifiant_ouvrier
            }
            self.client_redis.lpush(self.nom_file_taches, pickle.dumps(donnees_tache))
            self.statistiques_distribuees['taches_soumises'] += 1
            self.journal.info(f"Tâche distribuée soumise: {config_tache.identifiant}")
            return config_tache.identifiant
        except Exception as e:
            raise Exception(f"Échec de la soumission de tâche distribuée: {e}")
    
    def demarrer_ouvrier(self):
        """Démarre le processus de l'ouvrier pour traiter les tâches."""
        if self.ouvrier_en_cours:
            return
        self.ouvrier_en_cours = True
        self.thread_ouvrier = threading.Thread(
            target=self._boucle_ouvrier, daemon=True
        )
        self.thread_ouvrier.start()
        self._enregistrer_ouvrier()
        self.journal.info(f"Ouvrier distribué {self.identifiant_ouvrier} démarré.")
    
    def arreter_ouvrier(self):
        """Arrête le processus de l'ouvrier."""
        self.ouvrier_en_cours = False
        if self.thread_ouvrier:
            self.thread_ouvrier.join(timeout=10)
        self._desenregistrer_ouvrier()
        self.journal.info(f"Ouvrier distribué {self.identifiant_ouvrier} arrêté.")

    def _boucle_ouvrier(self):
        """Boucle principale de l'ouvrier, récupérant et exécutant les tâches."""
        heure_demarrage = time.time()
        while self.ouvrier_en_cours:
            try:
                # Récupérer une tâche de la file d'attente (bloquant avec timeout)
                donnees_recues = self.client_redis.brpop(self.nom_file_taches, timeout=5)
                
                if donnees_recues:
                    _, donnees_serialisees = donnees_recues
                    info_tache = pickle.loads(donnees_serialisees)
                    config_tache = pickle.loads(info_tache['config'])
                    
                    self.journal.info(f"Ouvrier {self.identifiant_ouvrier} exécute la tâche: {config_tache.identifiant}")
                    resultat = self._executer_tache_distribuee(config_tache)
                    
                    self._publier_resultat(info_tache['id_tache'], resultat)
                    
                    if resultat.statut == StatutTache.TERMINE:
                        self.statistiques_distribuees['taches_terminees'] += 1
                    else:
                        self.statistiques_distribuees['taches_echec'] += 1
                
                self.statistiques_distribuees['temps_activite_ouvrier_sec'] = time.time() - heure_demarrage
                self._mettre_a_jour_statut_ouvrier()
            except Exception as e:
                self.journal.error(f"Erreur dans la boucle de l'ouvrier {self.identifiant_ouvrier}: {e}")
                time.sleep(1)
    
    def _executer_tache_distribuee(self, config_tache: ConfigurationTache) -> ResultatTraitement:
        """Exécute une tâche individuelle en utilisant le processeur local."""
        try:
            return self.processeur_local._executer_tache(config_tache) # Utilise la méthode protégée pour bypasser la file locale
        except Exception as e:
            return ResultatTraitement(
                identifiant=config_tache.identifiant,
                statut=StatutTache.ECHEC,
                heure_debut=datetime.now(),
                heure_fin=datetime.now(),
                message_erreur=str(e)
            )

    def _publier_resultat(self, identifiant_tache: str, resultat: ResultatTraitement):
        """Publie le résultat d'une tâche dans la file de résultats et le stocke en cache."""
        try:
            donnees_resultat = {
                'id_tache': identifiant_tache,
                'id_ouvrier': self.identifiant_ouvrier,
                'resultat': pickle.dumps(resultat),
                'heure_completion': datetime.now().isoformat()
            }
            self.client_redis.lpush(self.nom_file_resultats, pickle.dumps(donnees_resultat))
            
            # Stocker le résultat en cache avec une expiration
            cle_resultat = f"resultat:{identifiant_tache}"
            self.client_redis.setex(cle_resultat, 86400, pickle.dumps(resultat)) # 24 heures
            self.journal.info(f"Résultat pour la tâche {identifiant_tache} publié et mis en cache.")
        except Exception as e:
            self.journal.error(f"Échec de la publication du résultat: {e}")
    
    def obtenir_resultat_distribue(self, identifiant_tache: str, delai_attente: int = 60) -> Optional[ResultatTraitement]:
        """Tente de récupérer le résultat d'une tâche distribuée, potentiellement en attendant."""
        try:
            cle_resultat = f"resultat:{identifiant_tache}"
            resultat_cache = self.client_redis.get(cle_resultat)
            if resultat_cache:
                return pickle.loads(resultat_cache)
            
            # Si pas en cache, attendre dans la file de résultats
            heure_depart = time.time()
            while time.time() - heure_depart < delai_attente:
                donnees_recues = self.client_redis.brpop(self.nom_file_resultats, timeout=1)
                if donnees_recues:
                    _, resultat_serialise = donnees_recues
                    info_resultat = pickle.loads(resultat_serialise)
                    if info_resultat['id_tache'] == identifiant_tache:
                        self.client_redis.setex(cle_resultat, 86400, info_resultat['resultat']) # Cache le résultat pour les futures requêtes
                        return pickle.loads(info_resultat['resultat'])
                    else:
                        # Ce n'est pas notre résultat, le remettre dans la file
                        self.client_redis.lpush(self.nom_file_resultats, resultat_serialise)
            self.journal.warning(f"Délai dépassé pour obtenir le résultat de la tâche {identifiant_tache}.")
            return None
        except Exception as e:
            self.journal.error(f"Échec de l'obtention du résultat distribué: {e}")
            return None

    def _enregistrer_ouvrier(self):
        """Enregistre l'ouvrier dans le registre Redis."""
        try:
            info_ouvrier = {
                'id_ouvrier': self.identifiant_ouvrier,
                'heure_demarrage': datetime.now().isoformat(),
                'statut': 'actif',
                'capacites': ['exiftool_traitement_lot'],
                'statistiques': self.statistiques_distribuees
            }
            cle_ouvrier = f"ouvrier:{self.identifiant_ouvrier}"
            self.client_redis.setex(cle_ouvrier, 300, json.dumps(info_ouvrier)) # 5 minutes d'expiration
            self.client_redis.sadd('ouvriers_actifs', self.identifiant_ouvrier)
            self.journal.info(f"Ouvrier {self.identifiant_ouvrier} enregistré.")
        except Exception as e:
            self.journal.error(f"Échec de l'enregistrement de l'ouvrier: {e}")

    def _desenregistrer_ouvrier(self):
        """Désenregistre l'ouvrier de Redis."""
        try:
            cle_ouvrier = f"ouvrier:{self.identifiant_ouvrier}"
            self.client_redis.delete(cle_ouvrier)
            self.client_redis.srem('ouvriers_actifs', self.identifiant_ouvrier)
            self.journal.info(f"Ouvrier {self.identifiant_ouvrier} désenregistré.")
        except Exception as e:
            self.journal.error(f"Échec du désenregistrement de l'ouvrier: {e}")

    def _mettre_a_jour_statut_ouvrier(self):
        """Met à jour le statut de l'ouvrier dans Redis pour indiquer qu'il est actif."""
        try:
            info_ouvrier = {
                'id_ouvrier': self.identifiant_ouvrier,
                'derniere_mise_a_jour': datetime.now().isoformat(),
                'statut': 'actif',
                'statistiques': self.statistiques_distribuees
            }
            cle_ouvrier = f"ouvrier:{self.identifiant_ouvrier}"
            self.client_redis.setex(cle_ouvrier, 300, json.dumps(info_ouvrier)) # Rafraîchir l'expiration
        except Exception as e:
            self.journal.error(f"Échec de la mise à jour du statut de l'ouvrier: {e}")

    def obtenir_etat_cluster(self) -> Dict[str, Any]:
        """Récupère l'état global du cluster de traitement distribué."""
        try:
            ouvriers_actifs_ids = self.client_redis.smembers('ouvriers_actifs')
            details_ouvriers = []
            for ouvrier_id_bytes in ouvriers_actifs_ids:
                ouvrier_id = ouvrier_id_bytes.decode()
                cle_ouvrier = f"ouvrier:{ouvrier_id}"
                info_ouvrier = self.client_redis.get(cle_ouvrier)
                if info_ouvrier:
                    details_ouvriers.append(json.loads(info_ouvrier))
            
            longueur_file_taches = self.client_redis.llen(self.nom_file_taches)
            longueur_file_resultats = self.client_redis.llen(self.nom_file_resultats)
            
            return {
                'nombre_ouvriers_actifs': len(ouvriers_actifs_ids),
                'details_ouvriers': details_ouvriers,
                'longueur_file_taches': longueur_file_taches,
                'longueur_file_resultats': longueur_file_resultats,
                'statistiques_cluster': self._calculer_statistiques_cluster(details_ouvriers)
            }
        except Exception as e:
            self.journal.error(f"Échec de l'obtention de l'état du cluster: {e}")
            return {'erreur': str(e), 'nombre_ouvriers_actifs': 0, 'details_ouvriers': []}

    def _calculer_statistiques_cluster(self, details_ouvriers: List[Dict]) -> Dict[str, Any]:
        """Calcule les statistiques agrégées du cluster."""
        total_soumises = sum(o.get('statistiques', {}).get('taches_soumises', 0) for o in details_ouvriers)
        total_terminees = sum(o.get('statistiques', {}).get('taches_terminees', 0) for o in details_ouvriers)
        total_echec = sum(o.get('statistiques', {}).get('taches_echec', 0) for o in details_ouvriers)
        
        return {
            'total_taches_soumises': total_soumises,
            'total_taches_terminees': total_terminees,
            'total_taches_echec': total_echec,
            'taux_succes': (total_terminees / max(total_soumises, 1)) * 100,
            'taux_echec': (total_echec / max(total_soumises, 1)) * 100
        }

Cas Pratiques : Systèmes Intégrés

Système de Traitement de Photos d'Entreprise

L'intégration de ces composants peut aboutir à des solutions puissantes, comme un système de gestion de photos qui automatise l'ingestion, le classement, la sauvegarde, la validation et la génération de vignettes pour des milliers d'images.


import os
import sqlite3
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import json
import hashlib
import shutil

class SystemeTraitementPhotosEntreprise:
    """Un système complet pour la gestion et le traitement automatisé de photos."""
    
    def __init__(self, 
                 chemin_base_donnees: str = 'base_photos.db',
                 racine_stockage: str = './stockage_photos',
                 racine_sauvegarde: str = './sauvegarde_photos'):
        
        self.chemin_base_donnees = chemin_base_donnees
        self.racine_stockage = Path(racine_stockage)
        self.racine_sauvegarde = Path(racine_sauvegarde)
        
        self.racine_stockage.mkdir(parents=True, exist_ok=True)
        self.racine_sauvegarde.mkdir(parents=True, exist_ok=True)
        
        self._initialiser_base_donnees()
        
        self.gestionnaire_lots = ProcesseurExifToolEnLot() # Utilise le processeur ExifTool
        self.planificateur = PlanificateurIntelligentTaches(self.gestionnaire_lots)
        
        self.regles_traitement = {
            'sauvegarde_auto': True,
            'controle_qualite': True,
            'validation_metadonnees': True,
            'detection_doublons': True,
            'categorisation_auto': True,
            'generation_vignettes': True
        }
    
    def _initialiser_base_donnees(self):
        """Crée les tables nécessaires dans la base de données SQLite."""
        conn = sqlite3.connect(self.chemin_base_donnees)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS photos (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chemin_fichier TEXT UNIQUE NOT NULL,
                hachage_fichier TEXT NOT NULL,
                taille_fichier INTEGER,
                date_creation TIMESTAMP,
                date_modification TIMESTAMP,
                date_traitement TIMESTAMP,
                statut TEXT DEFAULT 'en_attente',
                categorie TEXT,
                score_qualite REAL,
                json_metadonnees TEXT,
                chemin_vignette TEXT,
                chemin_sauvegarde TEXT,
                mots_cles TEXT
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS taches_traitement (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                id_tache TEXT UNIQUE NOT NULL,
                type_tache TEXT NOT NULL,
                chemins_fichiers TEXT,
                parametres_json TEXT,
                statut TEXT DEFAULT 'en_attente',
                date_creation TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                date_debut TIMESTAMP,
                date_completion TIMESTAMP,
                resultat_json TEXT,
                message_erreur TEXT
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS statistiques_traitement (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                date_stat TEXT NOT NULL UNIQUE,
                total_traite INTEGER DEFAULT 0,
                total_echec INTEGER DEFAULT 0,
                taille_totale_mb REAL DEFAULT 0,
                temps_moyen_traitement_sec REAL DEFAULT 0,
                distribution_qualite_json TEXT
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def ingestion_photos(self, chemins_sources: List[str]) -> Dict[str, Any]:
        """Importe des photos dans le système, gère les doublons et planifie les tâches de traitement."""
        resultats_ingestion = {
            'total_fichiers_fournis': len(chemins_sources),
            'fichiers_importes': [],
            'fichiers_ignores': [],
            'fichiers_echec_import': [],
            'fichiers_doublons': []
        }
        
        conn = sqlite3.connect(self.chemin_base_donnees)
        cursor = conn.cursor()
        
        try:
            for chemin_source_str in chemins_sources:
                try:
                    chemin_source = Path(chemin_source_str)
                    if not chemin_source.exists() or not chemin_source.is_file():
                        resultats_ingestion['fichiers_echec_import'].append({'chemin': chemin_source_str, 'erreur': 'Fichier inexistant ou non valide'})
                        continue
                    
                    hachage_fichier = self._calculer_hachage_fichier(chemin_source)
                    
                    cursor.execute('SELECT chemin_fichier FROM photos WHERE hachage_fichier = ?', (hachage_fichier,))
                    existant = cursor.fetchone()
                    
                    if existant:
                        resultats_ingestion['fichiers_doublons'].append({'chemin_nouveau': chemin_source_str, 'chemin_existant': existant[0]})
                        continue
                    
                    chemin_stockage = self._generer_chemin_stockage(chemin_source)
                    
                    chemin_stockage.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(chemin_source, chemin_stockage)
                    
                    statistiques_fichier = chemin_source.stat()
                    cursor.execute('''
                        INSERT INTO photos (chemin_fichier, hachage_fichier, taille_fichier, date_creation, date_modification) 
                        VALUES (?, ?, ?, ?, ?)
                    ''', (
                        str(chemin_stockage),
                        hachage_fichier,
                        statistiques_fichier.st_size,
                        datetime.fromtimestamp(statistiques_fichier.st_ctime),
                        datetime.fromtimestamp(statistiques_fichier.st_mtime)
                    ))
                    
                    resultats_ingestion['fichiers_importes'].append(str(chemin_stockage))
                    self._creer_taches_traitement_pour_fichier(str(chemin_stockage))
                    
                except Exception as e:
                    resultats_ingestion['fichiers_echec_import'].append({'chemin': chemin_source_str, 'erreur': str(e)})
            
            conn.commit()
            
        finally:
            conn.close()
        
        return resultats_ingestion
    
    def _calculer_hachage_fichier(self, chemin_fichier: Path) -> str:
        """Calcule le hachage MD5 d'un fichier."""
        hachage_md5 = hashlib.md5()
        with open(chemin_fichier, "rb") as f:
            for morceau in iter(lambda: f.read(4096), b""):
                hachage_md5.update(morceau)
        return hachage_md5.hexdigest()
    
    def _generer_chemin_stockage(self, fichier_source: Path) -> Path:
        """Génère un chemin de stockage unique basé sur la date et le nom du fichier."""
        maintenant = datetime.now()
        annee_mois = maintenant.strftime('%Y/%m')
        
        horodatage = maintenant.strftime('%Y%m%d_%H%M%S_%f') # Inclure microsecondes pour unicité
        nom_unique = f"{horodatage}_{fichier_source.name}"
        
        return self.racine_stockage / annee_mois / nom_unique
    
    def _creer_taches_traitement_pour_fichier(self, chemin_fichier: str):
        """Crée et planifie les tâches de traitement pour un nouveau fichier."""
        taches_a_planifier = []
        
        if self.regles_traitement['controle_qualite']:
            taches_a_planifier.append({
                'type': 'valider_fichiers',
                'params': {'verifier_corruption': True, 'verifier_metadonnees': True}
            })
        if self.regles_traitement['metadata_validation']:
            taches_a_planifier.append({
                'type': 'lire_metadonnees',
                'params': {'balises': ['all']}
            })
        if self.regles_traitement['generation_vignettes']:
            taches_a_planifier.append({
                'type': 'extraire_vignettes',
                'params': {'repertoire_sortie': str(self.racine_stockage / 'vignettes'), 'taille_vignette': 'ThumbnailImage'}
            })
        if self.regles_traitement['sauvegarde_auto']:
            taches_a_planifier.append({
                'type': 'sauvegarder_fichier', # Ceci serait une opération personnalisée ou simulée
                'params': {'chemin_cible_sauvegarde': str(self.racine_sauvegarde)}
            })
        
        for info_tache in taches_a_planifier:
            config_tache = ConfigurationTache(
                identifiant=f"{info_tache['type']}_{Path(chemin_fichier).name}_{int(datetime.now().timestamp())}",
                operation=info_tache['type'],
                parametres={
                    'fichiers': [chemin_fichier],
                    **info_tache['params']
                }
            )
            self.planificateur.planifier_tache(config_tache)
            self.gestionnaire_lots.journal.info(f"Tâche {config_tache.identifiant} créée et planifiée pour {chemin_fichier}.")
    
    def demarrer_systeme(self):
        """Démarre le gestionnaire de lots et le planificateur."""
        self.gestionnaire_lots.start_processing()
        self.planificateur.demarrer_planificateur()
        self._planifier_taches_maintenance()
        self.gestionnaire_lots.journal.info("Système de traitement de photos démarré.")
    
    def arreter_systeme(self):
        """Arrête le gestionnaire de lots et le planificateur."""
        self.planificateur.arreter_planificateur()
        self.gestionnaire_lots.arreter_traitement()
        self.gestionnaire_lots.journal.info("Système de traitement de photos arrêté.")
    
    def _planifier_taches_maintenance(self):
        """Planifie des tâches de maintenance régulières."""
        config_stats_quotidiennes = ConfigurationTache(
            identifiant="rapport_stats_quotidiennes",
            operation="generer_statistiques_quotidiennes",
            parametres={}
        )
        self.planificateur.planifier_tache(
            config_stats_quotidiennes,
            type_planification=TypePlanification.RECURRENT,
            heure_planifiee=datetime.now().replace(hour=23, minute=59, second=0, microsecond=0),
            intervalle_recurrent=timedelta(days=1)
        )
        
        config_nettoyage_sauvegardes = ConfigurationTache(
            identifiant="nettoyage_anciennes_sauvegardes",
            operation="nettoyer_anciennes_sauvegardes",
            parametres={'jours_retention': 30}
        )
        self.planificateur.planifier_tache(
            config_nettoyage_sauvegardes,
            type_planification=TypePlanification.RECURRENT,
            heure_planifiee=datetime.now().replace(hour=2, minute=0, second=0, microsecond=0),
            intervalle_recurrent=timedelta(days=7)
        )

    def obtenir_tableau_bord_traitement(self) -> Dict[str, Any]:
        """Récupère les données pour le tableau de bord de supervision."""
        conn = sqlite3.connect(self.chemin_base_donnees)
        cursor = conn.cursor()
        
        try:
            cursor.execute('''
                SELECT 
                    COUNT(*) AS total_photos,
                    SUM(taille_fichier) AS taille_totale_octets,
                    COUNT(CASE WHEN statut = 'termine' THEN 1 END) AS photos_terminees,
                    COUNT(CASE WHEN statut = 'echec' THEN 1 END) AS photos_echec,
                    COUNT(CASE WHEN statut = 'en_attente' THEN 1 END) AS photos_en_attente
                FROM photos
            ''')
            stats_generales = cursor.fetchone()
            
            cursor.execute('''
                SELECT chemin_fichier, date_traitement, statut, score_qualite
                FROM photos 
                WHERE date_traitement IS NOT NULL
                ORDER BY date_traitement DESC 
                LIMIT 10
            ''')
            fichiers_recents = cursor.fetchall()
            
            cursor.execute('''
                SELECT statut, COUNT(*) 
                FROM taches_traitement 
                GROUP BY statut
            ''')
            stats_taches = dict(cursor.fetchall())
            
            cursor.execute('''
                SELECT 
                    CASE 
                        WHEN score_qualite >= 0.8 THEN 'haute'
                        WHEN score_qualite >= 0.6 THEN 'moyenne'
                        WHEN score_qualite >= 0.4 THEN 'basse'
                        ELSE 'pauvre'
                    END AS niveau_qualite,
                    COUNT(*)
                FROM photos 
                WHERE score_qualite IS NOT NULL
                GROUP BY niveau_qualite
            ''')
            distribution_qualite = dict(cursor.fetchall())
            
            return {
                'total_photos': stats_generales[0] or 0,
                'taille_totale_mb': (stats_generales[1] or 0) / (1024 * 1024),
                'photos_terminees': stats_generales[2] or 0,
                'photos_echec': stats_generales[3] or 0,
                'photos_en_attente': stats_generales[4] or 0,
                'fichiers_recents': fichiers_recents,
                'statistiques_taches': stats_taches,
                'distribution_qualite': distribution_qualite,
                'etat_systeme': {
                    'gestionnaire_lots': self.gestionnaire_lots.obtenir_etat(),
                    'planificateur': self.planificateur.obtenir_statut_planificateur()
                }
            }
        finally:
            conn.close()
    
    def rechercher_photos(self, 
                          requete: str = None,
                          categorie: str = None,
                          intervalle_dates: tuple = None,
                          qualite_min: float = None,
                          mots_cles: List[str] = None) -> List[Dict[str, Any]]:
        """Recherche des photos en fonction de divers critères."""
        conn = sqlite3.connect(self.chemin_base_donnees)
        cursor = conn.cursor()
        
        try:
            sql_query = "SELECT * FROM photos WHERE 1=1"
            params = []
            
            if requete:
                sql_query += " AND chemin_fichier LIKE ?"
                params.append(f"%{requete}%")
            
            if categorie:
                sql_query += " AND categorie = ?"
                params.append(categorie)
            
            if intervalle_dates:
                date_debut, date_fin = intervalle_dates
                sql_query += " AND date_creation BETWEEN ? AND ?"
                params.extend([date_debut, date_fin])
            
            if qualite_min:
                sql_query += " AND score_qualite >= ?"
                params.append(qualite_min)
            
            if mots_cles:
                for mot_cle in mots_cles:
                    sql_query += " AND mots_cles LIKE ?"
                    params.append(f"%{mot_cle}%")
            
            sql_query += " ORDER BY date_creation DESC LIMIT 100"
            
            cursor.execute(sql_query, params)
            colonnes = [desc[0] for desc in cursor.description]
            
            resultats_recherche = []
            for row in cursor.fetchall():
                donnees_photo = dict(zip(colonnes, row))
                if donnees_photo['json_metadonnees']:
                    try:
                        donnees_photo['metadonnees'] = json.loads(donnees_photo['json_metadonnees'])
                    except json.JSONDecodeError:
                        donnees_photo['metadonnees'] = {}
                resultats_recherche.append(donnees_photo)
            
            return resultats_recherche
            
        finally:
            conn.close()

Système de Surveillance Intelligent

Pour assurer la stabilité et les performances continues des traitements par lot, un système de surveillance est indispensable. Il collecte des métriques système et d'application, détecte les anomalies et déclenche des alertes ou des actions correctives automatiques.


import time
import psutil
import threading
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging

class MoniteurTraitementLot:
    """Système de surveillance pour les processus de traitement par lot."""
    
    def __init__(self, 
                 gestionnaire_lots: GestionnaireLots,
                 configuration_alertes: Dict[str, Any] = None):
        
        self.gestionnaire_lots = gestionnaire_lots
        self.configuration_alertes = configuration_alertes or {}
        
        self.parametres_surveillance = {
            'intervalle_verification_sec': 30,  # Intervalle entre chaque vérification
            'fenetre_performance_sec': 3600,  # Historique de performance sur 1 heure (3600s)
            'seuils_alerte': {
                'cpu_utilisation_pct': 90,  # Seuil d'utilisation CPU
                'memoire_utilisation_pct': 85,  # Seuil d'utilisation mémoire
                'disque_utilisation_pct': 90,  # Seuil d'utilisation disque
                'taux_erreur_pct': 10,  # Seuil de taux d'erreur des tâches (%)
                'longueur_file_taches': 1000,  # Seuil de longueur de la file d'attente
                'delai_traitement_moyen_sec': 300  # Seuil de temps moyen de traitement
            }
        }
        
        self.en_surveillance = False
        self.thread_surveillance = None
        
        self.historique_performances = []
        self.historique_alertes = []
        
        self.journal = logging.getLogger('MoniteurTraitementLot')
        
    def demarrer_surveillance(self):
        """Démarre le processus de surveillance."""
        if self.en_surveillance:
            return
        
        self.en_surveillance = True
        self.thread_surveillance = threading.Thread(
            target=self._boucle_surveillance, daemon=True
        )
        self.thread_surveillance.start()
        
        self.journal.info("Surveillance du traitement par lot démarrée.")
    
    def arreter_surveillance(self):
        """Arrête le processus de surveillance."""
        self.en_surveillance = False
        if self.thread_surveillance:
            self.thread_surveillance.join(timeout=10)
        
        self.journal.info("Surveillance du traitement par lot arrêtée.")
    
    def _boucle_surveillance(self):
        """Boucle principale de la surveillance, collectant les données et vérifiant les alertes."""
        while self.en_surveillance:
            try:
                donnees_performance = self._collecter_donnees_performance()
                alertes_detectees = self._verifier_conditions_alerte(donnees_performance)
                
                if alertes_detectees:
                    self._gerer_alertes(alertes_detectees)
                
                self._sauvegarder_donnees_performance(donnees_performance)
                self._nettoyer_historique()
                
                time.sleep(self.parametres_surveillance['intervalle_verification_sec'])
                
            except Exception as e:
                self.journal.error(f"Erreur dans la boucle de surveillance: {e}")
                time.sleep(5)
    
    def _collecter_donnees_performance(self) -> Dict[str, Any]:
        """Collecte les métriques système et les statistiques du gestionnaire de lots."""
        heure_actuelle = datetime.now()
        
        cpu_pourcentage = psutil.cpu_percent(interval=1)
        memoire_virtuelle = psutil.virtual_memory()
        utilisation_disque = psutil.disk_usage('/')
        
        statut_gestionnaire = self.gestionnaire_lots.obtenir_etat().get('progression', {}) # Utilise les stats du gestionnaire
        
        total_taches = statut_gestionnaire.get('total_taches', 0)
        taches_echec = statut_gestionnaire.get('taches_echec', 0)
        taux_erreur = (taches_echec / max(total_taches, 1)) * 100
        
        taches_terminees = statut_gestionnaire.get('taches_terminees', 0)
        temps_total_traitement = statut_gestionnaire.get('statistiques_performance', {}).get('temps_total_traitement', 0)
        temps_moyen_traitement = temps_total_traitement / max(taches_terminees, 1)
        
        return {
            'horodatage': heure_actuelle.isoformat(),
            'systeme': {
                'cpu_utilisation': cpu_pourcentage,
                'memoire_utilisation': memoire_virtuelle.percent,
                'memoire_disponible_mo': memoire_virtuelle.available / (1024 * 1024),
                'disque_utilisation': utilisation_disque.percent,
                'disque_libre_go': utilisation_disque.free / (1024 * 1024 * 1024)
            },
            'processeur_lot': {
                'taches_actives': statut_gestionnaire.get('taches_actives', 0),
                'taches_en_attente': statut_gestionnaire.get('taches_en_attente', 0),
                'taches_terminees': taches_terminees,
                'taches_echec': taches_echec,
                'taux_erreur': taux_erreur,
                'temps_moyen_traitement': temps_moyen_traitement,
                'longueur_file': statut_gestionnaire.get('taches_en_attente', 0) # La file d'attente du gestionnaire de lots
            }
        }
    
    def _verifier_conditions_alerte(self, donnees_performance: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Vérifie si les données de performance actuelles dépassent les seuils d'alerte."""
        alertes = []
        seuils = self.parametres_surveillance['seuils_alerte']
        
        donnees_systeme = donnees_performance['systeme']
        donnees_processeur = donnees_performance['processeur_lot']
        
        if donnees_systeme['cpu_utilisation'] > seuils['cpu_utilisation_pct']:
            alertes.append({'type': 'cpu_elevee', 'gravite': 'avertissement', 'message': f"Utilisation CPU élevée: {donnees_systeme['cpu_utilisation']:.1f}%", 'valeur': donnees_systeme['cpu_utilisation'], 'seuil': seuils['cpu_utilisation_pct']})
        
        if donnees_systeme['memoire_utilisation'] > seuils['memoire_utilisation_pct']:
            alertes.append({'type': 'memoire_elevee', 'gravite': 'avertissement', 'message': f"Utilisation mémoire élevée: {donnees_systeme['memoire_utilisation']:.1f}%", 'valeur': donnees_systeme['memoire_utilisation'], 'seuil': seuils['memoire_utilisation_pct']})
        
        if donnees_systeme['disque_utilisation'] > seuils['disque_utilisation_pct']:
            alertes.append({'type': 'disque_plein', 'gravite': 'critique', 'message': f"Utilisation disque élevée: {donnees_systeme['disque_utilisation']:.1f}%", 'valeur': donnees_systeme['disque_utilisation'], 'seuil': seuils['disque_utilisation_pct']})
        
        if donnees_processeur['taux_erreur'] > seuils['taux_erreur_pct']:
            alertes.append({'type': 'taux_erreur_elevé', 'gravite': 'critique', 'message': f"Taux d'erreur des tâches élevé: {donnees_processeur['taux_erreur']:.1f}%", 'valeur': donnees_processeur['taux_erreur'], 'seuil': seuils['taux_erreur_pct']})
        
        if donnees_processeur['longueur_file'] > seuils['longueur_file_taches']:
            alertes.append({'type': 'file_taches_longue', 'gravite': 'avertissement', 'message': f"File d'attente des tâches trop longue: {donnees_processeur['longueur_file']}", 'valeur': donnees_processeur['longueur_file'], 'seuil': seuils['longueur_file_taches']})
        
        if donnees_processeur['temps_moyen_traitement'] > seuils['delai_traitement_moyen_sec']:
            alertes.append({'type': 'delai_traitement_elevé', 'gravite': 'avertissement', 'message': f"Temps moyen de traitement élevé: {donnees_processeur['temps_moyen_traitement']:.1f} sec", 'valeur': donnees_processeur['temps_moyen_traitement'], 'seuil': seuils['delai_traitement_moyen_sec']})
        
        return alertes
    
    def _gerer_alertes(self, alertes: List[Dict[str, Any]]):
        """Traite les alertes détectées : les logue, envoie des notifications et déclenche des actions automatiques."""
        for alerte in alertes:
            alerte['horodatage'] = datetime.now().isoformat()
            alerte['id'] = f"{alerte['type']}_{int(time.time())}"
            
            self.historique_alertes.append(alerte)
            self.journal.warning(f"ALERTE: {alerte['message']} (Type: {alerte['type']}, Gravité: {alerte['gravite']})")
            
            if self.configuration_alertes.get('email_active'):
                self._envoyer_alerte_email(alerte)
            
            self._action_automatique_alerte(alerte)
    
    def _envoyer_alerte_email(self, alerte: Dict[str, Any]):
        """Envoie une notification d'alerte par email."""
        try:
            config_email = self.configuration_alertes.get('email', {})
            
            msg = MIMEMultipart()
            msg['From'] = config_email.get('email_expediteur')
            msg['To'] = ', '.join(config_email.get('destinataires', []))
            msg['Subject'] = f"[ALERTE TRAITEMENT LOT] {alerte['type']}"
            
            corps_email = f"""
            Type d'alerte: {alerte['type']}
            Gravité: {alerte['gravite']}
            Message: {alerte['message']}
            Valeur actuelle: {alerte['valeur']}
            Seuil: {alerte['seuil']}
            Heure: {alerte['horodatage']}
            """
            
            msg.attach(MIMEText(corps_email, 'plain', 'utf-8'))
            
            serveur = smtplib.SMTP(config_email.get('serveur_smtp'), config_email.get('port_smtp', 587))
            serveur.starttls()
            serveur.login(config_email.get('utilisateur_smtp'), config_email.get('mot_de_passe_smtp'))
            
            texte_email = msg.as_string()
            serveur.sendmail(config_email.get('email_expediteur'), config_email.get('destinataires'), texte_email)
            serveur.quit()
            
            self.journal.info(f"Alerte email envoyée pour {alerte['type']}.")
        except Exception as e:
            self.journal.error(f"Échec de l'envoi de l'alerte email: {e}")
    
    def _action_automatique_alerte(self, alerte: Dict[str, Any]):
        """Déclenche des actions automatiques en réponse à une alerte."""
        type_alerte = alerte['type']
        
        if type_alerte == 'file_taches_longue':
            current_workers = self.gestionnaire_lots.nombre_max_ouvriers
            if current_workers < 20: # Limite arbitraire
                new_workers = min(current_workers + 2, 20)
                self.gestionnaire_lots.nombre_max_ouvriers = new_workers # Nécessite une méthode set_max_workers ou réinitialiser le ThreadPool
                self.journal.info(f"Action auto: Augmentation des ouvriers de {current_workers} à {new_workers}.")
        
        elif type_alerte == 'memoire_elevee':
            current_workers = self.gestionnaire_lots.nombre_max_ouvriers
            if current_workers > 2: # Minimum
                new_workers = max(current_workers - 1, 2)
                self.gestionnaire_lots.nombre_max_ouvriers = new_workers
                self.journal.info(f"Action auto: Diminution des ouvriers de {current_workers} à {new_workers}.")
        
        elif type_alerte == 'taux_erreur_elevé':
            self.gestionnaire_lots.suspendre_traitement()
            self.journal.warning("Action auto: Traitement suspendu en raison d'un taux d'erreur élevé pour investigation.")
        
    def _sauvegarder_donnees_performance(self, donnees_performance: Dict[str, Any]):
        """Ajoute les données de performance à l'historique."""
        self.historique_performances.append(donnees_performance)
    
    def _nettoyer_historique(self):
        """Nettoie les anciennes données d'historique de performance et d'alertes."""
        heure_actuelle = datetime.now()
        heure_limite = heure_actuelle - timedelta(seconds=self.parametres_surveillance['fenetre_performance_sec'])
        
        self.historique_performances = [
            data for data in self.historique_performances
            if datetime.fromisoformat(data['horodatage']) > heure_limite
        ]
        
        self.historique_alertes = [
            alerte for alerte in self.historique_alertes
            if datetime.fromisoformat(alerte['horodatage']) > heure_limite
        ]
    
    def obtenir_tableau_bord_surveillance(self) -> Dict[str, Any]:
        """Génère un résumé pour le tableau de bord de surveillance."""
        if not self.historique_performances:
            return {'erreur': 'Aucune donnée de surveillance disponible.'}
        
        derniere_donnee = self.historique_performances[-1]
        
        tendances = {'cpu': 0, 'memoire': 0, 'taux_erreur': 0}
        if len(self.historique_performances) >= 2:
            avant_derniere_donnee = self.historique_performances[-2]
            tendances['cpu'] = derniere_donnee['systeme']['cpu_utilisation'] - avant_derniere_donnee['systeme']['cpu_utilisation']
            tendances['memoire'] = derniere_donnee['systeme']['memoire_utilisation'] - avant_derniere_donnee['systeme']['memoire_utilisation']
            tendances['taux_erreur'] = derniere_donnee['processeur_lot']['taux_erreur'] - avant_derniere_donnee['processeur_lot']['taux_erreur']
        
        alertes_recentes = sorted(
            self.historique_alertes[-10:], # Les 10 dernières
            key=lambda x: x['horodatage'], 
            reverse=True
        )
        
        resume_alertes = {
            'total_alertes': len(self.historique_alertes),
            'alertes_critiques': len([a for a in self.historique_alertes if a['gravite'] == 'critique']),
            'alertes_avertissement': len([a for a in self.historique_alertes if a['gravite'] == 'avertissement'])
        }
        
        return {
            'statut_actuel': derniere_donnee,
            'tendances': tendances,
            'alertes_recentes': alertes_recentes,
            'resume_alertes': resume_alertes,
            'configuration_surveillance': self.parametres_surveillance
        }
    
    def mettre_a_jour_seuils_alerte(self, nouveaux_seuils: Dict[str, Any]):
        """Met à jour les seuils d'alerte configurés."""
        self.parametres_surveillance['seuils_alerte'].update(nouveaux_seuils)
        self.journal.info(f"Seuils d'alerte mis à jour: {nouveaux_seuils}")

Étiquettes: Python métadonnées TraitementLot ExifTool ProgrammationAsynchrone

Publié le 30 juin à 18h57