Pour améliorer la rapidité d'exécution d'un modèle de contrôle robotique, il est essentiel d'optimiser son format d'inférence. Ce guide détaille comment convertir le modèle Pi0, un modèle vision-langage-action, depuis PyTorch vers ONNX, puis l'accélérer avec TensorRT pour une inférence à faible latence sur GPU.
Préparation de l'environnement et compréhension du modèle
Avant de commencer, configurez un environnement Linux avec Python 3.10 ou 3.11 pour éviter les problèmes de compatibilité. Installez les dépendances nécessaires dans un environnement virtuel dédié.
# Créer et activer un environnement virtuel
python -m venv optimisation_pi0_env
source optimisation_pi0_env/bin/activate
# Installer PyTorch pour CUDA 11.8
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# Installer les bibliothèques pour Pi0 et ONNX
pip install transformers lerobot onnx onnxruntime-gpu
# Installer TensorRT (télécharger et configurer manuellement selon la version CUDA)
# Exemple : après décompression, installer le package Python
cd /chemin/vers/TensorRT-8.6.1.6/python
pip install tensorrt-*.whl
# Ajouter le chemin des bibliothèques à LD_LIBRARY_PATH
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/chemin/vers/TensorRT-8.6.1.6/lib
Inspectez la structure du modèle Pi0 pour identifier ses entrées et sorties. Utilisez un script pour examiner les configurations et les dimensions des tenseurs.
import torch
from transformers import AutoConfig
from lerobot.models.pi0.modeling_pi0 import Pi0ForCausalLM
chemin_modele = "/chemin/vers/lerobot/pi0"
configuration = AutoConfig.from_pretrained(chemin_modele)
print("Type de modèle :", configuration.model_type)
print("Taille de la couche cachée :", configuration.hidden_size)
# Charger le modèle pour inspecter les entrées
modele = Pi0ForCausalLM.from_pretrained(chemin_modele, torch_dtype=torch.float16)
modele.eval()
# Créer des entrées factices pour tester
images_factice = torch.randn(1, 3, 3, 480, 640) # [lot, caméras, canaux, hauteur, largeur]
etat_robot_factice = torch.randn(1, 6) # [lot, dimension état]
print("Forme des images factices :", images_factice.shape)
print("Forme de l'état factice :", etat_robot_factice.shape)
Conversion du modèle PyTorch vers ONNX
ONNX sert de format intermédiaire standardisé pour les modèles. Créez un script pour exporter le modèle en spécifiant les axes dynamiques pour la taille des lots.
import torch
import onnx
from pathlib import Path
def exporter_modele_vers_onnx():
# Charger le modèle PyTorch
modele = Pi0ForCausalLM.from_pretrained(chemin_modele, torch_dtype=torch.float32)
modele.eval()
# Définir les entrées factices
taille_lot = 1
entree_images = torch.randn(taille_lot, 3, 3, 480, 640)
entree_etat = torch.randn(taille_lot, 6)
# Configurer les noms et axes dynamiques
noms_entrees = ["images_entree", "etat_robot_entree"]
noms_sorties = ["actions_sortie"]
axes_dynamiques = {
'images_entree': {0: 'taille_lot'},
'etat_robot_entree': {0: 'taille_lot'},
'actions_sortie': {0: 'taille_lot'},
}
# Exporter le modèle
chemin_onnx = "modele_pi0.onnx"
torch.onnx.export(
modele,
(entree_images, entree_etat),
chemin_onnx,
input_names=noms_entrees,
output_names=noms_sorties,
dynamic_axes=axes_dynamiques,
opset_version=14,
do_constant_folding=True,
)
print(f"Modèle exporté vers {chemin_onnx}")
# Vérifier le modèle ONNX
modele_onnx = onnx.load(chemin_onnx)
onnx.checker.check_model(modele_onnx)
print("Vérification du modèle ONNX réussie.")
Pour valider, exécutez une inférence avec ONNX Runtime et comparez les résultats avec le modèle original.
import onnxruntime as ort
import numpy as np
# Préparer les données d'entrée
images_np = np.random.randn(1, 3, 3, 480, 640).astype(np.float32)
etat_np = np.random.randn(1, 6).astype(np.float32)
# Créer une session ONNX Runtime
session = ort.InferenceSession("modele_pi0.onnx", providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
resultats = session.run(None, {"images_entree": images_np, "etat_robot_entree": etat_np})
print("Forme de la sortie ONNX :", resultats[0].shape)
Optimisation avec TensorRT pour une inférence rapide
TensorRT optimise le modèle pour un GPU spécifique en utilisant des techniques comme la fusion de couches et la précision FP16. Construisez un moteur TensorRT à partir du modèle ONNX.
import tensorrt as trt
import os
def construire_moteur_tensorrt(fichier_onnx, fichier_moteur, precision_fp16=True, taille_lot_max=8):
journal = trt.Logger(trt.Logger.WARNING)
constructeur = trt.Builder(journal)
reseau = constructeur.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
parseur = trt.OnnxParser(reseau, journal)
with open(fichier_onnx, 'rb') as f:
if not parseur.parse(f.read()):
raise Exception("Échec de l'analyse ONNX")
configuration = constructeur.create_builder_config()
configuration.max_workspace_size = 1 << 30 # 1 Go
if precision_fp16 and constructeur.platform_has_fast_fp16:
configuration.set_flag(trt.BuilderFlag.FP16)
# Configurer les profils pour les formes dynamiques
profil = constructeur.create_optimization_profile()
entree0 = reseau.get_input(0)
forme_min = (1,) + entree0.shape[1:]
forme_opt = (4,) + entree0.shape[1:]
forme_max = (taille_lot_max,) + entree0.shape[1:]
profil.set_shape(entree0.name, forme_min, forme_opt, forme_max)
configuration.add_optimization_profile(profil)
moteur_serialise = constructeur.build_serialized_network(reseau, configuration)
with open(fichier_moteur, "wb") as f:
f.write(moteur_serialise)
print(f"Moteur TensorRT enregistré sous {fichier_moteur}")
# Exemple d'utilisation
construire_moteur_tensorrt("modele_pi0.onnx", "modele_pi0_fp16.engine", precision_fp16=True)
Pour effectuer l'inférence avec TensorRT, créez une classe qui gère l'allocation de mémoire GPU et l'exécution asynchrone.
import tensorrt as trt
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit
class InferieurTensorRT:
def __init__(self, chemin_moteur):
journal = trt.Logger(trt.Logger.WARNING)
with open(chemin_moteur, 'rb') as f:
moteur = trt.Runtime(journal).deserialize_cuda_engine(f.read())
self.contexte = moteur.create_execution_context()
self.flux = cuda.Stream()
# Allouer la mémoire pour les entrées et sorties
self.liaisons = []
self.entrees_gpu = []
self.sorties_gpu = []
for liaison in moteur:
forme = self.contexte.get_binding_shape(moteur.get_binding_index(liaison))
taille = trt.volume(forme) * moteur.get_binding_dtype(liaison).itemsize
memoire_gpu = cuda.mem_alloc(taille)
self.liaisons.append(int(memoire_gpu))
if moteur.binding_is_input(liaison):
self.entrees_gpu.append({'nom': liaison, 'memoire': memoire_gpu, 'forme': forme})
else:
self.sorties_gpu.append({'nom': liaison, 'memoire': memoire_gpu, 'forme': forme})
def inferer(self, images_np, etat_np):
# Mettre à jour les formes dynamiques
taille_lot = images_np.shape[0]
for entree in self.entrees_gpu:
idx = self.contexte.engine.get_binding_index(entree['nom'])
nouvelle_forme = (taille_lot,) + entree['forme'][1:]
self.contexte.set_binding_shape(idx, nouvelle_forme)
# Copier les données vers le GPU
cuda.memcpy_htod_async(self.entrees_gpu[0]['memoire'], images_np.ravel(), self.flux)
cuda.memcpy_htod_async(self.entrees_gpu[1]['memoire'], etat_np.ravel(), self.flux)
# Exécuter l'inférence
self.contexte.execute_async_v2(bindings=self.liaisons, stream_handle=self.flux.handle)
# Récupérer les résultats
sorties_np = []
for sortie in self.sorties_gpu:
forme_sortie = self.contexte.get_binding_shape(self.contexte.engine.get_binding_index(sortie['nom']))
tampon = np.zeros(forme_sortie, dtype=np.float32)
cuda.memcpy_dtoh_async(tampon, sortie['memoire'], self.flux)
sorties_np.append(tampon)
self.flux.synchronize()
return sorties_np
# Tester l'inférence
inferieur = InferieurTensorRT("modele_pi0_fp16.engine")
images_test = np.random.randn(2, 3, 3, 480, 640).astype(np.float32)
etat_test = np.random.randn(2, 6).astype(np.float32)
resultats = inferieur.inferer(images_test, etat_test)
print("Forme de la sortie TensorRT :", resultats[0].shape)
Évaluation des performances et conseils de déploeiment
Pour comparer les performances, exécutez des tests de latence sur les différentes versions du modèle. Utilisez un script de benchmark pour mesurer les temps d'exécution et l'utilisation mémoire.
import time
import numpy as np
def tester_latence(fonction_inference, donnees_entree, nom, repetitions=100):
# Préchauffage
for _ in range(10):
fonction_inference(*donnees_entree)
# Mesurer les latences
latences = []
for _ in range(repetitions):
debut = time.perf_counter()
fonction_inference(*donnees_entree)
fin = time.perf_counter()
latences.append((fin - debut) * 1000)
latence_moyenne = np.mean(latences)
print(f"{nom}: latence moyenne de {latence_moyenne:.2f} ms")
return latence_moyenne
# Exemple d'appel pour chaque méthode d'inférence
# latence_pytorch = tester_latence(modele_pytorch.inferer, (images_test, etat_test), "PyTorch")
# latence_onnx = tester_latence(session_onnx.run, [None, {"images_entree": images_np, "etat_robot_entree": etat_np}], "ONNX Runtime")
# latence_tensorrt = tester_latence(inferieur.inferer, (images_np, etat_np), "TensorRT FP16")
Lors du déploiemant, assurez-vous que l'environnement est cohérent, par exemple avec Docker. Surveillez les latences en production pour garantir la fiabilité. Les gains en vitesse avec TensorRT, notamment en FP16, peuvent réduire considérablement la latence, mais évaluer la précision pour les applications critiques de contrôle robotique.