Analyse Approfondie des Techniques d'Évaluation et d'Itération dans Langflow

Introduction et Philosophie de Conception

Synthèse des Techniques d'Évaluation et d'Itération

Les techniques d'évaluation et d'itération de Langflow constituent un système de surveillance et d'optimisation à plusieurs niveaux, conçu pour offrir :

  • Surveillance de l'exécution en temps réel : Suivi de l'état d'exécution des flux via un système d'événements.
  • Évaluation des performances : Collecte et analyse d'indicateurs tels que le temps d'exécution et l'utilisation des ressources.
  • Suivi des erreurs : Mécanismes complets de capture d'exceptions et d'analyse des erreurs.
  • Optimisation itérative : Ajustement adaptatif et amélioration continue basés sur les données de surveillance.
  • Assurance qualité : Indicateurs d'évaluation multidimensionnels et contrôle qualité.

Philosophie de Conception Clé

Les principes fondamentaux de conception sont intégrés dans le moteur d'exécution du graphe :


# Principes de conception clés intégrés dans le moteur d'exécution du graphe
class GraphExecutor:
   def __init__(self):
       # Surveillance de l'exécution
       self._execution_count = 0
       self._update_count = 0
       self._start_timestamp = datetime.now(timezone.utc)
       self._execution_snapshots: list[dict[str, Any]] = []
       self._execution_order: list[str] = []
       
       # Service de traçage
       self.tracing_service: TracingService | None = self.get_tracing_service_instance()
       
       # Gestion des exécutions
       self.run_manager = RunnableVerticesManager()
       self.inactive_vertices_set: set = set()
       self.active_vertices_list: list[str] = []
 
  • Observabilité d'abord : Mécanismes de surveillance et de traçage intégrés.
  • Optimisation progressive : Prise en charge de l'amélioration incrémentale et itérative.
  • Récupération après défaillance : Capacités de gestion des erreurs et de restauration d'état.
  • Axé sur les performances : Indicateurs de performance guidant les décisions d'optimisation.

Architecture Principale et Modèle de Surveillance de l'Exécution

Architecture de Surveillance de l'Exécution

L'architecture de surveillance de l'exécution est conçue pour capturer chaque étape du processus du graphe, depuis le démarrage jusqu'à la complétion, en passant par les erreurs potentielles.

Modèle d'État d'Exécution

La gestion de l'état de chaque sommet est cruciale pour suivre le déroulement de l'exécution.


# Gestion des états des sommets
class VertexStates(str, Enum):
   ACTIVE = "ACTIVE"
   INACTIVE = "INACTIVE"
   ERROR = "ERROR"

class Vertex:
   def __init__(self):
       self.is_built = False
       self.execution_result: ResultData | None = None
       self.artifacts: dict[str, Any] = {}
       self.execution_logs: dict[str, list[Log]] = {}
       self.current_state = VertexStates.ACTIVE
       
   async def build(self, event_manager: EventManager = None):
       """Construit le sommet et enregistre les métriques d'exécution."""
       start_time = time.time()
       try:
           # Logique d'implémentation de la construction
           result = await self._build_implementation_logic()
           
           # Enregistrement des métriques de succès
           self.is_built = True
           self.execution_result = result
           
       except Exception as e:
           # Enregistrement des métriques d'erreur
           self.current_state = VertexStates.ERROR
           await self._log_execution_error(e)
           raise
       finally:
           # Enregistrement du temps d'exécution
           execution_duration = time.time() - start_time
           await self._record_performance_metrics(execution_duration)
 

Analyse du Moteur d'Exécution des Flux

Gestion Asynchrone de l'Exécution

Le moteur gère l'exécution des flux de manière asynchrone, optimisant l'utilisation des ressources et permettant une parallélisation efficace.


# Surveillance principale de l'exécution du graphe
class Graph:
   async def process(self, *, fallback_to_env_vars: bool, 
                    start_component_id: str | None = None,
                    event_manager: EventManager | None = None) -> Graph:
       """Traite l'exécution du graphe et effectue une surveillance complète."""
       
       # Initialisation du traçage de l'exécution
       await self.initialize_run_tracing()
       
       # Traitement par couches
       vertex_task_run_count: dict[str, int] = {}
       to_process_queue = deque(first_layer_vertices)
       layer_index = 0
       
       while to_process_queue:
           current_batch = list(to_process_queue)
           to_process_queue.clear()
           
           # Création des tâches concurrentes
           tasks = []
           for vertex_id in current_batch:
               vertex = self.get_vertex_by_id(vertex_id)
               task = asyncio.create_task(
                   self.build_vertex_execution(
                       vertex_id=vertex_id,
                       event_manager=event_manager,
                   ),
                   name=f"{vertex.identifier} Run {vertex_task_run_count.get(vertex_id, 0)}"
               )
               tasks.append(task)
               vertex_task_run_count[vertex_id] = vertex_task_run_count.get(vertex_id, 0) + 1

           logger.debug(f"Executing layer {layer_index} with {len(tasks)} tasks")
           
           try:
               # Exécution des tâches et collecte des résultats
               next_runnable_vertices = await self._execute_concurrent_tasks(
                   tasks, lock=asyncio.Lock(), has_webhook_component=True # Exemple de valeur
               )
           except Exception:
               logger.exception(f"Error executing tasks in layer {layer_index}")
               raise
               
           # Préparation de l'exécution de la couche suivante
           if next_runnable_vertices:
               to_process_queue.extend(next_runnable_vertices)
           layer_index += 1

       return self
 

Surveillance de l'Exécution des Tâches

Chaque tâche d'exécution de sommet est surveillée pour détecter les succès et les échecs.


