Conception d'un Agent de Visualisation de Données via le Protocole A2A et CrewAI

Objectifs de l'Architecture

Ce guide technique détaille la mise en place d'une architecture distribuée pour la génération automatisée de graphiques. Nous cobminerons le protocole A2A (Agent-to-Agent) pour l'interopérabilité, CrewAI pour l'orchestration des tâches, et OpenRouter comme fournisseur de modèles de langage (LLM). L'objectif principal est de permettre à l'agent de produire et de renvoyer des données binaires (images) via le protocole, plutôt que de simples chaînes de caractères.

Initialisation et Configuration

Récupération du Code Source

Clonez le dépôt de référence et placez-vous dans le répertoire de l'agent d'analyse :

git clone git@github.com:a2aproject/a2a-samples.git
cd a2a-samples/samples/python/agents/analytics

# Alternative avec le dépôt dédié
git clone git@github.com:sing1ee/a2a-crewai-charts-agent.git
cd a2a-crewai-charts-agent

Variables d'Environnement

Créez un fichier .env à la racine pour configurer l'accès au LLM :

OPENROUTER_API_KEY=sk-or-v1-votre_cle_secrete
LLM_PROVIDER_MODEL=openrouter/anthropic/claude-3.7-sonnet

Démarrage du Service

Utilisez uv pour gérer l'environnement virtuel et lancer l'application :

uv venv
source .venv/bin/activate
uv run .

Le serveur A2A sera accessible sur http://localhost:10011.

Diagnostic avec A2A Inspector

