Concepts fondamentaux du RPC et du pattern Proxy
Le Remote Procedure Call (RPC) permet d'invoquer une méthode distante avec la même simplicité qu'un appel local. Côté client, l'appel à l'interface est intercepté par un proxy dynamique. Ce proxy encapsule les paramètres de la méthode dans un objet de requête, le sérialise en octets, puis l'envoie via le réseau à l'aide de Netty. Le serveur distant reçoit ces octets, les désérialise, localise la méthode correspondante par réflexion, l'exécute, et renvoie le résultat.
Le pattern Proxy : Statique vs Dynamique
Le pattern Proxy se divise en deux catégories : statique et dynamique. Le proxy statique nécessite de créer manuellement une classe wrapper pour chaque interface à proxyfier. Le proxy dynamqiue, en revanche, génère la classe proxy à l'exécution, offrant une flexibilité accrue pour intercepter et étendre les comportements sans dépendre d'une interface spécifique.
JDK Dynamic Proxy
Le proxy dynamique natif de Java s'appuie sur l'API de réflexion. Il nécessite que la classe cible implémente au moins une interface. L'interception est réalisée via l'interface InvocationHandler.
public interface ServiceExpediteur {
boolean envoyer();
}
public class ServiceExpediteurSms implements ServiceExpediteur {
public boolean envoyer() {
System.out.println("Envoi de message");
return true;
}
}
public class GestionnaireProxyJdk implements InvocationHandler {
private Object cible;
public GestionnaireProxyJdk(Object cible) {
this.cible = cible;
}
public Object invoke(Object proxy, Method methode, Object[] args) throws Throwable {
System.out.println("Avant traitement");
Object resultat = methode.invoke(cible, args);
System.out.println("Après traitement");
return resultat;
}
}
// Utilisation
ServiceExpediteur proxy = (ServiceExpediteur) Proxy.newProxyInstance(
ClassLoader.getSystemClassLoader(),
new Class[]{ServiceExpediteur.class},
new GestionnaireProxyJdk(new ServiceExpediteurSms())
);
proxy.envoyer();
CGLib Dynamic Proxy
CGLib contourne la limitation des interfaces en générant une sous-classe de la classe cible à l'aide de la bibliothèque ASM. Il ne peut cependant pas proxyfier des méthodes déclarées final.
public class EmetteurBd {
public boolean envoyer() {
System.out.println("Envoi de message");
return true;
}
}
public class IntercepteurCglib implements MethodInterceptor {
private Enhancer amplificateur = new Enhancer();
public Object getProxy(Class clazz) {
amplificateur.setSuperclass(clazz);
amplificateur.setCallback(this);
return amplificateur.create();
}
@Override
public Object intercept(Object obj, Method methode, Object[] args, MethodProxy proxy) throws Throwable {
System.out.println("Avant traitement");
Object resultat = proxy.invokeSuper(obj, args);
System.out.println("Après traitement");
return resultat;
}
}
Javassist Dynamic Proxy
Javassist permet de modifier ou de créer des classes au niveau du bytecode dynamiquement, sans nécessiter une connaissance approfondie des instructions de la machine virtuelle Java.
public class GestionnaireProxyJavassist implements MethodHandler {
private ProxyFactory fabrique = new ProxyFactory();
public Object getProxy(Class clazz) throws Exception {
fabrique.setSuperclass(clazz);
Class<?> classeProxy = fabrique.createClass();
Object proxy = classeProxy.newInstance();
((ProxyObject) proxy).setHandler(this);
return proxy;
}
@Override
public Object invoke(Object self, Method m, Method proceed, Object[] args) throws Throwable {
System.out.println("Avant traitement");
Object resultat = proceed.invoke(self, args);
System.out.println("Après traitement");
return resultat;
}
}
Implémentation du proxy côté client pour les appels RPC
Dans un système RPC, le client utilise un proxy dynamique pour transformer l'appel local en requête réseau. La classe ProxyServiceRpc crée un proxy qui initialise la connexion Netty et transmet la requête sérialisée.
public class ProxyServiceRpc {
public static Object creerProxy(ConfigReference config) {
return Proxy.newProxyInstance(
ProxyServiceRpc.class.getClassLoader(),
new Class[]{config.getInterfaceService()},
new GestionnaireInvocationRpc(config)
);
}
static class GestionnaireInvocationRpc implements InvocationHandler {
private ConfigReference config;
public GestionnaireInvocationRpc(ConfigReference config) {
this.config = config;
}
public Object invoke(Object proxy, Method methode, Object[] args) throws Throwable {
ClientRpcNetty client = new ClientRpcNetty(config);
client.connecter();
DemandeRpc demande = new DemandeRpc();
demande.setIdDemande(UUID.randomUUID().toString().replace("-", ""));
demande.setNomClasse(config.getInterfaceService().getName());
demande.setNomMethode(methode.getName());
demande.setTypesParametres(methode.getParameterTypes());
demande.setArguments(args);
ReponseRpc reponse = client.appelDistant(demande);
return reponse.getResultat();
}
}
}
Processus d'appel distant via le client Netty
La communication réseau est gérée par le client Netty. Lorsque la méthode appelDistant est invoquée, la requête est écrite dans le canal réseau. L'opération est synchronisée (sync()) pour bloquer le thread jusqu'à l'obtention de la réponse du serveur. Le pipeline Netty inclut l'encodeur, le décodeur, le gestionnaire de timeout et le gestionnaire de réception.
public class ClientRpcNetty {
private ConfigReference config;
private ChannelFuture futurCanal;
private GestionnaireClientRpc gestionnaire;
public ClientRpcNetty(ConfigReference config) {
this.config = config;
this.gestionnaire = new GestionnaireClientRpc(config.getTimeout());
}
public void connecter() {
EventLoopGroup groupe = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(groupe)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new EncodeurRpc(DemandeRpc.class))
.addLast(new DecodeurRpc(ReponseRpc.class))
.addLast(new GestionnaireTimeoutRpcNetty(config.getTimeout()))
.addLast(gestionnaire);
}
});
try {
futurCanal = bootstrap.connect(config.getServiceHost(), config.getServicePort()).sync();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public ReponseRpc appelDistant(DemandeRpc demande) throws Throwable {
StockeurTempsDemandeRpc.mettre(demande.getIdDemande(), System.currentTimeMillis());
futurCanal.channel().writeAndFlush(demande).sync();
ReponseRpc reponse = gestionnaire.getReponseRpc(demande.getIdDemande());
if (reponse.isSucces()) {
return reponse;
}
throw reponse.getErreur();
}
}
Codecs et sérialisation réseau
Pour transmettre les objets DemandeRpc et ReponseRpc, on utilise la sérialisation Hessian. Les classes doivent implémenter Serializable.
public class SerialiseurHessian {
public static byte[] serialiser(Object obj) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
HessianOutput sortie = new HessianOutput(baos);
sortie.writeObject(obj);
return baos.toByteArray();
}
public static Object deserialiser(byte[] data, Class clazz) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
HessianInput entree = new HessianInput(bais);
return entree.readObject(clazz);
}
}
Encodeur et Décodeur Netty
Pour résoudre les problèmes de paquets collés (TCP sticky packet) et de paquets fragmentés (half-packet), on ajoute un en-tête de 4 octets indiquant la longueur du message. L'encodeur hérite de MessageToByteEncoder et le décodeur de ByteToMessageDecoder.
public class EncodeurRpc extends MessageToByteEncoder {
private Class<?> classeCible;
public EncodeurRpc(Class<?> cible) { this.classeCible = cible; }
@Override
protected void encode(ChannelHandlerContext ctx, Object obj, ByteBuf buf) throws Exception {
if (classeCible.isInstance(obj)) {
byte[] data = SerialiseurHessian.serialiser(obj);
buf.writeInt(data.length);
buf.writeBytes(data);
}
}
}
public class DecodeurRpc extends ByteToMessageDecoder {
private static final int TAILLE_ENTETE = 4;
private Class<?> classeCible;
public DecodeurRpc(Class<?> cible) { this.classeCible = cible; }
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
if (buf.readableBytes() < TAILLE_ENTETE) return;
buf.markReaderIndex();
int longueur = buf.readInt();
if (longueur < 0) ctx.close();
if (buf.readableBytes() < longueur) {
buf.resetReaderIndex();
return;
}
byte[] data = new byte[longueur];
buf.readBytes(data);
out.add(SerialiseurHessian.deserialiser(data, classeCible));
}
}
Traitement côté serveur RPC
Le serveur Netty écoute les connexions entrantes, décode les requêtes, utilise la réflexion pour invoquer la méthode cible sur l'implémentation de service enregistrée, et renvoie la réponse sérialisée.
public class ServeurRpcNetty {
private int port;
private List<ConfigService> services = new CopyOnWriteArrayList<>();
public ServeurRpcNetty(int port) { this.port = port; }
public void demarrer() {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new DecodeurRpc(DemandeRpc.class))
.addLast(new EncodeurRpc(ReponseRpc.class))
.addLast(new GestionnaireServeurRpcNetty(services));
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
sb.bind(port).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
// Gestion d'erreur
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
public class GestionnaireServeurRpcNetty extends ChannelInboundHandlerAdapter {
private Map<String, ConfigService> registreService = new ConcurrentHashMap<>();
public GestionnaireServeurRpcNetty(List<ConfigService> configs) {
for (ConfigService cfg : configs) {
registreService.put(cfg.getInterfaceService().getName(), cfg);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DemandeRpc demande = (DemandeRpc) msg;
ReponseRpc reponse = new ReponseRpc();
reponse.setIdDemande(demande.getIdDemande());
try {
ConfigService cfg = registreService.get(demande.getNomClasse());
Object instance = cfg.getImplementation().newInstance();
Method m = instance.getClass().getMethod(demande.getNomMethode(), demande.getTypesParametres());
Object resultat = m.invoke(instance, demande.getArguments());
reponse.setResultat(resultat);
reponse.setSucces(true);
} catch (Exception e) {
reponse.setSucces(false);
reponse.setErreur(e);
}
ctx.writeAndFlush(reponse);
}
}
Gestion des timeouts côté client
Pour éviter un blocage infini en cas de non-réponse du serveur, un mécanisme de timeout est implémenté. L'horodatage de l'envoi est enregistré, et le gestionnaire de lecture vérifie le dépassement du délai lors de la réception. Le client interroge la map des réponses avec une boucle d'attente active contrôlée.
public class GestionnaireTimeoutRpcNetty extends ChannelInboundHandlerAdapter {
private long timeout;
public GestionnaireTimeoutRpcNetty(long timeout) { this.timeout = timeout; }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReponseRpc reponse = (ReponseRpc) msg;
long tempsEnvoi = StockeurTempsDemandeRpc.get(reponse.getIdDemande());
if (System.currentTimeMillis() - tempsEnvoi >= timeout) {
reponse.setTimeout(true);
}
StockeurTempsDemandeRpc.retirer(reponse.getIdDemande());
ctx.fireChannelRead(reponse);
}
}
public class GestionnaireClientRpc extends ChannelInboundHandlerAdapter {
private Map<String, ReponseRpc> reponses = new ConcurrentHashMap<>();
private long timeout;
public GestionnaireClientRpc(long timeout) { this.timeout = timeout; }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReponseRpc reponse = (ReponseRpc) msg;
if (!reponse.isTimeout()) {
reponses.put(reponse.getIdDemande(), reponse);
}
}
public ReponseRpc getReponseRpc(String idDemande) throws ExceptionTimeoutRpc {
long debut = System.currentTimeMillis();
while (reponses.get(idDemande) == null) {
if (System.currentTimeMillis() - debut >= timeout) break;
Thread.sleep(5);
}
ReponseRpc reponse = reponses.remove(idDemande);
if (reponse == null) throw new ExceptionTimeoutRpc("Timeout RPC atteint.");
return reponse;
}
}