Conception et Architecture du Système
L'objectif est de construire une plateforme de messagerie instantanée en temps réel. Le système est décomposé en plusieurs services autonomes : un service utilisateur, un service d'authentification, un service de conversasion, un service de diffusion en temps réel et une passerelle API. Chaque service possède sa propre base de données, assurant un découplage maximal.
La communication inter-services s'effectue via trois canaux principaux. Les appels directs et synchrones utilisent gRPC pour les opérations critiques comme l'authentification. Un système de files d'attente de messages (NATS) gère les événements asynchrones, comme la notification d'un nouveau message. Enfin, les clients finaux se connectent via WebSocket à travers la passerelle pour recevoir les mises à jour en temps réel.
Implémentation des Services Clés
Service Utilisateur
Ce service gère le cycle de vie des comptes. Son interface gRPC définit les opérations CRUD de base.
syntax = "proto3";
package usersvc;
service Users {
rpc RegisterUser(UserRegistration) returns (UserDetails);
rpc GetUserProfile(ProfileRequest) returns (UserDetails);
}
message UserRegistration {
string login = 1;
string email_address = 2;
string secret = 3;
}
message UserDetails {
string id = 1;
string login = 2;
string email_address = 3;
string display_name = 4;
string connection_status = 5;
}
message ProfileRequest {
string identifier = 1;
}
La logique de création inclut la validation, le hachage du mot de passe et la publication d'un événement via le message broker.
func (s *Server) RegisterUser(ctx context.Context, req *pb.UserRegistration) (*pb.UserDetails, error) {
if err := validateInput(req); err != nil {
return nil, err
}
if exists, _ := s.storage.LoginExists(req.Login); exists {
return nil, status.Error(codes.AlreadyExists, "Ce login est déjà pris")
}
hashedPassword, err := hashSecret(req.Secret)
if err != nil {
return nil, status.Error(codes.Internal, "Erreur de traitement")
}
newUser := &entities.User{
ID: generateUUID(),
Login: req.Login,
Email: req.EmailAddress,
PasswordHash: hashedPassword,
Status: "offline",
CreationTime: time.Now(),
}
if err := s.storage.PersistUser(newUser); err != nil {
return nil, status.Error(codes.Internal, "Echec de la sauvegarde")
}
// Publication de l'événement
go s.eventEmitter.Emit("user.account.created", newUser.ID)
return mapUserToProto(newUser), nil
}
Service d'Authentification
Ce service valide les identifiants et émet des jetons JWT. Il maintient les sessions actives dans un magasin clé-valeur (Redis).
func (s *AuthServer) ValidateCredentials(ctx context.Context, req *pb.LoginAttempt) (*pb.SessionInfo, error) {
user, err := s.userSvcClient.LookupByLogin(ctx, &pb.LookupRequest{Login: req.Login})
if err != nil {
return nil, status.Error(codes.NotFound, "Utilisateur inconnu")
}
if !checkPassword(req.Secret, user.PasswordHash) {
return nil, status.Error(codes.Unauthenticated, "Mot de passe incorrect")
}
accessToken, err := s.tokenGenerator.Create(user.ID, 15*time.Minute)
if err != nil {
return nil, err
}
refreshToken, err := s.tokenGenerator.Create(user.ID, 7*24*time.Hour)
if err != nil {
return nil, err
}
session := &store.Session{
Token: refreshToken,
UserID: user.ID,
Expiration: time.Now().Add(7 * 24 * time.Hour),
}
_ = s.sessionCache.StoreSession(session)
return &pb.SessionInfo{
AccessToken: accessToken,
RefreshToken: refreshToken,
UserId: user.ID,
}, nil
}
Communication et Temps Réel
Passerelle API (API Gateway)
Elle sert de point d'entrée unique, expose une API REST/JSON et achemine les requêtes vers les services internes en utilisant gRPC.
// Exemple de handler pour l'envoi de message
func (gw *Gateway) PostMessageHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
convID := vars["id"]
userCtx := r.Context().Value(auth.UserIDKey).(string)
var payload struct {
Body string `json:"body"`
Kind string `json:"kind"`
}
json.NewDecoder(r.Body).Decode(&payload)
msg, err := gw.chatClient.Send(r.Context(), &pb.NewMessage{
ConversationId: convID,
AuthorId: userCtx,
Content: payload.Body,
ContentType: payload.Kind,
})
if err != nil {
// Gestion des erreurs gRPC -> HTTP
handleGrpcError(w, err)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(msg)
}
Service de Diffusion (Push Service)
Ce service maintient des connexions WebSocket ouvertes avec les clients. Il écoute les événements sur NATS et pousse les notifications pertinentes.
type WsConnection struct {
conn *websocket.Conn
userID string
sendChan chan []byte
}
type WsServer struct {
connections sync.Map // userID -> *WsConnection
natsSub *nats.Conn
}
func (ws *WsServer) OnNewMessage(event *nats.Msg) {
var notification struct {
RecipientID string `json:"recipient_id"`
MsgID string `json:"message_id"`
ConvID string `json:"conversation_id"`
}
json.Unmarshal(event.Data, ¬ification)
if client, ok := ws.connections.Load(notification.RecipientID); ok {
conn := client.(*WsConnection)
payload := fmt.Sprintf(`{"type":"new_message","id":"%s","conv":"%s"}`, notification.MsgID, notification.ConvID)
conn.sendChan <- []byte(payload)
}
}
Déploiement et Orchestration
Chaque microservice est conteneurisé avec Docker. Une configuration Docker Compose orchestre l'ensemble pour le développement.
version: '3.8'
services:
postgres_db:
image: postgres:15
environment:
POSTGRES_DB: messenger
POSTGRES_USER: admin
POSTGRES_PASSWORD: secret
redis_cache:
image: redis:7-alpine
nats_mq:
image: nats:2
user-service:
build: ./services/user
environment:
DB_URL: postgres://admin:secret@postgres_db/messenger
NATS_URL: nats://nats_mq:4222
depends_on: [postgres_db, nats_mq]
auth-service:
build: ./services/auth
environment:
USER_SVC: user-service:50051
REDIS_ADDR: redis_cache:6379
JWT_KEY: "a-very-secret-key"
depends_on: [user-service, redis_cache]
api-gateway:
build: ./gateway
ports:
- "8080:8080"
environment:
USER_SVC: user-service:50051
AUTH_SVC: auth-service:50051
CHAT_SVC: chat-service:50051
depends_on: [user-service, auth-service]
Pour la production, un outil comme Kubernetes permet de gérer le scaling, la découverte de services et la résilience. La supervision est assurée par Prometheus pour les métriques, Grafana pour la visualisation et Jaeger pour le traçage distribué.
Optimisation et Aspects Avancés
Pour supporter une charge élevée, plusieurs optimisations sont apppliquées : utilisation de pools de connexions pour les bases de données et gRPC, mise en cache agressive des profils utilisateurs et des métadonnées de conversation dans Redis, indexation judicieuse des tables de messages et pagination stricte des historiques. Le service WebSocket implémente un mécanisme de keep-alive et de reconnexion automatique côté client pour une robustesse accrue.