async def _execute_concurrent_tasks(self, tasks: list[asyncio.Task], 
                                  lock: asyncio.Lock, 
                                  *, has_webhook_component: bool = False) -> list[str]:
   """Exécute les tâches et gère les exceptions."""
   
   results_collection = []
   completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
   processed_vertices: list[Vertex] = []

   for i, task_result in enumerate(completed_tasks):
       task_identifier = tasks[i].get_name()
       vertex_identifier = tasks[i].get_name().split(" ")[0]

       if isinstance(task_result, Exception):
           logger.error(f"Task {task_identifier} failed with exception: {task_result}")
           
           # Journalisation de l'exception dans le système de surveillance
           if has_webhook_component:
               await self._log_vertex_build_from_exception_details(vertex_identifier, task_result)

           # Annulation des tâches restantes
           for remaining_task in tasks[i + 1:]:
               remaining_task.cancel()
           raise task_result
           
       # Journalisation de l'exécution réussie
       if isinstance(task_result, VertexBuildResult):
           if self.flow_identifier is not None:
               await self.log_vertex_build_result(
                   flow_id=self.flow_identifier,
                   vertex_id=result.vertex.id,
                   is_valid=result.valid,
                   parameters=result.params,
                   data=result.result_dict,
                   artifacts=result.artifacts,
               )
           processed_vertices.append(task_result.vertex)

   return results_collection
 

Collecte des Métriques de Performance

Modèle des Métriques de Performance

Les résultats de la construction des sommets sont enregistrés pour l'analyse des performances et le débogage.


# Enregistrement du résultat de la construction du sommet
class VertexBuildRecord(SQLModel):
   record_timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
   vertex_id: str = Field(nullable=False)
   result_data: dict | None = Field(default=None)
   associated_artifacts: dict | None = Field(default=None)
   input_parameters: str | None = Field(default=None)
   is_valid: bool = Field(nullable=False)
   flow_id: UUID = Field()
   
   # Support multi-tenant
   job_uuid: UUID | None = Field(default=None)
   tenant_uuid: UUID | None = Field(default=None)
   organization_uuid: UUID | None = Field(default=None)
   business_domain_uuid: UUID | None = Field(default=None)

   @field_serializer("result_data")
   def serialize_data_with_limits(self, data) -> dict:
       """Sérialise les données en appliquant des limites de taille."""
       return serialize_data_to_primitive(data, 
                                          max_length=get_max_text_length_limit(), 
                                          max_items=get_max_items_length_limit())
 

Journalisation des Transactions

Chaque transaction est journalisée pour permettre une analyse détaillée des performances.


async def log_transaction_details(flow_id: str | UUID, source_vertex: Vertex, 
                                transaction_status, target_vertex: Vertex | None = None, 
                                error_details=None) -> None:
   """Journalise les transactions de manière asynchrone pour l'analyse des performances."""
   
   try:
       if not get_settings_service().are_transactions_storage_enabled():
           return
           
       # Nettoyage des paramètres sensibles
       inputs_serialized = _vertex_to_primitive_dict(source_vertex)
       
       # Sérialisation des données de résultat
       if source_vertex.execution_result:
           try:
               result_dict = source_vertex.execution_result.model_dump()
               for key, value in result_dict.items():
                   if isinstance(value, pd.DataFrame):
                       result_dict[key] = value.to_dict()
               outputs_serialized = result_dict
           except Exception as e:
               logger.warning(f"Error serializing result: {e!s}")
               outputs_serialized = None
       else:
           outputs_serialized = None

       # Création de l'enregistrement de transaction
       transaction_record = TransactionBase(
           vertex_id=source_vertex.id,
           inputs=inputs_serialized,
           outputs=outputs_serialized,
           status=transaction_status,
           error=str(error_details) if error_details else None,
           flow_id=str(flow_id),
           timestamp=datetime.now(timezone.utc)
       )
       
       # Stockage asynchrone
       await crud_log_transaction(transaction_record)
       
   except Exception as e:
       logger.error(f"Error logging transaction: {e}")
 

Système d'Événements et Suivi d'État

Architecture du Gestionnaire d'Événements

Le gestionnaire d'événements orchestre la communication entre les différents composants du système via une file d'attente d'événements.


class EventManager:
   def __init__(self, event_queue: asyncio.Queue, job_id: str | None = None):
       self.event_queue = event_queue
       self.registered_event_handlers: dict[str, PartialEventCallback] = {}
       self.job_id = job_id

   def register_event_handler(self, event_name: str, event_type: str, 
                              callback: EventCallback | None = None) -> None:
       """Enregistre un gestionnaire d'événements."""
       if not event_name.startswith("on_"):
           raise ValueError("Event name must start with 'on_'")
           
       if callback is None:
           # Utilise une fonction partielle pour envoyer l'événement
           handler_function = partial(self.emit_event, event_type=event_type)
       else:
           handler_function = partial(callback, manager=self, event_type=event_type)
       self.registered_event_handlers[event_name] = handler_function

   def emit_event(self, *, event_type: str, data: LoggableType):
       """Envoie un événement au système de surveillance."""
       try:
           # Création d'un événement standardisé par type
           if isinstance(data, dict) and event_type in {"message", "error", "warning", "info", "token"}:
               data = create_event_by_type(event_type, **data)
               
       except TypeError as e:
           logger.debug(f"Error creating playground event: {e}")
       except Exception:
           raise
           
       # Sérialisation des données de l'événement
       jsonable_data = jsonable_encoder(data)
       event_payload = {"event": event_type, "data": jsonable_data}
       event_unique_id = f"{event_type}-{uuid.uuid4()}"
       serialized_data = json.dumps(event_payload) + "\n\n"

       # Mise à jour de l'état du job
       from langflow.services.apa.resource.job.job_service import JobService
       run_until_complete_safe(JobService.update_job_status_from_event(self.job_id, event_type, data))

       # Ajout de l'événement à la file d'attente
       self.event_queue.put_nowait((event_unique_id, serialized_data.encode("utf-8"), time.time()))


Gestionnaire d'Événements par Défaut

Un ensemble standard d'événements est préconfiguré pour couvrir les cas d'utilisation courants.


def create_default_event_manager(queue: asyncio.Queue) -> EventManager:
   """Crée un gestionnaire d'événements par défaut."""
   manager = EventManager(queue)
   
   # Enregistrement des événements standards
   manager.register_event_handler("on_token", "token")
   manager.register_event_handler("on_vertices_sorted", "vertices_sorted")
   manager.register_event_handler("on_error", "error")
   manager.register_event_handler("on_end", "end")
   manager.register_event_handler("on_message", "add_message")
   manager.register_event_handler("on_remove_message", "remove_message")
   manager.register_event_handler("on_end_vertex", "end_vertex")
   manager.register_event_handler("on_build_start", "build_start")
   manager.register_event_handler("on_build_end", "build_end")
   
   return manager

