Introduction générale
Trino est un moteur de requêtes SQL distribué conçu pour analyser efficacement de vastes volumes de données, pouvant atteindre l'échelle du pétaoctet. À l'origine développé pour interagir avec les systèmes Hadoop et HDFS, il s'est imposé comme une alternative performante aux outils historiques tels que Hive ou Pig.
Au-delà du stockage Hadoop, Trino prend en charge de nombreuses sources de données : systèmes relationnels classiques, bases NoSQL comme Cassandra, et bien d'autres. Son domaine de prédilection correspond aux traitements analytiques de type OLAP, incluant l'agrégation, la génération de rapports et l'exploration de données.
Architecture fondamentale
Vue d'ensemble du cluster
L'infrastructure Trino repose sur deux composants principaux :
- Coordinator — point d'entrée unique qui analyse les requêtes, élabore les plans d'exécution et orchestre le travail.
- Workers — nœuds d'exécution qui traitent concurremment les fragments de données.
Rôle du Coordinator
Le Coordinator assume plusieurs responsabilités cruciales :
- Parsing des requêtes SQL et construction du plan logique d'exécution
- Découpage du plan en étapes (stages) puis en tâches (tasks) distribuées aux Workers
- Supervision de l'état de chaque Worker via des appels REST
- Récupération des résultats partiels et assemblage final pour le client
Rôle des Workers
Chaque Worker effectue les opérations suivantes :
- Exécution des tâches assignées et traitement des données
- Échange de données intermédiaires avec les autres Workers
- Inscription automatique auprès du Coordinator au démarrage
- Communication via une API REST avec les autres composants du cluster
Sources de données et connecteurs
Connecteurs
Un connecteur constitue l'interface entre Trino et une source de données spécifique. Chaque catalogue (catalog) est associé à un connecteur via la propriété connector.name présente dans son fichier de configuration.
Il est possible de configurer plusieurs catalogues utilisant le même type de connecteur. Par exemple, deux clusters Hive distincts peuvent être accessibles simultanément au sein d'une même requête SQL.
Catalogues et schémas
Un catalogue regroupe des schémas (schemas) et représente une source de données connectée. Les fichiers de propriétés stockés dans le répertoire de configuration Trino définissent ces catalogues.
Les schémas organisent les tables en espaces de noms logiques. Leur implémentation varie selon le connecteur :
- Connecteur relationnel : le schéma correspond généralement à un namespace de la base source
- Connecteur Hive : il peut représenter une base de données Hive ou un répertoire du système de fichiers
La référence complète à une table suit la nomenclature catalogue.schema.table.
Modèle d'exécution des requêtes
Processus global
Lorsqu'une requête SQL est soumise, Trino la transforme en un plan d'exécution structuré en arborescence de stages.
Étapes (Stages) et tâches (Tasks)
Un stage représente une phase logique du plan d'exécution. Il contient une ou plusieurs tâches s'exécutant en parallèle sur différents Workers. La structure hiérarchique des stages forme un arbre dont la racine agrège les résultats des sous-stages.
Chaque task constitue l'unité de travail concrète. Elle traite une portion des données et génère un résultat partiel.
Fragments de données (Splits)
Les splits découpent les données en unités logiques basées sur leur localisation, leur taille ou d'autres critères. Chaque task traite un ensemble de splits, permettant le parallélisme.
Opérateurs et pilotes
Un opérateur (operator) implémente une opération spécifique : lecture de table, filtrage, jointure, etc. Il reçoit des données en entrée et produit un flux de sortie transformé.
Le pilote (driver) orchestre une chaîne d'opérateurs. Un task peut contenir plusieurs pilotes fonctionnant en parallèle, chacun consommant et produisant des flux de données.
L'échangeur (exchange) permet le transfert de données entre tâches situées sur des Workers différents ou entre stages distincts.
Optimisation des requêtes
Optimisation basée sur les coûts
Stratégies de réordonnancement des jointures
L'ordre d'exécution des jointures influence considérablement les performances. Exécuter précocement une jointure générant un grand volume de données alourdit les étapes suivantes.
Trino exploite les statistiques fournies par les connecteurs pour estimer le coût de chaque ordre possible et sélectionner automatiquement le plus économique. Ce comportement est contrôlé par l'attribut join_reordering_strategy :
- AUTOMATIC (défaut) : réordonnancement complet basé sur les coûts
- ELIMINATE_CROSS_JOINS : suppression des jointures croisées inutiles
- NONE : respect strict de l'ordre syntaxique
Sélection du type de distribution
L'algorithme de jointure hashé utilisé par Trino définit deux rôles : le côté construction (build side) sur lequel une table de hachage est créée, et le côté sonde (probe side) dont les lignes sont itérées pour chercher les correspondances.
Deux modes de distribution existent :
- Partitionnée — chaque nœud construit une table de hachage sur une portion des données, répartie par hachage de la clé de jointure
- Diffusée (broadcast) — chaque nœud reçoit une copie complète de la table de construction
La diffusion offre de meilleures performances lorsque la table de construction est petite, mais nécessite suffisamment de mémoire sur chaque nœud. La partitionné tolère des tables plus volumineuses mais implique un coût réseau supplémentaire.
L'attribut join_distribution_type régit ce comportement :
- AUTOMATIC (défaut) : décision automatique basée sur les coûts
- BROADCAST : diffusion systématique
- PARTITIONED : partitionnement systématique
La taille maximale d'une table diffusée est limitée par join_max_broadcast_table_size (100 Mo par défaut), ce qui protège le cluster contre les estimations erronées du planificateur.
Ordre syntaxique des jointures
Lorsqu'aucune statistique n'est disponible, Trino applique l'ordre syntaxique. L'implémentation interne charge la table la plus à droite en mémoire comme côté construction.
Pour optimiser manuellement la mémoire, il est recommandé de placer les grandes tables à gauche et les petites à droite :
SELECT c.nom_client, p.designation, d.montant
FROM ventes_grandes c
LEFT JOIN produits_moyens p ON c.ref_produit = p.id
LEFT JOIN promotions_petites pr ON pr.id_client = c.id;
Mécanismes de pushdown
Le pushdown consiste à déléguer des opérations de traitement à la source de données elle-même, réduisant ainsi le volume de données transférées vers Trino.
Pushdown des prédicats
Les conditions de filtrage (WHERE) sont transmises au moteur source pour être évaluées lors de la lecture des données, diminuant le trafic réseau.
Pushdown des projections
Seules les colonnes effectivement utilisées dans la requête sont demandées à la source, évitant le transfert de colonnes inutiles. Le plan EXPLAIN ne montre alors que l'accès aux colonnes pertinentes.
Pushdown de déréférencement
Pour les structures de données complexes (types ROW, structures imbriquées), cette technique permet de ne lire que le champ spécifiquement accédé, sans charger l'intégralité de la structure en mémoire.
Pushdown des agrégations
Les fonctions d'agrégation (SUM, COUNT, AVG, etc.) peuvent être exécutées directement par la source si le connecteur le supporte. Le plan EXPLAIN ne présente alors pas d'opérateur d'agrégation côté Trino.
Pushdown des jointures
Certains connecteurs peuvent déléguer l'intégralité d'une jointure à la source sous-jacente. Les conditions requises incluent :
- Tous les prédicats de la jointure doivent être supportés par le pushdown
- Les tables jointes doivent appartenir au même catalogue
Pushdown des limites et Top-N
Les clauses LIMIT sont transmises à la source pour ne retourner que le nombre demandé de lignes. Lorsque LIMIT est combiné avec ORDER BY, on parle de Top-N pushdown.
Exemple de requête Top-N avec un connecteur PostgreSQL :
SELECT identifiant, raison_sociale
FROM pg_entreprise.public.societes
ORDER BY identifiant
FETCH FIRST 10 ROWS ONLY;
Si le connecteur supporte le pushdown, le plan d'exécution reflète directement la limitation :
Fragment 1 [SOURCE]
Output layout: [identifiant, raison_sociale]
Output partitioning: SINGLE []
TableScan[pg_entreprise:public.societes sortOrder=[identifiant:integer ASC NULLS LAST] limit=10]
identifiant := identifiant:integer
raison_sociale := raison_sociale:varchar
Sans support du connecteur, Trino exécute lui-même le tri partiel :
Fragment 1 [SOURCE]
Output layout: [id_client, denomination]
TopNPartial[10 by (id_client ASC NULLS LAST)]
└─ TableScan[catalogue:clients:sf1.0]
id_client := catalogue:id_client
denomination := catalogue:denomination