Introduction à TPL Dataflow
La bibliothèque TPL Dataflow dans .NET fournit des composants pour gérer les flux de données avec une concurrence efficace. Elle facilite la programmation basée sur les acteurs en offrant un mécanisme de messagerie intra-processus pour les tâches de pipeline et les données à granularité grossière.
Scénario de synchronisation de données
Dans un processus typique de synchronisation de données, on procède généralement en plusieurs étapes : extraction depuis une source, transformation et traitement des données, puis chargement vers une cible. Lorsque les étapes de traitement sont nombreuses et couvrent différents domaines, il est judicieux d'abstraire un pipeline de données avec des gestionnaires pour chaque étape.
Conception abstraite d'un pipeline
Une approche abstraite consiste à créer un pipeline générique avec un contexte de pipeline et une liste de gestionnaires. Voici un exemple de code illustrant cette conception :
public class PipelineGenerique<TEntree>
{
private List<IGestionnaire> _gestionnaires;
private ContextePipeline<TEntree> _contexte;
public PipelineGenerique()
{
this._gestionnaires = new List<IGestionnaire>();
}
public PipelineGenerique(object param) : this()
{
this._contexte = new ContextePipeline<TEntree>(this._gestionnaires, param);
}
public void AjouterGestionnaire(IGestionnaire gestionnaire)
{
_gestionnaires.Add(gestionnaire);
}
public virtual void Executer()
{
_contexte.ExecuterSuivant();
}
public virtual async Task ExecuterAsync()
{
_contexte.ExecuterSuivantAsync();
}
public Dictionary<string, object> Resultat
{
get { return this._contexte.DonneesSortie; }
}
}
Le contexte du pipeline gère l'exécution séquentielle des gestionnaires :
public class ContextePipeline<TEntree>
{
private readonly IEnumerable<IGestionnaire> _gestionnaires;
public IGestionnaire GestionnaireActuel { get; set; }
public TEntree DonneesEntree { get; }
public Dictionary<string, object> DonneesSortie { get; set; }
public ContextePipeline(List<IGestionnaire> gestionnaires, object param)
{
this._gestionnaires = gestionnaires;
this.DonneesEntree = (TEntree)param;
this.DonneesSortie = new Dictionary<string, object>();
}
public void ExecuterSuivant()
{
foreach (var gestionnaire in this._gestionnaires)
{
GestionnaireActuel = gestionnaire;
gestionnaire.Executer(this);
}
}
public void ExecuterSuivantAsync()
{
foreach (var gestionnaire in this._gestionnaires)
{
Task.Run(() => gestionnaire.Executer(this));
}
}
}
Utilisation client :
var pipeline = new PipelineGenerique<SourceDonnees>(donneesSource);
pipeline.AjouterGestionnaire(new ExtracteurDonnees());
pipeline.AjouterGestionnaire(new ChargeurDonnees());
pipeline.AjouterGestionnaire(new EditeurDonnees());
pipeline.Executer();
Cette conception sépare la logique de traitement, mais le contexte est limité et les gestionnaires doivent connaître les détails du contexte.
Pipeline avec TPL Dataflow
En analysant le processus, on peut modéliser un pipeline où chaque nœud transforme les données d'entrée en sortie, permettant une chaîne de traitement. L'utilisation des blocs de flux de données, comme TransformBlock et ActionBlock, offre une solution plus fluide et performante.
Exemple d'appel client avec une API chaînée :
var pipeline = new PipelineFluxDonnees<int, string>();
pipeline
.AjouterBlocInitial(x => $"{x}-premiere_etape-")
.AjouterBlocTransformation(src => src += " stre2-")
.AjouterBlocTransformation(src => src += " stre3-")
.AjouterBlocFinal(src => System.Console.WriteLine(src))
.Traiter(8).Wait();
Implémentation générique utilisant TPL Dataflow :
public class PipelineFluxDonnees<TEntree, TSortie>
{
private IPropagatorBlock<TEntree, TSortie> _blocInitial;
private List<TransformBlock<TSortie, TSortie>> _blocsIntermediaires = new List<TransformBlock<TSortie, TSortie>>();
public PipelineFluxDonnees<TEntree, TSortie> AjouterBlocInitial(Func<TEntree, TSortie> fonction)
{
var bloc = new TransformBlock<TEntree, TSortie>(fonction);
_blocInitial = bloc;
return this;
}
public PipelineFluxDonnees<TEntree, TSortie> AjouterBlocTransformation(Func<TSortie, TSortie> fonction)
{
var blocTransformation = new TransformBlock<TSortie, TSortie>(fonction);
if (_blocsIntermediaires.Any())
{
_blocsIntermediaires.Last().LinkTo(blocTransformation);
_blocsIntermediaires.Add(blocTransformation);
}
else
{
_blocsIntermediaires.Add(blocTransformation);
_blocInitial.LinkTo(blocTransformation);
}
return this;
}
public PipelineFluxDonnees<TEntree, TSortie> AjouterBlocFinal(Action<TSortie> action)
{
if (_blocsIntermediaires == null || !_blocsIntermediaires.Any())
{
throw new ArgumentException("Au moins un bloc de transformation est requis.");
}
var blocFinal = new ActionBlock<TSortie>(action);
_blocsIntermediaires.Last().LinkTo(blocFinal);
return this;
}
public void Traiter(TEntree source)
{
_blocInitial.Post(source);
}
}
Cette approche utilise des blocs de flux de données pour une meilleure gestion de la concurrence. Pour les scénarios avec des volumes importants, on peut intégrer un BufferBlock pour bufferiser les fllux d'entrée et implémenter un modèle producteur-consommateur avec des consommateurs parallèles, améliorant ainsi les performances.
Les blocs TransformBlock et ActionBlock constituent la base pour construire des pipelines de données fiibles et performants avec TPL Dataflow.