Dans le cadre du traitement de fichiers de données volumineux où chaque ligne représente un enregistrement, il est courant d'utiliesr des flux standards pour une intégration flexible avec des outils shell. Par exemple, un script Python peut lire depuis stdin, appliquer une fonction de transformation (ETL) et écrire les résultats vers stdout.
import sys
def transformer_enregistrement(ligne):
# Fonction ETL simulée avec un délai
import time
time.sleep(1)
return ligne.strip().upper()
for ligne in sys.stdin:
output = transformer_enregistrement(ligne)
print(output)
Exécution typique : cat donnees_entree | python traitement.py > donnees_sortie
L'efficacité du multithreading pour améliorer les performances dépend fortement de la nature de la tâche. Si la fonction ETL est principalement liée au CPU, le Global Interpreter Lock (GIL) de Python peut limiter les gains. Dans ce cas, il est recommandé d'uitliser le module multiprocessing. Pour les tâches liées aux E/S, comme les opérations disque, le multithreading peut offrir des améliorations notables.
Pour éviter les conflits d'écriture sur stdout lorsque plusieurs threads sont actifs, l'utilisation de verrous est essentielle. Voici un exemple avec un verrou partagé :
import threading
import time
class ProcesseurLignes(threading.Thread):
_verrou_ecriture = threading.Lock()
def __init__(self, source_donnees):
threading.Thread.__init__(self)
self.source_donnees = source_donnees
def run(self):
for item in self.source_donnees:
time.sleep(2)
with self._verrou_ecriture:
print(item + ' - Traite')
Une approche plus robuste consiste à utiliser des files d'attente (queues) pour distribuer le travail de manière équilibrée et collecter les résultats. Cela permet de répartir la charge efficacement et de gérer les variations dans le temps de traitement des lignes.
import threading
import queue
import sys
import time
def tache_travail(file_taches, file_resultats):
while True:
ligne = file_taches.get()
if ligne is None:
break
# Traitement ETL simulé
time.sleep(0.5)
resultat = ligne.rstrip().capitalize()
file_resultats.put(resultat)
file_taches.task_done()
# Initialisation des files d'attente
taches = queue.Queue()
resultats = queue.Queue()
# Démarrage des threads workers
nombre_threads = 4
for _ in range(nombre_threads):
t = threading.Thread(target=tache_travail, args=(taches, resultats))
t.start()
# Remplissage de la file des tâches depuis stdin
for ligne in sys.stdin:
taches.put(ligne)
# Signal de fin pour les threads
for _ in range(nombre_threads):
taches.put(None)
# Attente de la fin du traitement
taches.join()
# Écriture des résultats dans stdout
while not resultats.empty():
print(resultats.get())
Alternativement, pour les charges fortement liées au CPU, diviser le fichier d'entrée en plusieurs segments (par exemple avec l'utilitaire split) et traiter chaque segment dans un processus distinct peut s'avérer plus performant.