L'inspecteur A2A est un utilitaire essentiel pour valider les échanges de messages selon le protocole.

  1. Accès : Naviguez vers https://inspector.a2aprotocol.ai.
  2. Connexion : Renseignez l'URL de votre agent local (http://localhost:10011) et initiez la connexion.
  3. Invocation : Envoyez une requête telle que "Tracer les ventes: Q1,500 Q2,750 Q3,1200".
  4. Analyse : Observez le cycle de vie de la requête, inspectez les capacités déclarées par l'agent et vérifiez la réception de l'artifact binaire (l'image générée).

Architecture et Flux de Données

Diagramme de Séquence

sequenceDiagram
    participant C as Client
    participant I as A2A Inspector
    participant S as Serveur A2A
    participant R as Routeur
    participant X as ExecuteurAgent
    participant O as OrchestrateurVisuel
    participant K as Crew
    participant T as OutilRendu
    participant P as Matplotlib
    participant M as MemoireVive
    
    C->>I: Demande de graphique (Q1:500, Q2:750)
    I->>S: POST /tasks (Message A2A)
    S->>R: Transmission au gestionnaire
    R->>X: execute(context, queue)
    X->>O: invoke(requete, session_id)
    O->>K: kickoff(inputs)
    K->>T: appel_outil(donnees, session_id)
    T->>T: Extraction des paires clé:valeur
    T->>P: Génération du bar chart
    P-->>T: Flux binaire PNG
    T->>M: Stockage avec identifiant unique
    M-->>T: Confirmation
    T-->>K: Retour de la clé d'image
    K-->>O: Transmission de la clé
    O-->>X: Retour de la clé
    X->>O: get_image_data(session_id, cle)
    O->>M: Récupération binaire
    M-->>O: Données Image
    O-->>X: Données Image
    X->>R: Encapsulation FilePart
    R->>S: Envoi événement completed_task
    S-->>I: Réponse A2A avec artifact
    I-->>C: Affichage du graphique

Implémentation des Composants

1. Point d'Entrée A2A (main.py)

Ce module définit l'identité de l'agent et ses compétences exposées au réseau.

specs_fonctionnelles = AgentCapabilities(streaming=False)
competence_visuelle = AgentSkill(
    id='generateur_visuel',
    name='Generateur de Graphiques',
    description='Produit des représentations graphiques à partir de données structurées.',
    tags=['visualisation', 'image', 'donnees'],
    examples=['Tracer les revenus: Jan,1000 Fev,2000 Mar,1500'],
)

carte_identite = AgentCard(
    name='Agent Visualisation',
    description='Transforme des entrées de type CSV en graphiques en barres.',
    url=f'http://{host}:{port}/',
    version='2.0.0',
    defaultInputModes=OrchestrateurVisuel.TYPES_CONTENU_SUPPORTE,
    defaultOutputModes=OrchestrateurVisuel.TYPES_CONTENU_SUPPORTE,
    capabilities=specs_fonctionnelles,
    skills=[competence_visuelle],
)

2. Orchestration CrewAI (orchestrator.py)

Définition des rôles et des tâches pour l'IA générative.

class OrchestrateurVisuel:
    def __init__(self):
        self.expert_donnees = Agent(
            role='Specialiste en Visualisation',
            goal='Generer une image de graphique en barres depuis des donnees CSV.',
            backstory='Expert en transformation de donnees brutes en representations visuelles claires.',
            verbose=False,
            allow_delegation=False,
            tools=[creer_visuel],
        )

        self.tache_tracage = Task(
            description=(
                "Analysez la requete : '{user_prompt}'.\n"
                "Si elle contient des paires separees par des virgules (ex: 'a:100, b:200'), "
                "reformatez-la en CSV avec l'en-tete 'Libelle,Montant'.\n"
                "Transmettez ce CSV a l'outil 'OutilRenduGraphique'.\n"
                "Utilisez l'identifiant de session : '{session_id}'."
            ),
            expected_output="L'identifiant unique de l'image generee",
            agent=self.expert_donnees,
        )

3. Outil de Rendu (outils.py)

Fonction responsable de la parsing des données et de la génération binaire.

@tool('OutilRenduGraphique')
def creer_visuel(donnees_brutes: str, identifiant_session: str) -> str:
    """Genere un graphique en barres et le retourne sous forme d'identifiant cache."""
    
    # Parsing des donnees
    tableau = pd.read_csv(StringIO(donnees_brutes), header=None)
    tableau.columns = ['Libelle', 'Montant']
    tableau['Montant'] = pd.to_numeric(tableau['Montant'], errors='coerce')
    
    # Creation du visuel
    figure, axe = plt.subplots()
    axe.bar(tableau['Libelle'], tableau['Montant'], color='skyblue')
    axe.set_xlabel('Categories')
    axe.set_ylabel('Valeurs')
    axe.set_title('Analyse des Donnees')
    
    # Export en memoire
    tampon = BytesIO()
    plt.savefig(tampon, format='png', dpi=100)
    plt.close(figure)
    tampon.seek(0)
    flux_binaire = tampon.read()
    
    # Encodage et mise en cache
    ressource_image = Imagedata(
        bytes=base64.b64encode(flux_binaire).decode('utf-8'),
        mime_type='image/png',
        name='analyse_visuelle.png',
        id=uuid4().hex,
    )
    
    donnees_session = cache.get(identifiant_session) or {}
    donnees_session[ressource_image.id] = ressource_image
    cache.set(identifiant_session, donnees_session)
    
    return ressource_image.id

4. Exécuteur de Tâches (executeur.py)

Pont entre le framework CrewAI et le protocole A2A.

class PiloteExecutionAgent(AgentExecutor):
    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        requete_utilisateur = context.get_user_input()
        
        # Execution de la chaine CrewAI
        resultat_brut = self.agent.invoke(requete_utilisateur, context.context_id)
        
        # Recuperation de l'asset binaire
        asset = self.agent.get_image_data(
            session_id=context.context_id, 
            image_key=resultat_brut.raw
        )
        
        if asset and not asset.error:
            artifacts = [
                Part(
                    root=FilePart(
                        file=FileWithBytes(
                            bytes=asset.bytes,
                            mimeType=asset.mime_type,
                            name=asset.name,
                        )
                    )
                )
            ]
        else:
            artifacts = [Part(root=TextPart(text=asset.error or 'Echec de la generation.'))]
        
        # Notification de fin de tache
        event_queue.enqueue_event(
            completed_task(
                context.task_id,
                context.context_id,
                [new_artifact(artifacts, f'visuel_{context.task_id}')],
                [context.message],
            )
        )

5. Gestion du Cache (memoire.py)

Implémentatoin thread-safe pour isoler les sessions utilisateurs.

class MemoireVive:
    """Cache memoire synchronise pour le stockage temporaire d'assets."""
    
    def __init__(self):
        self._verrou = threading.Lock()
        self._registre: dict[str, Any] = {}
    
    def get(self, cle: str) -> Any | None:
        with self._verrou:
            return self._registre.get(cle)
    
    def set(self, cle: str, valeur: Any) -> None:
        with self._verrou:
            self._registre[cle] = valeur

Écosystème et Pistes d'Évolution

L'architecture repose sur un ensemble d'outils modernes : le protocole A2A pour la communication, CrewAI pour l'orchestration, OpenRouter pour l'accès aux LLM, Matplotlib et Pandas pour la manipulation de données, et UV pour la gestion des dépendances Python.

Pour étendre les capacités de ce système, plusieurs axes d'améliorasion peuvent être explorés :

  • Intégration de nouveaux types de visualisations (camemberts, nuages de points, courbes d'évolution).
  • Mise en place de mécanismes de validation stricts pour les données d'entrée afin de prévenir les erreurs de parsing.
  • Remplacement du cache en mémoire par une solution persistante (Redis, système de fichiers) pour le partage d'assets.
  • Ajout du support du streaming pour notifier le client de l'avancement de la génération.
  • Extension des modes d'entrée pour accepter directement des fichiers CSV uploadés plutôt que du texte brut.

Étiquettes: a2a-protocol crewai openrouter Matplotlib Pandas

Publié le 1 juillet à 22h26