Système de Journalisation et Support de Débogage

Système de Journalisation Structuré

Un tampon de journalisation de taille limitée permet une récupération efficace des journaux en temps réel.


class SizedLogBuffer:
   def __init__(self, max_buffer_size: int = 1000):
       """Tampon de journalisation pour la récupération des journaux en temps réel."""
       self.log_entries: deque = deque(maxlen=max_buffer_size)
       self._write_lock = Lock()
       self._max_retention = max_buffer_size

   def write(self, log_message: str) -> None:
       """Écrit un message de journal."""
       try:
           record = json.loads(log_message)
           log_content = record["text"]
           timestamp_epoch_ms = int(record["record"]["time"]["timestamp"] * 1000)
           
           with self._write_lock:
               # Assure que la taille maximale n'est pas dépassée
               while len(self.log_entries) >= self._max_retention:
                   self.log_entries.popleft()
               self.log_entries.append((timestamp_epoch_ms, log_content))
       except json.JSONDecodeError:
           logger.warning(f"Failed to decode log message: {log_message}")
       except KeyError as e:
           logger.warning(f"Missing key in log message structure: {e}")

   def get_logs_after_timestamp(self, since_timestamp: int, max_lines: int = 5) -> dict[int, str]:
       """Récupère les journaux après un horodatage spécifié."""
       relevant_logs = {}
       with self._write_lock:
           # Recherche à partir de la fin pour plus d'efficacité si les journaux sont récents
           for ts, msg in reversed(self.log_entries):
               if max_lines <= 0:
                   break
               if ts >= since_timestamp:
                   relevant_logs[ts] = msg
                   max_lines -= 1
           # Inverser pour maintenir l'ordre chronologique
           return dict(sorted(relevant_logs.items()))

Configuration et Gestion des Journaux

La configuration flexible du système de journalisation permet de s'adapter à différents environnements, du développement à la production.


