Introduction et Philosophie de Conception
Synthèse des Techniques d'Évaluation et d'Itération
Les techniques d'évaluation et d'itération de Langflow constituent un système de surveillance et d'optimisation à plusieurs niveaux, conçu pour offrir :
- Surveillance de l'exécution en temps réel : Suivi de l'état d'exécution des flux via un système d'événements.
- Évaluation des performances : Collecte et analyse d'indicateurs tels que le temps d'exécution et l'utilisation des ressources.
- Suivi des erreurs : Mécanismes complets de capture d'exceptions et d'analyse des erreurs.
- Optimisation itérative : Ajustement adaptatif et amélioration continue basés sur les données de surveillance.
- Assurance qualité : Indicateurs d'évaluation multidimensionnels et contrôle qualité.
Philosophie de Conception Clé
Les principes fondamentaux de conception sont intégrés dans le moteur d'exécution du graphe :
# Principes de conception clés intégrés dans le moteur d'exécution du graphe
class GraphExecutor:
def __init__(self):
# Surveillance de l'exécution
self._execution_count = 0
self._update_count = 0
self._start_timestamp = datetime.now(timezone.utc)
self._execution_snapshots: list[dict[str, Any]] = []
self._execution_order: list[str] = []
# Service de traçage
self.tracing_service: TracingService | None = self.get_tracing_service_instance()
# Gestion des exécutions
self.run_manager = RunnableVerticesManager()
self.inactive_vertices_set: set = set()
self.active_vertices_list: list[str] = []
- Observabilité d'abord : Mécanismes de surveillance et de traçage intégrés.
- Optimisation progressive : Prise en charge de l'amélioration incrémentale et itérative.
- Récupération après défaillance : Capacités de gestion des erreurs et de restauration d'état.
- Axé sur les performances : Indicateurs de performance guidant les décisions d'optimisation.
Architecture Principale et Modèle de Surveillance de l'Exécution
Architecture de Surveillance de l'Exécution
L'architecture de surveillance de l'exécution est conçue pour capturer chaque étape du processus du graphe, depuis le démarrage jusqu'à la complétion, en passant par les erreurs potentielles.
Modèle d'État d'Exécution
La gestion de l'état de chaque sommet est cruciale pour suivre le déroulement de l'exécution.
# Gestion des états des sommets
class VertexStates(str, Enum):
ACTIVE = "ACTIVE"
INACTIVE = "INACTIVE"
ERROR = "ERROR"
class Vertex:
def __init__(self):
self.is_built = False
self.execution_result: ResultData | None = None
self.artifacts: dict[str, Any] = {}
self.execution_logs: dict[str, list[Log]] = {}
self.current_state = VertexStates.ACTIVE
async def build(self, event_manager: EventManager = None):
"""Construit le sommet et enregistre les métriques d'exécution."""
start_time = time.time()
try:
# Logique d'implémentation de la construction
result = await self._build_implementation_logic()
# Enregistrement des métriques de succès
self.is_built = True
self.execution_result = result
except Exception as e:
# Enregistrement des métriques d'erreur
self.current_state = VertexStates.ERROR
await self._log_execution_error(e)
raise
finally:
# Enregistrement du temps d'exécution
execution_duration = time.time() - start_time
await self._record_performance_metrics(execution_duration)
Analyse du Moteur d'Exécution des Flux
Gestion Asynchrone de l'Exécution
Le moteur gère l'exécution des flux de manière asynchrone, optimisant l'utilisation des ressources et permettant une parallélisation efficace.
# Surveillance principale de l'exécution du graphe
class Graph:
async def process(self, *, fallback_to_env_vars: bool,
start_component_id: str | None = None,
event_manager: EventManager | None = None) -> Graph:
"""Traite l'exécution du graphe et effectue une surveillance complète."""
# Initialisation du traçage de l'exécution
await self.initialize_run_tracing()
# Traitement par couches
vertex_task_run_count: dict[str, int] = {}
to_process_queue = deque(first_layer_vertices)
layer_index = 0
while to_process_queue:
current_batch = list(to_process_queue)
to_process_queue.clear()
# Création des tâches concurrentes
tasks = []
for vertex_id in current_batch:
vertex = self.get_vertex_by_id(vertex_id)
task = asyncio.create_task(
self.build_vertex_execution(
vertex_id=vertex_id,
event_manager=event_manager,
),
name=f"{vertex.identifier} Run {vertex_task_run_count.get(vertex_id, 0)}"
)
tasks.append(task)
vertex_task_run_count[vertex_id] = vertex_task_run_count.get(vertex_id, 0) + 1
logger.debug(f"Executing layer {layer_index} with {len(tasks)} tasks")
try:
# Exécution des tâches et collecte des résultats
next_runnable_vertices = await self._execute_concurrent_tasks(
tasks, lock=asyncio.Lock(), has_webhook_component=True # Exemple de valeur
)
except Exception:
logger.exception(f"Error executing tasks in layer {layer_index}")
raise
# Préparation de l'exécution de la couche suivante
if next_runnable_vertices:
to_process_queue.extend(next_runnable_vertices)
layer_index += 1
return self
Surveillance de l'Exécution des Tâches
Chaque tâche d'exécution de sommet est surveillée pour détecter les succès et les échecs.
async def _execute_concurrent_tasks(self, tasks: list[asyncio.Task],
lock: asyncio.Lock,
*, has_webhook_component: bool = False) -> list[str]:
"""Exécute les tâches et gère les exceptions."""
results_collection = []
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
processed_vertices: list[Vertex] = []
for i, task_result in enumerate(completed_tasks):
task_identifier = tasks[i].get_name()
vertex_identifier = tasks[i].get_name().split(" ")[0]
if isinstance(task_result, Exception):
logger.error(f"Task {task_identifier} failed with exception: {task_result}")
# Journalisation de l'exception dans le système de surveillance
if has_webhook_component:
await self._log_vertex_build_from_exception_details(vertex_identifier, task_result)
# Annulation des tâches restantes
for remaining_task in tasks[i + 1:]:
remaining_task.cancel()
raise task_result
# Journalisation de l'exécution réussie
if isinstance(task_result, VertexBuildResult):
if self.flow_identifier is not None:
await self.log_vertex_build_result(
flow_id=self.flow_identifier,
vertex_id=result.vertex.id,
is_valid=result.valid,
parameters=result.params,
data=result.result_dict,
artifacts=result.artifacts,
)
processed_vertices.append(task_result.vertex)
return results_collection
Collecte des Métriques de Performance
Modèle des Métriques de Performance
Les résultats de la construction des sommets sont enregistrés pour l'analyse des performances et le débogage.
# Enregistrement du résultat de la construction du sommet
class VertexBuildRecord(SQLModel):
record_timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
vertex_id: str = Field(nullable=False)
result_data: dict | None = Field(default=None)
associated_artifacts: dict | None = Field(default=None)
input_parameters: str | None = Field(default=None)
is_valid: bool = Field(nullable=False)
flow_id: UUID = Field()
# Support multi-tenant
job_uuid: UUID | None = Field(default=None)
tenant_uuid: UUID | None = Field(default=None)
organization_uuid: UUID | None = Field(default=None)
business_domain_uuid: UUID | None = Field(default=None)
@field_serializer("result_data")
def serialize_data_with_limits(self, data) -> dict:
"""Sérialise les données en appliquant des limites de taille."""
return serialize_data_to_primitive(data,
max_length=get_max_text_length_limit(),
max_items=get_max_items_length_limit())
Journalisation des Transactions
Chaque transaction est journalisée pour permettre une analyse détaillée des performances.
async def log_transaction_details(flow_id: str | UUID, source_vertex: Vertex,
transaction_status, target_vertex: Vertex | None = None,
error_details=None) -> None:
"""Journalise les transactions de manière asynchrone pour l'analyse des performances."""
try:
if not get_settings_service().are_transactions_storage_enabled():
return
# Nettoyage des paramètres sensibles
inputs_serialized = _vertex_to_primitive_dict(source_vertex)
# Sérialisation des données de résultat
if source_vertex.execution_result:
try:
result_dict = source_vertex.execution_result.model_dump()
for key, value in result_dict.items():
if isinstance(value, pd.DataFrame):
result_dict[key] = value.to_dict()
outputs_serialized = result_dict
except Exception as e:
logger.warning(f"Error serializing result: {e!s}")
outputs_serialized = None
else:
outputs_serialized = None
# Création de l'enregistrement de transaction
transaction_record = TransactionBase(
vertex_id=source_vertex.id,
inputs=inputs_serialized,
outputs=outputs_serialized,
status=transaction_status,
error=str(error_details) if error_details else None,
flow_id=str(flow_id),
timestamp=datetime.now(timezone.utc)
)
# Stockage asynchrone
await crud_log_transaction(transaction_record)
except Exception as e:
logger.error(f"Error logging transaction: {e}")
Système d'Événements et Suivi d'État
Architecture du Gestionnaire d'Événements
Le gestionnaire d'événements orchestre la communication entre les différents composants du système via une file d'attente d'événements.
class EventManager:
def __init__(self, event_queue: asyncio.Queue, job_id: str | None = None):
self.event_queue = event_queue
self.registered_event_handlers: dict[str, PartialEventCallback] = {}
self.job_id = job_id
def register_event_handler(self, event_name: str, event_type: str,
callback: EventCallback | None = None) -> None:
"""Enregistre un gestionnaire d'événements."""
if not event_name.startswith("on_"):
raise ValueError("Event name must start with 'on_'")
if callback is None:
# Utilise une fonction partielle pour envoyer l'événement
handler_function = partial(self.emit_event, event_type=event_type)
else:
handler_function = partial(callback, manager=self, event_type=event_type)
self.registered_event_handlers[event_name] = handler_function
def emit_event(self, *, event_type: str, data: LoggableType):
"""Envoie un événement au système de surveillance."""
try:
# Création d'un événement standardisé par type
if isinstance(data, dict) and event_type in {"message", "error", "warning", "info", "token"}:
data = create_event_by_type(event_type, **data)
except TypeError as e:
logger.debug(f"Error creating playground event: {e}")
except Exception:
raise
# Sérialisation des données de l'événement
jsonable_data = jsonable_encoder(data)
event_payload = {"event": event_type, "data": jsonable_data}
event_unique_id = f"{event_type}-{uuid.uuid4()}"
serialized_data = json.dumps(event_payload) + "\n\n"
# Mise à jour de l'état du job
from langflow.services.apa.resource.job.job_service import JobService
run_until_complete_safe(JobService.update_job_status_from_event(self.job_id, event_type, data))
# Ajout de l'événement à la file d'attente
self.event_queue.put_nowait((event_unique_id, serialized_data.encode("utf-8"), time.time()))
Gestionnaire d'Événements par Défaut
Un ensemble standard d'événements est préconfiguré pour couvrir les cas d'utilisation courants.
def create_default_event_manager(queue: asyncio.Queue) -> EventManager:
"""Crée un gestionnaire d'événements par défaut."""
manager = EventManager(queue)
# Enregistrement des événements standards
manager.register_event_handler("on_token", "token")
manager.register_event_handler("on_vertices_sorted", "vertices_sorted")
manager.register_event_handler("on_error", "error")
manager.register_event_handler("on_end", "end")
manager.register_event_handler("on_message", "add_message")
manager.register_event_handler("on_remove_message", "remove_message")
manager.register_event_handler("on_end_vertex", "end_vertex")
manager.register_event_handler("on_build_start", "build_start")
manager.register_event_handler("on_build_end", "build_end")
return manager
Système de Journalisation et Support de Débogage
Système de Journalisation Structuré
Un tampon de journalisation de taille limitée permet une récupération efficace des journaux en temps réel.
class SizedLogBuffer:
def __init__(self, max_buffer_size: int = 1000):
"""Tampon de journalisation pour la récupération des journaux en temps réel."""
self.log_entries: deque = deque(maxlen=max_buffer_size)
self._write_lock = Lock()
self._max_retention = max_buffer_size
def write(self, log_message: str) -> None:
"""Écrit un message de journal."""
try:
record = json.loads(log_message)
log_content = record["text"]
timestamp_epoch_ms = int(record["record"]["time"]["timestamp"] * 1000)
with self._write_lock:
# Assure que la taille maximale n'est pas dépassée
while len(self.log_entries) >= self._max_retention:
self.log_entries.popleft()
self.log_entries.append((timestamp_epoch_ms, log_content))
except json.JSONDecodeError:
logger.warning(f"Failed to decode log message: {log_message}")
except KeyError as e:
logger.warning(f"Missing key in log message structure: {e}")
def get_logs_after_timestamp(self, since_timestamp: int, max_lines: int = 5) -> dict[int, str]:
"""Récupère les journaux après un horodatage spécifié."""
relevant_logs = {}
with self._write_lock:
# Recherche à partir de la fin pour plus d'efficacité si les journaux sont récents
for ts, msg in reversed(self.log_entries):
if max_lines <= 0:
break
if ts >= since_timestamp:
relevant_logs[ts] = msg
max_lines -= 1
# Inverser pour maintenir l'ordre chronologique
return dict(sorted(relevant_logs.items()))
Configuration et Gestion des Journaux
La configuration flexible du système de journalisation permet de s'adapter à différents environnements, du développement à la production.
def setup_logging(*, level: str | None = None,
file_path: Path | None = None,
disable_logging: bool | None = False,
environment_type: str | None = None,
log_format: str | None = None,
use_async_file: bool = False) -> None:
"""Configure le système de journalisation."""
# Suppression des gestionnaires par défaut
logger.remove()
logger.patch(patch_log_record)
# Configuration pour l'environnement conteneurisé
if environment_type.lower() in ["container", "container_json"]:
logger.add(sys.stderr, format="{message}", serialize=True)
elif environment_type.lower() == "container_csv":
logger.add(sys.stderr, format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {file} {line} {function} {message}")
else:
# Configuration pour l'environnement de développement
if log_format is None or not is_valid_log_format(log_format):
log_format = DEFAULT_CONSOLE_LOG_FORMAT
# Activation de l'affichage formaté dans la console (si configuré)
enable_pretty_logs = os.getenv("LANGFLOW_PRETTY_LOGS", "false").lower() == "true"
if enable_pretty_logs:
logger.configure(
handlers=[{
"sink": RichHandler(rich_tracebacks=True, markup=True),
"format": log_format,
"level": level.upper() if level else "INFO",
}]
)
else:
logger.add(sys.stderr, level=level.upper() if level else "INFO", format=log_format,
backtrace=True, diagnose=True)
# Configuration de la journalisation dans un fichier
if not file_path:
cache_directory = Path(user_cache_dir("langflow"))
file_path = cache_directory / "langflow.log"
try:
# Utilisation d'un Sink de fichier asynchrone si spécifié
log_sink = AsyncFileSink(file_path) if use_async_file else file_path
logger.add(
sink=log_sink,
level=level.upper() if level else "INFO",
format=log_format,
serialize=True, # Journalisation en format JSON
)
except Exception:
logger.exception("Error setting up log file sink")
# Activation du tampon de journalisation en mémoire
if log_buffer.is_enabled():
logger.add(sink=log_buffer.write, format="{time} {level} {message}", serialize=True)
Gestion des Erreurs et Analyse des Exceptions
Gestion des Erreurs de Construction de Composant
Les exceptions survenant lors de la construction des composants sont capturées et journalisées pour une analyse approfondie.
async def _log_vertex_build_from_exception_details(self, vertex_id: str, error_instance: Exception) -> None:
"""Enregistre les détails d'une exception de construction de sommet."""
error_message_details: str
traceback_formatted: str
if isinstance(error_instance, ComponentBuildError):
error_message_details = error_instance.message
traceback_formatted = error_instance.formatted_traceback
else:
from langflow.api.utils import format_exception_message
traceback_formatted = traceback.format_exc()
logger.exception("Error during Component build process")
error_message_details = format_exception_message(error_instance)
# Construction du message d'erreur formaté
formatted_error = {"errorMessage": error_message_details, "stackTrace": traceback_formatted}
# Détermination de l'étiquette de sortie par défaut
output_label = "output" # Valeur par défaut
vertex_object = self.get_vertex_by_id(vertex_id)
if vertex_object and vertex_object.outputs:
output_label = vertex_object.outputs[0]["name"]
vertex_outputs = {output_label: OutputValue(message=formatted_error, type="error")}
# Création de la structure de réponse d'erreur
error_response_data = {
"results": {},
"outputs": vertex_outputs,
"logs": {},
"messages": {},
"artifacts": {},
"timedelta": None,
"duration": None,
"used_frozen_result": False,
}
# Journalisation de l'erreur dans le système de suivi
await self.log_vertex_build_result(
flow_id=self.flow_id or "",
vertex_id=vertex_id or "error_vertex",
is_valid=False,
parameters=error_message_details,
data=error_response_data,
artifacts={},
)
Intercepteur d'Exceptions
Un intercepteur standard permet de rediriger les logs du module logging Python vers le système Loguru.
class ExceptionInterceptorHandler(logging.Handler):
"""Intercepteur pour rediriger les logs standard vers Loguru."""
@override
def emit(self, record: logging.LogRecord) -> None:
# Récupération du niveau Loguru correspondant
try:
loguru_level = logger.level(record.levelname).name
except ValueError:
loguru_level = record.levelno # Utilisation du code numérique si le niveau n'est pas trouvé
# Recherche des informations de l'appelant
frame, depth = logging.currentframe(), 2
while frame and frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
# Journalisation avec Loguru, en incluant les informations d'exception si présentes
logger.opt(depth=depth, exception=record.exc_info).log(loguru_level, record.getMessage())
Optimisation Itérative et Mécanismes Adaptatifs
Instantanés d'État du Graphe
Des instantanés de l'état du graphe sont créés pour permettre le retour arrière et l'analyse des exécutions.
class Graph:
def _capture_state_snapshot(self):
"""Crée un instantané de l'état du graphe pour retour arrière et analyse."""
return {
"_execution_queue": self._execution_queue.copy(),
"_initial_layer": self._initial_layer.copy(),
"vertex_execution_layers": copy.deepcopy(self.vertex_execution_layers),
"vertices_scheduled_for_run": copy.deepcopy(self.vertices_scheduled_for_run),
"run_manager_state": copy.deepcopy(self.run_manager.get_state_as_dict()),
}
def _record_execution_snapshot(self, vertex_id: str | None = None) -> None:
"""Enregistre un instantané de l'exécution."""
self._execution_snapshots.append(self.get_current_state_snapshot())
if vertex_id:
self._execution_order.append(vertex_id)
def get_current_state_snapshot(self):
"""Récupère l'instantané de l'état actuel."""
return copy.deepcopy({
"run_manager_state": self.run_manager.get_state_as_dict(),
"execution_queue": self._execution_queue,
"vertex_execution_layers": self.vertex_execution_layers,
"initial_layer": self.initial_layer,
"inactive_vertices_set": self.inactive_vertices_set,
"active_vertices_list": self.active_vertices_list,
})
Gestion Adaptative des Sommets
Le système peut activer dynamiquement des sommets en fonction du contexte d'exécution, comme les changements d'état.
def activate_state_dependent_vertices(self, state_name: str, caller_vertex_id: str) -> None:
"""Active les sommets associés à un nom d'état donné."""
vertex_ids_to_activate = set()
updated_predecessor_map = {}
newly_activated_vertices = []
for vertex_id in self.state_dependent_vertex_ids:
calling_vertex = self.get_vertex_by_id(caller_vertex_id)
current_vertex = self.get_vertex_by_id(vertex_id)
if vertex_id == caller_vertex_id or current_vertex.display_name == calling_vertex.display_name:
continue
context_key_param = current_vertex.raw_parameters.get("context_key")
if isinstance(context_key_param, str) and state_name in context_key_param and vertex_id != caller_vertex_id:
if isinstance(current_vertex, StateVertex): # Supposant qu'il existe une classe StateVertex
newly_activated_vertices.append(vertex_id)
vertex_ids_to_activate.add(vertex_id)
# Récupération de tous les successeurs directs et indirects
all_successors = self.get_all_successors(current_vertex, flat=True)
# Collecte des prédécesseurs des successeurs
successors_predecessors = set()
for successor in all_successors:
successors_predecessors.update(self.get_all_predecessors(successor))
# Reconstruction de l'ensemble des arêtes
edge_set = set()
for v in [current_vertex, *all_successors, *successors_predecessors]:
edge_set.update(v.edges)
if v.current_state == VertexStates.INACTIVE:
v.set_state("ACTIVE") # Mise à jour de l'état
vertex_ids_to_activate.add(v.id)
# Reconstruction de la carte des prédécesseurs
edges_list = list(edge_set)
predecessor_map, _ = self.build_adjacency_maps(edges_list)
updated_predecessor_map.update(predecessor_map)
# Mise à jour de l'ensemble des sommets à exécuter
vertex_ids_to_activate.update(updated_predecessor_map.keys())
vertex_ids_to_activate.update(v_id for successors_list in updated_predecessor_map.values() for v_id in successors_list)
self.active_vertices_list = newly_activated_vertices # Mise à jour de la liste des actifs
self.vertices_scheduled_for_run.update(vertex_ids_to_activate)
self.run_manager.update_run_state(
run_predecessors=updated_predecessor_map,
vertices_to_run=self.vertices_scheduled_for_run,
)
Indicateurs d'Évaluation et Assurance Qualité
Système de Traçage Distribué
Le service de traçage supporte l'intégration avec divers backends de traçage distribué pour une visibilité complète.
class TracingService(Service):
"""Service de traçage distribué."""
async def initialize_tracers_for_run(self, run_uuid: UUID, run_name: str,
user_identifier: str | None, session_identifier: str | None,
project_name: str | None = None) -> None:
"""Initialise le traçage pour une exécution de graphe."""
if self.is_deactivated:
return
try:
resolved_project_name = project_name or os.getenv("LANGCHAIN_PROJECT", "Langflow")
trace_context = TraceContext(run_uuid, run_name, resolved_project_name, user_identifier, session_identifier)
trace_context_variable.set(trace_context)
# Démarrage des threads de travail pour le traçage
await self._start_background_tracing(trace_context)
# Initialisation des différents traceurs spécifiques
self._setup_langsmith_tracer(trace_context)
self._setup_langwatch_tracer(trace_context)
self._setup_langfuse_tracer(trace_context)
self._setup_arize_phoenix_tracer(trace_context)
self._setup_opik_tracer(trace_context)
except Exception as e:
logger.debug(f"Error initializing tracers: {e}")
@asynccontextmanager
async def trace_component_execution(self, component: Component, trace_span_name: str,
input_data: dict[str, Any],
metadata: dict[str, Any] | None = None):
"""Gère le traçage d'un composant individuel."""
if self.is_deactivated:
yield self
return
span_id = trace_span_name
if component._vertex:
span_id = component._vertex.id # Utilise l'ID du sommet comme span ID si disponible
span_type = component.trace_type
component_trace_context = ComponentTraceContext(
span_id, trace_span_name, span_type, component._vertex, input_data, metadata
)
component_context_variable.set(component_trace_context)
current_trace_context = trace_context_variable.get()
if current_trace_context is None:
msg = "Cannot call trace_component_execution without an active trace context."
raise RuntimeError(msg)
# Ajout des entrées au contexte global du traceur
current_trace_context.all_inputs[trace_span_name].update(input_data or {})
# Démarrage du span du composant
await current_trace_context.traces_processing_queue.put(
(self._start_component_span, (component_trace_context, current_trace_context))
)
try:
yield self # Permet l'exécution du code du composant
except Exception as e:
# Enregistrement de la fin du span avec exception
await current_trace_context.traces_processing_queue.put(
(self._end_component_span, (component_trace_context, current_trace_context, e))
)
raise # Propage l'exception
else:
# Enregistrement de la fin normale du span
await current_trace_context.traces_processing_queue.put(
(self._end_component_span, (component_trace_context, current_trace_context, None))
)
Collecte des Données Télémétriques
Le service de télémétrie collecte des données anonymisées pour l'analyse des performances et l'amélioration du produit.
class TelemetryService(Service):
"""Service de télémétrie pour la collecte des métriques de performance."""
def __init__(self, settings_manager: SettingsService):
super().__init__()
self.settings_manager = settings_manager
self.telemetry_endpoint_url = settings_manager.settings.telemetry_base_url
self.telemetry_event_queue: asyncio.Queue = asyncio.Queue()
self.http_client = httpx.AsyncClient(timeout=10.0)
self.is_operational = False
self._stop_event_flag = False
# Intégration OpenTelemetry
self.open_telemetry_integration = OpenTelemetry(prometheus_enabled=settings_manager.settings.prometheus_enabled)
# Paramètre de confidentialité
self.disable_tracking = (
os.getenv("DO_NOT_TRACK", "False").lower() == "true"
or settings_manager.settings.do_not_track
)
async def send_telemetry_payload(self, payload: BaseModel, endpoint_path: str | None = None) -> None:
"""Envoie une charge utile de télémétrie au serveur."""
if self.disable_tracking:
logger.debug("Telemetry tracking is disabled. Data will not be sent.")
return
target_url = f"{self.telemetry_endpoint_url}"
if endpoint_path:
target_url = f"{target_url}/{endpoint_path}"
try:
payload_dictionary = payload.model_dump(by_alias=True, exclude_none=True, exclude_unset=True)
response = await self.http_client.get(target_url, params=payload_dictionary)
if response.status_code != httpx.codes.OK:
logger.error(f"Failed to send telemetry data to {target_url}: {response.status_code} {response.text}")
else:
logger.debug("Telemetry data sent successfully.")
except httpx.RequestError:
logger.error("Network error occurred while sending telemetry data.")
except Exception:
logger.error("An unexpected error occurred during telemetry data transmission.")
async def report_package_version_info(self) -> None:
"""Rapporte les informations de version du package et de l'environnement."""
python_version_short = ".".join(platform.python_version().split(".")[:2])
package_version_details = get_version_info()
# Détermination de l'architecture du système si non déjà fait
if self.system_architecture is None:
self.system_architecture = (await asyncio.to_thread(platform.architecture))[0]
version_payload = VersionPayload(
package_name=package_version_details["package"].lower(),
package_version=package_version_details["version"],
system_platform=platform.platform(),
python_version=python_version_short,
cache_type_used=self.settings_manager.settings.cache_type,
backend_only_mode=self.settings_manager.settings.backend_only,
architecture=self.system_architecture,
auto_login_enabled=self.settings_manager.auth_settings.AUTO_LOGIN,
is_desktop_app=self._check_langflow_desktop_mode(),
)
# Ajout de l'événement d'envoi à la file d'attente
await self._queue_telemetry_event((self.send_telemetry_payload, version_payload, None))
Tableaux de Bord de Surveillance et Visualisation
Flux de Journaux en Temps Réel
Une interface SSE (Server-Sent Events) permet de diffuser les journaux en temps réel vers le client.
@log_api_router.get("/logs-stream")
async def stream_realtime_logs(request: Request):
"""Point d'accès SSE pour le streaming des journaux en temps réel."""
global log_buffer # Accès au tampon de journalisation global
if not log_buffer.is_enabled():
raise HTTPException(
status_code=HTTPStatus.NOT_IMPLEMENTED,
detail="Log retrieval is currently disabled.",
)
# Utilisation de StreamingResponse pour la diffusion des événements
return StreamingResponse(event_stream_generator(request), media_type="text/event-stream")
async def event_stream_generator(request: Request):
"""Générateur pour le flux d'événements SSE des journaux."""
global log_buffer
last_processed_entry = None
consecutive_no_new_entries = 0
while not await request.is_disconnected(): # Continuer tant que la connexion est active
new_entries_to_send: list[Any] = []
with log_buffer.get_read_write_lock(): # Verrou pour accès concurrentiel au tampon
if last_processed_entry is None:
# Si c'est la première lecture, prendre la dernière entrée comme référence
if log_buffer.log_entries:
last_processed_entry = log_buffer.log_entries[-1]
else:
# Trouver les nouvelles entrées depuis la dernière lecture
found_last = False
for entry in log_buffer.log_entries:
if found_last:
new_entries_to_send.append(entry)
last_processed_entry = entry
continue
if entry is last_processed_entry:
found_last = True
continue
# Gérer le cas où le tampon a été vidé et rempli à nouveau
if not found_last and log_buffer.log_entries:
new_entries_to_send.extend(log_buffer.log_entries)
last_processed_entry = log_buffer.log_entries[-1]
if new_entries_to_send:
for timestamp_ms, log_message_content in new_entries_to_send:
# Formater chaque entrée comme un événement JSON
yield f"data: {json.dumps({timestamp_ms: log_message_content})}\n\n"
else:
consecutive_no_new_entries += 1
# Envoyer un signal de "keepalive" pour maintenir la connexion active
if consecutive_no_new_entries >= KEEPALIVE_THRESHOLD:
consecutive_no_new_entries = 0
yield "event: keepalive\ndata: {}\n\n"
await asyncio.sleep(1) # Attendre avant la prochaine vérification
API de Requête des Journaux
Une API RESTful permet de récupérer l'historique des journaux selon divers critères.
@log_api_router.get("/logs")
async def query_historical_logs(
lines_before_ts: Annotated[int, Query(description="Nombre de lignes avant l'horodatage")] = 0,
lines_after_ts: Annotated[int, Query(description="Nombre de lignes après l'horodatage")] = 0,
timestamp_query: Annotated[int, Query(description="Horodatage de référence pour la requête")] = 0,
):
"""Récupère les données historiques des journaux."""
global log_buffer
if not log_buffer.is_enabled():
raise HTTPException(
status_code=HTTPStatus.NOT_IMPLEMENTED,
detail="Log retrieval is disabled.",
)
# Validation des paramètres de requête
if lines_after_ts > 0 and lines_before_ts > 0:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Cannot request logs both before and after the specified timestamp.",
)
retrieved_content: dict[int, str]
if timestamp_query <= 0:
# Si aucun horodatage n'est fourni, récupérer les N dernières lignes ou un défaut
if lines_after_ts > 0: # Doit spécifier un timestamp pour 'after'
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Timestamp is required when requesting logs after a specific point.",
)
retrieved_content = log_buffer.get_last_n_logs(10) if lines_before_ts <= 0 else log_buffer.get_last_n_logs(lines_before_ts)
elif lines_before_ts > 0:
# Récupérer les lignes avant l'horodatage spécifié
retrieved_content = log_buffer.get_logs_before_timestamp(timestamp=timestamp_query, max_lines=lines_before_ts)
elif lines_after_ts > 0:
# Récupérer les lignes après l'horodatage spécifié
retrieved_content = log_buffer.get_logs_after_timestamp(since_timestamp=timestamp_query, max_lines=lines_after_ts)
else:
# Cas par défaut : récupérer les 10 dernières lignes avant l'horodatage si aucun autre critère n'est spécifié
retrieved_content = log_buffer.get_logs_before_timestamp(timestamp=timestamp_query, max_lines=10)
return JSONResponse(content=retrieved_content)
Outils de Débogage et Support au Développement
Support à l'Exécution Pas à Pas
Le moteur d'exécution permet une exécution pas à pas des sommets du graphe pour faciliter le débogage.
class Graph:
async def step_forward_async(self, inputs: InputValueRequest | None = None,
files: list[str] | None = None,
user_id: str | None = None,
event_manager: EventManager | None = None):
"""Exécute de manière asynchrone le prochain sommet du graphe."""
if not self._is_prepared:
raise ValueError("Graph must be prepared before stepping. Call prepare() first.")
if not self._execution_queue: # Si la file d'exécution est vide
self._terminate_all_traces_async() # Terminer les traces
return Finish() # Retourner un indicateur de fin
# Obtenir l'identifiant du prochain sommet à exécuter
next_vertex_id = self.get_next_vertex_from_queue()
chat_service = get_chat_service() # Accès au service de chat pour la mise en cache
# Exécution du sommet
vertex_build_result = await self.build_vertex_execution(
vertex_id=next_vertex_id,
user_id=user_id,
input_parameters_dict=inputs.model_dump() if inputs else {},
files_provided=files,
cache_retriever=chat_service.get_cache,
cache_setter=chat_service.set_cache,
event_manager=event_manager,
)
# Déterminer les prochains sommets exécutables après celui-ci
next_runnable_vertices = await self.get_next_runnable_vertices_after(
lock=self._mutex_lock, current_vertex=vertex_build_result.vertex, use_cache=False
)
# Gérer le cas où un sommet spécifique doit arrêter l'exécution
if self.stop_at_vertex and self.stop_at_vertex in next_runnable_vertices:
next_runnable_vertices = [self.stop_at_vertex]
self.add_vertices_to_execution_queue(next_runnable_vertices) # Ajouter à la file d'attente
self.reset_inactivated_vertex_status() # Réinitialiser les états inactifs
self.reset_activated_vertex_status() # Réinitialiser les états actifs
# Mise à jour du cache du graphe
await chat_service.set_cache(str(self.flow_id or self._run_id), self)
self._record_execution_snapshot(next_vertex_id) # Enregistrer l'instantané
return vertex_build_result # Retourner le résultat de la construction du sommet
def step_forward_sync(self, inputs: InputValueRequest | None = None,
files: list[str] | None = None,
user_id: str | None = None):
"""Wrapper synchrone pour l'exécution pas à pas."""
return run_until_complete(self.step_forward_async(inputs, files, user_id))
Collecte d'Informations de Débogage
Des fonctions utilitaires aident à déterminer le type des artefacts et à formater les données pour l'affichage en débogage.
def determine_artifact_type(value, build_result) -> str:
"""Détermine le type d'un artefact pour l'affichage en débogage."""
artifact_category = ArtifactCategory.UNKNOWN
match value:
case DataRecord(): # Supposant une classe DataRecord
artifact_category = ArtifactCategory.RECORD
case str():
artifact_category = ArtifactCategory.TEXT
case dict():
artifact_category = ArtifactCategory.OBJECT
case list():
artifact_category = ArtifactCategory.ARRAY
case Message():
artifact_category = ArtifactCategory.MESSAGE
# Cas spécial pour les générateurs ou les flux de messages
if artifact_category == ArtifactCategory.UNKNOWN and (
isinstance(build_result, Generator) or
(isinstance(value, Message) and isinstance(value.text, Generator))
):
artifact_category = ArtifactCategory.STREAM
return artifact_category.value # Retourne la valeur enum (string)
def post_process_for_debugging_display(raw_data, artifact_type: str):
"""Traite les données brutes pour l'affichage en mode débogage."""
if artifact_type == ArtifactCategory.STREAM.value:
# Pour les flux, on peut afficher une chaîne vide ou un indicateur
return "[Streaming Data]"
return raw_data
Amélioration Continue et Boucle de Rétroaction
Mécanismes d'Optimisation des Performances
Le système peut réorganiser l'ordre d'exécution des sommets en fonction de leurs temps de construction moyens pour optimiser le flux global.
class Graph:
def sort_vertices_by_average_build_time(self, vertex_execution_layers: list[list[str]]) -> list[list[str]]:
"""Trie les sommets en fonction de leur temps de construction moyen."""
def sort_layer_by_avg_build_time(vertex_ids_in_layer: list[str]) -> list[str]:
if len(vertex_ids_in_layer) <= 1:
return vertex_ids_in_layer
# Tri basé sur l'attribut avg_build_time du sommet
vertex_ids_in_layer.sort(key=lambda vid: self.get_vertex_by_id(vid).avg_build_time)
return vertex_ids_in_layer
return [sort_layer_by_avg_build_time(layer) for layer in vertex_execution_layers]
@staticmethod
def prioritize_interface_components(vertex_execution_layers: list[list[str]]) -> list[list[str]]:
"""Place les composants d'interface en tête de chaque couche d'exécution."""
def is_interface_component(vertex_id):
# Vérifie si le type de composant fait partie des types d'interface
component_type_prefix = vertex_id.split("-")[0] # Extrait le type du préfixe
return component_type_prefix in {"Input", "Output", "ChatInput"} # Exemples de types d'interface
# Utilise une clé de tri pour mettre les composants d'interface en premier
return [
sorted(
layer_vertices,
key=lambda vid: not is_interface_component(vid), # False (0) pour interface, True (1) sinon
)
for layer_vertices in vertex_execution_layers
]
Stratégies de Mise en Cache Adaptatives
Le système désactive intelligemment la mise en cache pour les sommets impliqués dans des cycles ou pour des composants spécifiques comme Listen/Notify.
def _configure_caching_for_cyclic_vertices(self) -> None:
"""Configure la stratégie de mise en cache pour les sommets dans des cycles."""
all_edges = self._get_all_edges_as_list_of_tuples()
vertices_in_cycles = set(find_cycle_vertices(all_edges)) # Supposant une fonction find_cycle_vertices
for vertex in self.vertices:
if vertex.id in vertices_in_cycles:
# Désactiver la mise en cache pour les sommets dans des cycles pour assurer l'exécution correcte
vertex.apply_to_all_outputs(lambda output_obj: setattr(output_obj, "cache_enabled", False))
def _disable_caching_for_listen_notify_components(self) -> None:
"""Désactive la mise en cache si des composants Listen/Notify sont présents."""
has_special_component = any(
vertex.type_prefix in {"Listen", "Notify"} for vertex in self.vertices
)
if has_special_component:
# Si un composant Listen ou Notify est détecté, désactiver la mise en cache pour tous les sommets
for vertex in self.vertices:
vertex.apply_to_all_outputs(lambda output_obj: setattr(output_obj, "cache_enabled", False))
Exemples d'Application
Exemple de Surveillance de Base
Démontre l'utilisation du système d'événements pour surveiller l'exécution d'un flux simple.
# Exemple 1: Surveillance de base d'un flux
import asyncio
from langflow.graph.graph.base import Graph # Classe Graph hypothétique
from langflow.events.event_manager import create_default_event_manager # Fonction de création
async def basic_monitoring_scenario():
"""Scénario d'exemple pour la surveillance de base."""
# Initialisation de la file d'événements et du gestionnaire
event_queue_instance = asyncio.Queue()
event_manager_instance = create_default_event_manager(event_queue_instance)
# Création d'une instance de graphe
graph_instance = Graph(flow_id="monitoring_example_01", flow_name="Basic Monitoring Flow")
# Enregistrement d'un gestionnaire d'événements personnalisé pour les erreurs
def custom_error_handler(*, manager, event_type, data):
print(f"Custom Error Handler Triggered: {event_type} - {data}")
# Logique de traitement d'erreur personnalisée ici
event_manager_instance.register_event_handler("on_custom_error", "error_occurred", custom_error_handler)
# Préparation du graphe pour l'exécution
graph_instance.prepare_execution()
# Exécution du graphe avec surveillance
try:
async for step_result in graph_instance.async_execute_flow(
initial_inputs=[{"query": "What is Langflow?"}], # Entrées initiales
event_manager=event_manager_instance
):
print(f"Step Result: {step_result}")
# Traitement des événements en attente
while not event_queue_instance.empty():
event_id, event_data, timestamp = event_queue_instance.get_nowait()
print(f"Event Received: {event_id} at {timestamp}")
except Exception as e:
# Génération d'un événement d'erreur personnalisé
event_manager_instance.emit_event(event_type="error_occurred", data={"error": str(e), "context": "graph_execution_step"})
print("Basic monitoring scenario finished.")
# Pour exécuter cet exemple :
# asyncio.run(basic_monitoring_scenario())
Exemple d'Analyse Avancée des Performances
Illustre comment collecter et analyser les métriques de performance pour identifier les goulots d'étranglement.
# Exemple 2: Analyse de performance avancée et optimisation
import time
from datetime import datetime, timezone
from langflow.services.tracing.service import TracingService # Service de traçage hypothétique
from langflow.services.telemetry.service import TelemetryService # Service de télémétrie hypothétique
class PerformanceProfiler:
"""Outil pour profiler et analyser les performances d'exécution."""
def __init__(self):
self.performance_metrics: dict[str, float] = {} # Stocke les durées par opération
self.active_timers: dict[str, float] = {} # Stocke les temps de début des opérations actives
self.execution_log: list[dict[str, Any]] = [] # Historique détaillé des exécutions
def start_operation_timer(self, operation_key: str):
"""Démarre le chronomètre pour une opération."""
self.active_timers[operation_key] = time.time()
def stop_operation_timer(self, operation_key: str) -> float | None:
"""Arrête le chronomètre, enregistre la durée et retourne la durée."""
if operation_key in self.active_timers:
end_time = time.time()
duration = end_time - self.active_timers[operation_key]
self.performance_metrics[operation_key] = duration
self.execution_log.append({
"operation": operation_key,
"duration_seconds": duration,
"timestamp_utc": datetime.now(timezone.utc)
})
del self.active_timers[operation_key] # Nettoyer les timers actifs
return duration
return None
def generate_performance_report(self) -> str:
"""Génère un rapport de performance basé sur les données collectées."""
if not self.execution_log:
return "No performance data has been collected yet."
total_operations_count = len(self.execution_log)
total_execution_time = sum(item["duration_seconds"] for item in self.execution_log)
average_time_per_operation = total_execution_time / total_operations_count if total_operations_count > 0 else 0
# Identification des opérations les plus lentes et les plus rapides
slowest_operation = max(self.execution_log, key=lambda x: x["duration_seconds"])
fastest_operation = min(self.execution_log, key=lambda x: x["duration_seconds"])
report_string = f"""
Performance Analysis Report
==========================
Total Operations Executed: {total_operations_count}
Total Execution Time: {total_execution_time:.4f}s
Average Time per Operation: {average_time_per_operation:.4f}s
Slowest Operation Recorded: {slowest_operation['operation']} ({slowest_operation['duration_seconds']:.4f}s)
Fastest Operation Recorded: {fastest_operation['operation']} ({fastest_operation['duration_seconds']:.4f}s)
Recent Operation Logs (Last 10):
"""
# Affichage des 10 dernières opérations enregistrées
for log_entry in self.execution_log[-10:]:
report_string += f" - {log_entry['operation']}: {log_entry['duration_seconds']:.4f}s at {log_entry['timestamp_utc']}\n"
return report_string
async def advanced_performance_scenario():
"""Scénario d'exemple pour l'analyse de performance avancée."""
profiler = PerformanceProfiler()
# Simulation d'un graphe avec des étapes d'exécution
class MockGraphExecution:
def __init__(self):
self.execution_steps = ["vertex_A", "vertex_B", "vertex_C", "vertex_D"] # Étapes simulées
async def run_steps_with_profiling(self, profiler: PerformanceProfiler):
"""Exécute les étapes en utilisant le profiler."""
for step_id in self.execution_steps:
profiler.start_operation_timer(f"execution_{step_id}")
# Simulation de temps d'exécution variables
if step_id == "vertex_C": # Simule un composant plus lent (ex: appel LLM)
await asyncio.sleep(0.6)
elif step_id == "vertex_B": # Composant intermédiaire
await asyncio.sleep(0.3)
else: # Composants rapides (ex: entrée/sortie)
await asyncio.sleep(0.1)
duration = profiler.stop_operation_timer(f"execution_{step_id}")
print(f"Step {step_id} completed in {duration:.4f}s")
# Détection de lenteur potentielle
if duration and duration > 0.4:
print(f"⚠️ Potential bottleneck detected: {step_id} took {duration:.4f}s")
# Exécution du scénario de simulation
mock_execution = MockGraphExecution()
print("Starting performance profiling...")
profiler.start_operation_timer("total_graph_runtime")
await mock_execution.run_steps_with_profiling(profiler) # Lancement de l'exécution profilée
total_runtime = profiler.stop_operation_timer("total_graph_runtime")
print(f"\nTotal graph runtime completed: {total_runtime:.4f}s")
# Affichage du rapport de performance
print(profiler.generate_performance_report())
# Fournir des recommandations basées sur les données collectées
print("\nOptimization Suggestions:")
for entry in profiler.execution_log:
if entry["duration_seconds"] > 0.4:
print(f" - Consider optimizing '{entry['operation']}' (currently takes {entry['duration_seconds']:.4f}s).")
elif entry["duration_seconds"] < 0.2:
print(f" - '{entry['operation']}' is performing efficiently ({entry['duration_seconds']:.4f}s).")
# Pour exécuter cet exemple :
# asyncio.run(advanced_performance_scenario())
Conclusion
Avantages Techniques
- Surveillance Complète : Visibilité de bout en bout, des couches basses à l'interface utilisateur.
- Rétroaction en Temps Réel : Mises à jour d'état basées sur des événements, assurant une réactivité immédiate.
- Optimisation Intelligente : Ordonnancement dynamique et stratégies de cache pour améliorer l'efficacité.
- Résilience : Gestion robuste des erreurs et mécanismes de restauration d'état.
- Extensibilité : Architecture modulaire supportant divers backends de traçage et de surveillance.
Caractéristiques Architecturales
- Conception en Couches : Structure claire pour la surveillance et la gestion des flux.
- Approche Asynchrone : Utilisation intensive de l'asynchronisme pour la performance et la scalabilité.
- Système Modulaire : Intégration facile de nouveaux systèmes de traçage et de télémétrie.
- Gestion d'État Sophistiquée : Instantanés et restauration pour une fiabilité accrue.
Valeur Applicative
Ces techniques fournissent des capacités essentielles pour le développement d'applications IA robustes et performantes :
- Capacités de production pour la surveillance et le débogage.
- Optimisation guidée par les données et les métriques.
- Fiabilité accrue grâce à une gestion proactive des erreurs.
- Cycle d'amélioration continue favorisé par une boucle de rétroaction.
L'ensemble de ces technologies constitue une base solide pour la création d'applications IA fiables et performantes, représentant une approche moderne dans la conception de frameworks IA.