def setup_logging(*, level: str | None = None,
            file_path: Path | None = None,
            disable_logging: bool | None = False,
            environment_type: str | None = None,
            log_format: str | None = None,
            use_async_file: bool = False) -> None:
   """Configure le système de journalisation."""
   
   # Suppression des gestionnaires par défaut
   logger.remove()
   logger.patch(patch_log_record)
   
   # Configuration pour l'environnement conteneurisé
   if environment_type.lower() in ["container", "container_json"]:
       logger.add(sys.stderr, format="{message}", serialize=True)
   elif environment_type.lower() == "container_csv":
       logger.add(sys.stderr, format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {file} {line} {function} {message}")
   else:
       # Configuration pour l'environnement de développement
       if log_format is None or not is_valid_log_format(log_format):
           log_format = DEFAULT_CONSOLE_LOG_FORMAT
           
       # Activation de l'affichage formaté dans la console (si configuré)
       enable_pretty_logs = os.getenv("LANGFLOW_PRETTY_LOGS", "false").lower() == "true"
       if enable_pretty_logs:
           logger.configure(
               handlers=[{
                   "sink": RichHandler(rich_tracebacks=True, markup=True),
                   "format": log_format,
                   "level": level.upper() if level else "INFO",
               }]
           )
       else:
           logger.add(sys.stderr, level=level.upper() if level else "INFO", format=log_format, 
                     backtrace=True, diagnose=True)

       # Configuration de la journalisation dans un fichier
       if not file_path:
           cache_directory = Path(user_cache_dir("langflow"))
           file_path = cache_directory / "langflow.log"
           
       try:
           # Utilisation d'un Sink de fichier asynchrone si spécifié
           log_sink = AsyncFileSink(file_path) if use_async_file else file_path
           logger.add(
               sink=log_sink,
               level=level.upper() if level else "INFO",
               format=log_format,
               serialize=True, # Journalisation en format JSON
           )
       except Exception:
           logger.exception("Error setting up log file sink")

   # Activation du tampon de journalisation en mémoire
   if log_buffer.is_enabled():
       logger.add(sink=log_buffer.write, format="{time} {level} {message}", serialize=True)

Gestion des Erreurs et Analyse des Exceptions

Gestion des Erreurs de Construction de Composant

Les exceptions survenant lors de la construction des composants sont capturées et journalisées pour une analyse approfondie.


async def _log_vertex_build_from_exception_details(self, vertex_id: str, error_instance: Exception) -> None:
   """Enregistre les détails d'une exception de construction de sommet."""
   
   error_message_details: str
   traceback_formatted: str
   
   if isinstance(error_instance, ComponentBuildError):
       error_message_details = error_instance.message
       traceback_formatted = error_instance.formatted_traceback
   else:
       from langflow.api.utils import format_exception_message
       traceback_formatted = traceback.format_exc()
       logger.exception("Error during Component build process")
       error_message_details = format_exception_message(error_instance)
       
   # Construction du message d'erreur formaté
   formatted_error = {"errorMessage": error_message_details, "stackTrace": traceback_formatted}
   
   # Détermination de l'étiquette de sortie par défaut
   output_label = "output" # Valeur par défaut
   vertex_object = self.get_vertex_by_id(vertex_id)
   if vertex_object and vertex_object.outputs:
       output_label = vertex_object.outputs[0]["name"]
       
   vertex_outputs = {output_label: OutputValue(message=formatted_error, type="error")}
   
   # Création de la structure de réponse d'erreur
   error_response_data = {
       "results": {},
       "outputs": vertex_outputs,
       "logs": {},
       "messages": {},
       "artifacts": {},
       "timedelta": None,
       "duration": None,
       "used_frozen_result": False,
   }

   # Journalisation de l'erreur dans le système de suivi
   await self.log_vertex_build_result(
       flow_id=self.flow_id or "",
       vertex_id=vertex_id or "error_vertex",
       is_valid=False,
       parameters=error_message_details,
       data=error_response_data,
       artifacts={},
   )

Intercepteur d'Exceptions

Un intercepteur standard permet de rediriger les logs du module logging Python vers le système Loguru.


class ExceptionInterceptorHandler(logging.Handler):
   """Intercepteur pour rediriger les logs standard vers Loguru."""
   
   @override
   def emit(self, record: logging.LogRecord) -> None:
       # Récupération du niveau Loguru correspondant
       try:
           loguru_level = logger.level(record.levelname).name
       except ValueError:
           loguru_level = record.levelno # Utilisation du code numérique si le niveau n'est pas trouvé

       # Recherche des informations de l'appelant
       frame, depth = logging.currentframe(), 2
       while frame and frame.f_code.co_filename == logging.__file__:
           frame = frame.f_back
           depth += 1

       # Journalisation avec Loguru, en incluant les informations d'exception si présentes
       logger.opt(depth=depth, exception=record.exc_info).log(loguru_level, record.getMessage())

Optimisation Itérative et Mécanismes Adaptatifs

Instantanés d'État du Graphe

Des instantanés de l'état du graphe sont créés pour permettre le retour arrière et l'analyse des exécutions.


class Graph:
   def _capture_state_snapshot(self):
       """Crée un instantané de l'état du graphe pour retour arrière et analyse."""
       return {
           "_execution_queue": self._execution_queue.copy(),
           "_initial_layer": self._initial_layer.copy(),
           "vertex_execution_layers": copy.deepcopy(self.vertex_execution_layers),
           "vertices_scheduled_for_run": copy.deepcopy(self.vertices_scheduled_for_run),
           "run_manager_state": copy.deepcopy(self.run_manager.get_state_as_dict()),
       }

   def _record_execution_snapshot(self, vertex_id: str | None = None) -> None:
       """Enregistre un instantané de l'exécution."""
       self._execution_snapshots.append(self.get_current_state_snapshot())
       if vertex_id:
           self._execution_order.append(vertex_id)

   def get_current_state_snapshot(self):
       """Récupère l'instantané de l'état actuel."""
       return copy.deepcopy({
           "run_manager_state": self.run_manager.get_state_as_dict(),
           "execution_queue": self._execution_queue,
           "vertex_execution_layers": self.vertex_execution_layers,
           "initial_layer": self.initial_layer,
           "inactive_vertices_set": self.inactive_vertices_set,
           "active_vertices_list": self.active_vertices_list,
       })

Gestion Adaptative des Sommets

Le système peut activer dynamiquement des sommets en fonction du contexte d'exécution, comme les changements d'état.


def activate_state_dependent_vertices(self, state_name: str, caller_vertex_id: str) -> None:
   """Active les sommets associés à un nom d'état donné."""
   vertex_ids_to_activate = set()
   updated_predecessor_map = {}
   newly_activated_vertices = []
   
   for vertex_id in self.state_dependent_vertex_ids:
       calling_vertex = self.get_vertex_by_id(caller_vertex_id)
       current_vertex = self.get_vertex_by_id(vertex_id)
       
       if vertex_id == caller_vertex_id or current_vertex.display_name == calling_vertex.display_name:
           continue
           
       context_key_param = current_vertex.raw_parameters.get("context_key")
       if isinstance(context_key_param, str) and state_name in context_key_param and vertex_id != caller_vertex_id:
           if isinstance(current_vertex, StateVertex): # Supposant qu'il existe une classe StateVertex
               newly_activated_vertices.append(vertex_id)
               vertex_ids_to_activate.add(vertex_id)
               
               # Récupération de tous les successeurs directs et indirects
               all_successors = self.get_all_successors(current_vertex, flat=True)
               
               # Collecte des prédécesseurs des successeurs
               successors_predecessors = set()
               for successor in all_successors:
                   successors_predecessors.update(self.get_all_predecessors(successor))

               # Reconstruction de l'ensemble des arêtes
               edge_set = set()
               for v in [current_vertex, *all_successors, *successors_predecessors]:
                   edge_set.update(v.edges)
                   if v.current_state == VertexStates.INACTIVE:
                       v.set_state("ACTIVE") # Mise à jour de l'état
                   vertex_ids_to_activate.add(v.id)

               # Reconstruction de la carte des prédécesseurs
               edges_list = list(edge_set)
               predecessor_map, _ = self.build_adjacency_maps(edges_list)
               updated_predecessor_map.update(predecessor_map)

   # Mise à jour de l'ensemble des sommets à exécuter
   vertex_ids_to_activate.update(updated_predecessor_map.keys())
   vertex_ids_to_activate.update(v_id for successors_list in updated_predecessor_map.values() for v_id in successors_list)

   self.active_vertices_list = newly_activated_vertices # Mise à jour de la liste des actifs
   self.vertices_scheduled_for_run.update(vertex_ids_to_activate)
   self.run_manager.update_run_state(
       run_predecessors=updated_predecessor_map,
       vertices_to_run=self.vertices_scheduled_for_run,
   )

Indicateurs d'Évaluation et Assurance Qualité

Système de Traçage Distribué

Le service de traçage supporte l'intégration avec divers backends de traçage distribué pour une visibilité complète.


class TracingService(Service):
   """Service de traçage distribué."""
   
   async def initialize_tracers_for_run(self, run_uuid: UUID, run_name: str, 
                                      user_identifier: str | None, session_identifier: str | None,
                                      project_name: str | None = None) -> None:
       """Initialise le traçage pour une exécution de graphe."""
       
       if self.is_deactivated:
           return
           
       try:
           resolved_project_name = project_name or os.getenv("LANGCHAIN_PROJECT", "Langflow")
           trace_context = TraceContext(run_uuid, run_name, resolved_project_name, user_identifier, session_identifier)
           trace_context_variable.set(trace_context)
           
           # Démarrage des threads de travail pour le traçage
           await self._start_background_tracing(trace_context)
           
           # Initialisation des différents traceurs spécifiques
           self._setup_langsmith_tracer(trace_context)
           self._setup_langwatch_tracer(trace_context)
           self._setup_langfuse_tracer(trace_context)
           self._setup_arize_phoenix_tracer(trace_context)
           self._setup_opik_tracer(trace_context)
           
       except Exception as e:
           logger.debug(f"Error initializing tracers: {e}")

   @asynccontextmanager
   async def trace_component_execution(self, component: Component, trace_span_name: str,
                                      input_data: dict[str, Any], 
                                      metadata: dict[str, Any] | None = None):
       """Gère le traçage d'un composant individuel."""
       
       if self.is_deactivated:
           yield self
           return
           
       span_id = trace_span_name
       if component._vertex:
           span_id = component._vertex.id # Utilise l'ID du sommet comme span ID si disponible
           
       span_type = component.trace_type
       component_trace_context = ComponentTraceContext(
           span_id, trace_span_name, span_type, component._vertex, input_data, metadata
       )
       
       component_context_variable.set(component_trace_context)
       current_trace_context = trace_context_variable.get()
       
       if current_trace_context is None:
           msg = "Cannot call trace_component_execution without an active trace context."
           raise RuntimeError(msg)
           
       # Ajout des entrées au contexte global du traceur
       current_trace_context.all_inputs[trace_span_name].update(input_data or {})
       
       # Démarrage du span du composant
       await current_trace_context.traces_processing_queue.put(
           (self._start_component_span, (component_trace_context, current_trace_context))
       )
       
       try:
           yield self # Permet l'exécution du code du composant
       except Exception as e:
           # Enregistrement de la fin du span avec exception
           await current_trace_context.traces_processing_queue.put(
               (self._end_component_span, (component_trace_context, current_trace_context, e))
           )
           raise # Propage l'exception
       else:
           # Enregistrement de la fin normale du span
           await current_trace_context.traces_processing_queue.put(
               (self._end_component_span, (component_trace_context, current_trace_context, None))
           )

Collecte des Données Télémétriques

Le service de télémétrie collecte des données anonymisées pour l'analyse des performances et l'amélioration du produit.


class TelemetryService(Service):
   """Service de télémétrie pour la collecte des métriques de performance."""
   
   def __init__(self, settings_manager: SettingsService):
       super().__init__()
       self.settings_manager = settings_manager
       self.telemetry_endpoint_url = settings_manager.settings.telemetry_base_url
       self.telemetry_event_queue: asyncio.Queue = asyncio.Queue()
       self.http_client = httpx.AsyncClient(timeout=10.0)
       self.is_operational = False
       self._stop_event_flag = False
       
       # Intégration OpenTelemetry
       self.open_telemetry_integration = OpenTelemetry(prometheus_enabled=settings_manager.settings.prometheus_enabled)
       
       # Paramètre de confidentialité
       self.disable_tracking = (
           os.getenv("DO_NOT_TRACK", "False").lower() == "true" 
           or settings_manager.settings.do_not_track
       )

   async def send_telemetry_payload(self, payload: BaseModel, endpoint_path: str | None = None) -> None:
       """Envoie une charge utile de télémétrie au serveur."""
       if self.disable_tracking:
           logger.debug("Telemetry tracking is disabled. Data will not be sent.")
           return

       target_url = f"{self.telemetry_endpoint_url}"
       if endpoint_path:
           target_url = f"{target_url}/{endpoint_path}"
           
       try:
           payload_dictionary = payload.model_dump(by_alias=True, exclude_none=True, exclude_unset=True)
           response = await self.http_client.get(target_url, params=payload_dictionary)
           
           if response.status_code != httpx.codes.OK:
               logger.error(f"Failed to send telemetry data to {target_url}: {response.status_code} {response.text}")
           else:
               logger.debug("Telemetry data sent successfully.")
               
       except httpx.RequestError:
           logger.error("Network error occurred while sending telemetry data.")
       except Exception:
           logger.error("An unexpected error occurred during telemetry data transmission.")

   async def report_package_version_info(self) -> None:
       """Rapporte les informations de version du package et de l'environnement."""
       python_version_short = ".".join(platform.python_version().split(".")[:2])
       package_version_details = get_version_info()
       
       # Détermination de l'architecture du système si non déjà fait
       if self.system_architecture is None:
           self.system_architecture = (await asyncio.to_thread(platform.architecture))[0]
           
       version_payload = VersionPayload(
           package_name=package_version_details["package"].lower(),
           package_version=package_version_details["version"],
           system_platform=platform.platform(),
           python_version=python_version_short,
           cache_type_used=self.settings_manager.settings.cache_type,
           backend_only_mode=self.settings_manager.settings.backend_only,
           architecture=self.system_architecture,
           auto_login_enabled=self.settings_manager.auth_settings.AUTO_LOGIN,
           is_desktop_app=self._check_langflow_desktop_mode(),
       )
       
       # Ajout de l'événement d'envoi à la file d'attente
       await self._queue_telemetry_event((self.send_telemetry_payload, version_payload, None))


Tableaux de Bord de Surveillance et Visualisation

Flux de Journaux en Temps Réel

Une interface SSE (Server-Sent Events) permet de diffuser les journaux en temps réel vers le client.


@log_api_router.get("/logs-stream")
async def stream_realtime_logs(request: Request):
   """Point d'accès SSE pour le streaming des journaux en temps réel."""
   
   global log_buffer # Accès au tampon de journalisation global
   if not log_buffer.is_enabled():
       raise HTTPException(
           status_code=HTTPStatus.NOT_IMPLEMENTED,
           detail="Log retrieval is currently disabled.",
       )

   # Utilisation de StreamingResponse pour la diffusion des événements
   return StreamingResponse(event_stream_generator(request), media_type="text/event-stream")

async def event_stream_generator(request: Request):
   """Générateur pour le flux d'événements SSE des journaux."""
   global log_buffer
   last_processed_entry = None
   consecutive_no_new_entries = 0
   
   while not await request.is_disconnected(): # Continuer tant que la connexion est active
       new_entries_to_send: list[Any] = []
       
       with log_buffer.get_read_write_lock(): # Verrou pour accès concurrentiel au tampon
           if last_processed_entry is None:
               # Si c'est la première lecture, prendre la dernière entrée comme référence
               if log_buffer.log_entries:
                   last_processed_entry = log_buffer.log_entries[-1]
           else:
               # Trouver les nouvelles entrées depuis la dernière lecture
               found_last = False
               for entry in log_buffer.log_entries:
                   if found_last:
                       new_entries_to_send.append(entry)
                       last_processed_entry = entry
                       continue
                   if entry is last_processed_entry:
                       found_last = True
                       continue

               # Gérer le cas où le tampon a été vidé et rempli à nouveau
               if not found_last and log_buffer.log_entries:
                   new_entries_to_send.extend(log_buffer.log_entries)
                   last_processed_entry = log_buffer.log_entries[-1]
                       
       if new_entries_to_send:
           for timestamp_ms, log_message_content in new_entries_to_send:
               # Formater chaque entrée comme un événement JSON
               yield f"data: {json.dumps({timestamp_ms: log_message_content})}\n\n"
       else:
           consecutive_no_new_entries += 1
           # Envoyer un signal de "keepalive" pour maintenir la connexion active
           if consecutive_no_new_entries >= KEEPALIVE_THRESHOLD:
               consecutive_no_new_entries = 0
               yield "event: keepalive\ndata: {}\n\n"

       await asyncio.sleep(1) # Attendre avant la prochaine vérification

API de Requête des Journaux

Une API RESTful permet de récupérer l'historique des journaux selon divers critères.


@log_api_router.get("/logs")
async def query_historical_logs(
   lines_before_ts: Annotated[int, Query(description="Nombre de lignes avant l'horodatage")] = 0,
   lines_after_ts: Annotated[int, Query(description="Nombre de lignes après l'horodatage")] = 0,
   timestamp_query: Annotated[int, Query(description="Horodatage de référence pour la requête")] = 0,
):
   """Récupère les données historiques des journaux."""
   
   global log_buffer
   if not log_buffer.is_enabled():
       raise HTTPException(
           status_code=HTTPStatus.NOT_IMPLEMENTED,
           detail="Log retrieval is disabled.",
       )
       
   # Validation des paramètres de requête
   if lines_after_ts > 0 and lines_before_ts > 0:
       raise HTTPException(
           status_code=HTTPStatus.BAD_REQUEST,
           detail="Cannot request logs both before and after the specified timestamp.",
       )
       
   retrieved_content: dict[int, str]
   if timestamp_query <= 0:
       # Si aucun horodatage n'est fourni, récupérer les N dernières lignes ou un défaut
       if lines_after_ts > 0: # Doit spécifier un timestamp pour 'after'
            raise HTTPException(
               status_code=HTTPStatus.BAD_REQUEST,
               detail="Timestamp is required when requesting logs after a specific point.",
           )
       retrieved_content = log_buffer.get_last_n_logs(10) if lines_before_ts <= 0 else log_buffer.get_last_n_logs(lines_before_ts)
   elif lines_before_ts > 0:
       # Récupérer les lignes avant l'horodatage spécifié
       retrieved_content = log_buffer.get_logs_before_timestamp(timestamp=timestamp_query, max_lines=lines_before_ts)
   elif lines_after_ts > 0:
       # Récupérer les lignes après l'horodatage spécifié
       retrieved_content = log_buffer.get_logs_after_timestamp(since_timestamp=timestamp_query, max_lines=lines_after_ts)
   else:
       # Cas par défaut : récupérer les 10 dernières lignes avant l'horodatage si aucun autre critère n'est spécifié
       retrieved_content = log_buffer.get_logs_before_timestamp(timestamp=timestamp_query, max_lines=10)
       
   return JSONResponse(content=retrieved_content)

Outils de Débogage et Support au Développement

Support à l'Exécution Pas à Pas

Le moteur d'exécution permet une exécution pas à pas des sommets du graphe pour faciliter le débogage.


class Graph:
   async def step_forward_async(self, inputs: InputValueRequest | None = None,
                              files: list[str] | None = None,
                              user_id: str | None = None,
                              event_manager: EventManager | None = None):
       """Exécute de manière asynchrone le prochain sommet du graphe."""
       
       if not self._is_prepared:
           raise ValueError("Graph must be prepared before stepping. Call prepare() first.")
           
       if not self._execution_queue: # Si la file d'exécution est vide
           self._terminate_all_traces_async() # Terminer les traces
           return Finish() # Retourner un indicateur de fin
           
       # Obtenir l'identifiant du prochain sommet à exécuter
       next_vertex_id = self.get_next_vertex_from_queue()
       chat_service = get_chat_service() # Accès au service de chat pour la mise en cache
       
       # Exécution du sommet
       vertex_build_result = await self.build_vertex_execution(
           vertex_id=next_vertex_id,
           user_id=user_id,
           input_parameters_dict=inputs.model_dump() if inputs else {},
           files_provided=files,
           cache_retriever=chat_service.get_cache,
           cache_setter=chat_service.set_cache,
           event_manager=event_manager,
       )

       # Déterminer les prochains sommets exécutables après celui-ci
       next_runnable_vertices = await self.get_next_runnable_vertices_after(
           lock=self._mutex_lock, current_vertex=vertex_build_result.vertex, use_cache=False
       )
       
       # Gérer le cas où un sommet spécifique doit arrêter l'exécution
       if self.stop_at_vertex and self.stop_at_vertex in next_runnable_vertices:
           next_runnable_vertices = [self.stop_at_vertex]
           
       self.add_vertices_to_execution_queue(next_runnable_vertices) # Ajouter à la file d'attente
       self.reset_inactivated_vertex_status() # Réinitialiser les états inactifs
       self.reset_activated_vertex_status() # Réinitialiser les états actifs

       # Mise à jour du cache du graphe
       await chat_service.set_cache(str(self.flow_id or self._run_id), self)
       self._record_execution_snapshot(next_vertex_id) # Enregistrer l'instantané
       
       return vertex_build_result # Retourner le résultat de la construction du sommet

   def step_forward_sync(self, inputs: InputValueRequest | None = None,
                        files: list[str] | None = None,
                        user_id: str | None = None):
       """Wrapper synchrone pour l'exécution pas à pas."""
       return run_until_complete(self.step_forward_async(inputs, files, user_id))

Collecte d'Informations de Débogage

Des fonctions utilitaires aident à déterminer le type des artefacts et à formater les données pour l'affichage en débogage.


def determine_artifact_type(value, build_result) -> str:
   """Détermine le type d'un artefact pour l'affichage en débogage."""
   artifact_category = ArtifactCategory.UNKNOWN
   
   match value:
       case DataRecord(): # Supposant une classe DataRecord
           artifact_category = ArtifactCategory.RECORD
       case str():
           artifact_category = ArtifactCategory.TEXT
       case dict():
           artifact_category = ArtifactCategory.OBJECT
       case list():
           artifact_category = ArtifactCategory.ARRAY
       case Message():
           artifact_category = ArtifactCategory.MESSAGE

   # Cas spécial pour les générateurs ou les flux de messages
   if artifact_category == ArtifactCategory.UNKNOWN and (
       isinstance(build_result, Generator) or 
       (isinstance(value, Message) and isinstance(value.text, Generator))
   ):
       artifact_category = ArtifactCategory.STREAM

   return artifact_category.value # Retourne la valeur enum (string)

def post_process_for_debugging_display(raw_data, artifact_type: str):
   """Traite les données brutes pour l'affichage en mode débogage."""
   if artifact_type == ArtifactCategory.STREAM.value:
       # Pour les flux, on peut afficher une chaîne vide ou un indicateur
       return "[Streaming Data]" 
   return raw_data

Amélioration Continue et Boucle de Rétroaction

Mécanismes d'Optimisation des Performances

Le système peut réorganiser l'ordre d'exécution des sommets en fonction de leurs temps de construction moyens pour optimiser le flux global.


class Graph:
   def sort_vertices_by_average_build_time(self, vertex_execution_layers: list[list[str]]) -> list[list[str]]:
       """Trie les sommets en fonction de leur temps de construction moyen."""
       
       def sort_layer_by_avg_build_time(vertex_ids_in_layer: list[str]) -> list[str]:
           if len(vertex_ids_in_layer) <= 1:
               return vertex_ids_in_layer
               
           # Tri basé sur l'attribut avg_build_time du sommet
           vertex_ids_in_layer.sort(key=lambda vid: self.get_vertex_by_id(vid).avg_build_time)
           return vertex_ids_in_layer

       return [sort_layer_by_avg_build_time(layer) for layer in vertex_execution_layers]

   @staticmethod
   def prioritize_interface_components(vertex_execution_layers: list[list[str]]) -> list[list[str]]:
       """Place les composants d'interface en tête de chaque couche d'exécution."""
       
       def is_interface_component(vertex_id):
           # Vérifie si le type de composant fait partie des types d'interface
           component_type_prefix = vertex_id.split("-")[0] # Extrait le type du préfixe
           return component_type_prefix in {"Input", "Output", "ChatInput"} # Exemples de types d'interface

       # Utilise une clé de tri pour mettre les composants d'interface en premier
       return [
           sorted(
               layer_vertices,
               key=lambda vid: not is_interface_component(vid), # False (0) pour interface, True (1) sinon
           )
           for layer_vertices in vertex_execution_layers
       ]

Stratégies de Mise en Cache Adaptatives

Le système désactive intelligemment la mise en cache pour les sommets impliqués dans des cycles ou pour des composants spécifiques comme Listen/Notify.


def _configure_caching_for_cyclic_vertices(self) -> None:
   """Configure la stratégie de mise en cache pour les sommets dans des cycles."""
   all_edges = self._get_all_edges_as_list_of_tuples()
   vertices_in_cycles = set(find_cycle_vertices(all_edges)) # Supposant une fonction find_cycle_vertices
   
   for vertex in self.vertices:
       if vertex.id in vertices_in_cycles:
           # Désactiver la mise en cache pour les sommets dans des cycles pour assurer l'exécution correcte
           vertex.apply_to_all_outputs(lambda output_obj: setattr(output_obj, "cache_enabled", False))

def _disable_caching_for_listen_notify_components(self) -> None:
   """Désactive la mise en cache si des composants Listen/Notify sont présents."""
   has_special_component = any(
       vertex.type_prefix in {"Listen", "Notify"} for vertex in self.vertices
   )
   
   if has_special_component:
       # Si un composant Listen ou Notify est détecté, désactiver la mise en cache pour tous les sommets
       for vertex in self.vertices:
           vertex.apply_to_all_outputs(lambda output_obj: setattr(output_obj, "cache_enabled", False))

Exemples d'Application

Exemple de Surveillance de Base

Démontre l'utilisation du système d'événements pour surveiller l'exécution d'un flux simple.


# Exemple 1: Surveillance de base d'un flux
import asyncio
from langflow.graph.graph.base import Graph # Classe Graph hypothétique
from langflow.events.event_manager import create_default_event_manager # Fonction de création

async def basic_monitoring_scenario():
   """Scénario d'exemple pour la surveillance de base."""
   
   # Initialisation de la file d'événements et du gestionnaire
   event_queue_instance = asyncio.Queue()
   event_manager_instance = create_default_event_manager(event_queue_instance)
   
   # Création d'une instance de graphe
   graph_instance = Graph(flow_id="monitoring_example_01", flow_name="Basic Monitoring Flow")
   
   # Enregistrement d'un gestionnaire d'événements personnalisé pour les erreurs
   def custom_error_handler(*, manager, event_type, data):
       print(f"Custom Error Handler Triggered: {event_type} - {data}")
       # Logique de traitement d'erreur personnalisée ici
       
   event_manager_instance.register_event_handler("on_custom_error", "error_occurred", custom_error_handler)
   
   # Préparation du graphe pour l'exécution
   graph_instance.prepare_execution()
   
   # Exécution du graphe avec surveillance
   try:
       async for step_result in graph_instance.async_execute_flow(
           initial_inputs=[{"query": "What is Langflow?"}], # Entrées initiales
           event_manager=event_manager_instance
       ):
           print(f"Step Result: {step_result}")
           
           # Traitement des événements en attente
           while not event_queue_instance.empty():
               event_id, event_data, timestamp = event_queue_instance.get_nowait()
               print(f"Event Received: {event_id} at {timestamp}")
               
   except Exception as e:
       # Génération d'un événement d'erreur personnalisé
       event_manager_instance.emit_event(event_type="error_occurred", data={"error": str(e), "context": "graph_execution_step"})
       
   print("Basic monitoring scenario finished.")

# Pour exécuter cet exemple :
# asyncio.run(basic_monitoring_scenario())

Exemple d'Analyse Avancée des Performances

Illustre comment collecter et analyser les métriques de performance pour identifier les goulots d'étranglement.


# Exemple 2: Analyse de performance avancée et optimisation
import time
from datetime import datetime, timezone
from langflow.services.tracing.service import TracingService # Service de traçage hypothétique
from langflow.services.telemetry.service import TelemetryService # Service de télémétrie hypothétique

class PerformanceProfiler:
   """Outil pour profiler et analyser les performances d'exécution."""
   
   def __init__(self):
       self.performance_metrics: dict[str, float] = {} # Stocke les durées par opération
       self.active_timers: dict[str, float] = {} # Stocke les temps de début des opérations actives
       self.execution_log: list[dict[str, Any]] = [] # Historique détaillé des exécutions
       
   def start_operation_timer(self, operation_key: str):
       """Démarre le chronomètre pour une opération."""
       self.active_timers[operation_key] = time.time()
       
   def stop_operation_timer(self, operation_key: str) -> float | None:
       """Arrête le chronomètre, enregistre la durée et retourne la durée."""
       if operation_key in self.active_timers:
           end_time = time.time()
           duration = end_time - self.active_timers[operation_key]
           self.performance_metrics[operation_key] = duration
           self.execution_log.append({
               "operation": operation_key,
               "duration_seconds": duration,
               "timestamp_utc": datetime.now(timezone.utc)
           })
           del self.active_timers[operation_key] # Nettoyer les timers actifs
           return duration
       return None
       
   def generate_performance_report(self) -> str:
       """Génère un rapport de performance basé sur les données collectées."""
       if not self.execution_log:
           return "No performance data has been collected yet."
           
       total_operations_count = len(self.execution_log)
       total_execution_time = sum(item["duration_seconds"] for item in self.execution_log)
       average_time_per_operation = total_execution_time / total_operations_count if total_operations_count > 0 else 0
       
       # Identification des opérations les plus lentes et les plus rapides
       slowest_operation = max(self.execution_log, key=lambda x: x["duration_seconds"])
       fastest_operation = min(self.execution_log, key=lambda x: x["duration_seconds"])
       
       report_string = f"""
Performance Analysis Report
==========================
Total Operations Executed: {total_operations_count}
Total Execution Time: {total_execution_time:.4f}s
Average Time per Operation: {average_time_per_operation:.4f}s

Slowest Operation Recorded: {slowest_operation['operation']} ({slowest_operation['duration_seconds']:.4f}s)
Fastest Operation Recorded: {fastest_operation['operation']} ({fastest_operation['duration_seconds']:.4f}s)

Recent Operation Logs (Last 10):
"""
       
       # Affichage des 10 dernières opérations enregistrées
       for log_entry in self.execution_log[-10:]:
           report_string += f"  - {log_entry['operation']}: {log_entry['duration_seconds']:.4f}s at {log_entry['timestamp_utc']}\n"
           
       return report_string

async def advanced_performance_scenario():
   """Scénario d'exemple pour l'analyse de performance avancée."""
   
   profiler = PerformanceProfiler()
   
   # Simulation d'un graphe avec des étapes d'exécution
   class MockGraphExecution:
       def __init__(self):
           self.execution_steps = ["vertex_A", "vertex_B", "vertex_C", "vertex_D"] # Étapes simulées
           
       async def run_steps_with_profiling(self, profiler: PerformanceProfiler):
           """Exécute les étapes en utilisant le profiler."""
           
           for step_id in self.execution_steps:
               profiler.start_operation_timer(f"execution_{step_id}")
               
               # Simulation de temps d'exécution variables
               if step_id == "vertex_C": # Simule un composant plus lent (ex: appel LLM)
                   await asyncio.sleep(0.6)
               elif step_id == "vertex_B": # Composant intermédiaire
                   await asyncio.sleep(0.3)
               else: # Composants rapides (ex: entrée/sortie)
                   await asyncio.sleep(0.1)
                   
               duration = profiler.stop_operation_timer(f"execution_{step_id}")
               print(f"Step {step_id} completed in {duration:.4f}s")
               
               # Détection de lenteur potentielle
               if duration and duration > 0.4:
                   print(f"⚠️  Potential bottleneck detected: {step_id} took {duration:.4f}s")
                   
   # Exécution du scénario de simulation
   mock_execution = MockGraphExecution()
   
   print("Starting performance profiling...")
   profiler.start_operation_timer("total_graph_runtime")
   
   await mock_execution.run_steps_with_profiling(profiler) # Lancement de l'exécution profilée
   
   total_runtime = profiler.stop_operation_timer("total_graph_runtime")
   print(f"\nTotal graph runtime completed: {total_runtime:.4f}s")
   
   # Affichage du rapport de performance
   print(profiler.generate_performance_report())
   
   # Fournir des recommandations basées sur les données collectées
   print("\nOptimization Suggestions:")
   for entry in profiler.execution_log:
       if entry["duration_seconds"] > 0.4:
           print(f"  - Consider optimizing '{entry['operation']}' (currently takes {entry['duration_seconds']:.4f}s).")
       elif entry["duration_seconds"] < 0.2:
           print(f"  - '{entry['operation']}' is performing efficiently ({entry['duration_seconds']:.4f}s).")

# Pour exécuter cet exemple :
# asyncio.run(advanced_performance_scenario())

Conclusion

Avantages Techniques

  1. Surveillance Complète : Visibilité de bout en bout, des couches basses à l'interface utilisateur.
  2. Rétroaction en Temps Réel : Mises à jour d'état basées sur des événements, assurant une réactivité immédiate.
  3. Optimisation Intelligente : Ordonnancement dynamique et stratégies de cache pour améliorer l'efficacité.
  4. Résilience : Gestion robuste des erreurs et mécanismes de restauration d'état.
  5. Extensibilité : Architecture modulaire supportant divers backends de traçage et de surveillance.

Caractéristiques Architecturales

  • Conception en Couches : Structure claire pour la surveillance et la gestion des flux.
  • Approche Asynchrone : Utilisation intensive de l'asynchronisme pour la performance et la scalabilité.
  • Système Modulaire : Intégration facile de nouveaux systèmes de traçage et de télémétrie.
  • Gestion d'État Sophistiquée : Instantanés et restauration pour une fiabilité accrue.

Valeur Applicative

Ces techniques fournissent des capacités essentielles pour le développement d'applications IA robustes et performantes :

  • Capacités de production pour la surveillance et le débogage.
  • Optimisation guidée par les données et les métriques.
  • Fiabilité accrue grâce à une gestion proactive des erreurs.
  • Cycle d'amélioration continue favorisé par une boucle de rétroaction.

L'ensemble de ces technologies constitue une base solide pour la création d'applications IA fiables et performantes, représentant une approche moderne dans la conception de frameworks IA.

Étiquettes: Langflow IA Surveillance Traçage Optimisation

Publié le 1 juillet à 05h19