Implémentation d'une application de chat basée sur une architecture microservices en Go

Conception et Architecture du Système

L'objectif est de construire une plateforme de messagerie instantanée en temps réel. Le système est décomposé en plusieurs services autonomes : un service utilisateur, un service d'authentification, un service de conversasion, un service de diffusion en temps réel et une passerelle API. Chaque service possède sa propre base de données, assurant un découplage maximal.

La communication inter-services s'effectue via trois canaux principaux. Les appels directs et synchrones utilisent gRPC pour les opérations critiques comme l'authentification. Un système de files d'attente de messages (NATS) gère les événements asynchrones, comme la notification d'un nouveau message. Enfin, les clients finaux se connectent via WebSocket à travers la passerelle pour recevoir les mises à jour en temps réel.

Implémentation des Services Clés

Service Utilisateur

Ce service gère le cycle de vie des comptes. Son interface gRPC définit les opérations CRUD de base.

syntax = "proto3";

package usersvc;

service Users {
  rpc RegisterUser(UserRegistration) returns (UserDetails);
  rpc GetUserProfile(ProfileRequest) returns (UserDetails);
}

message UserRegistration {
  string login = 1;
  string email_address = 2;
  string secret = 3;
}

message UserDetails {
  string id = 1;
  string login = 2;
  string email_address = 3;
  string display_name = 4;
  string connection_status = 5;
}

message ProfileRequest {
  string identifier = 1;
}

La logique de création inclut la validation, le hachage du mot de passe et la publication d'un événement via le message broker.

func (s *Server) RegisterUser(ctx context.Context, req *pb.UserRegistration) (*pb.UserDetails, error) {
    if err := validateInput(req); err != nil {
        return nil, err
    }
    
    if exists, _ := s.storage.LoginExists(req.Login); exists {
        return nil, status.Error(codes.AlreadyExists, "Ce login est déjà pris")
    }

    hashedPassword, err := hashSecret(req.Secret)
    if err != nil {
        return nil, status.Error(codes.Internal, "Erreur de traitement")
    }

    newUser := &entities.User{
        ID:           generateUUID(),
        Login:        req.Login,
        Email:        req.EmailAddress,
        PasswordHash: hashedPassword,
        Status:       "offline",
        CreationTime: time.Now(),
    }

    if err := s.storage.PersistUser(newUser); err != nil {
        return nil, status.Error(codes.Internal, "Echec de la sauvegarde")
    }

    // Publication de l'événement
    go s.eventEmitter.Emit("user.account.created", newUser.ID)

    return mapUserToProto(newUser), nil
}

Service d'Authentification

Ce service valide les identifiants et émet des jetons JWT. Il maintient les sessions actives dans un magasin clé-valeur (Redis).

func (s *AuthServer) ValidateCredentials(ctx context.Context, req *pb.LoginAttempt) (*pb.SessionInfo, error) {
    user, err := s.userSvcClient.LookupByLogin(ctx, &pb.LookupRequest{Login: req.Login})
    if err != nil {
        return nil, status.Error(codes.NotFound, "Utilisateur inconnu")
    }

    if !checkPassword(req.Secret, user.PasswordHash) {
        return nil, status.Error(codes.Unauthenticated, "Mot de passe incorrect")
    }

    accessToken, err := s.tokenGenerator.Create(user.ID, 15*time.Minute)
    if err != nil {
        return nil, err
    }

    refreshToken, err := s.tokenGenerator.Create(user.ID, 7*24*time.Hour)
    if err != nil {
        return nil, err
    }

    session := &store.Session{
        Token:        refreshToken,
        UserID:       user.ID,
        Expiration:   time.Now().Add(7 * 24 * time.Hour),
    }
    _ = s.sessionCache.StoreSession(session)

    return &pb.SessionInfo{
        AccessToken:  accessToken,
        RefreshToken: refreshToken,
        UserId:       user.ID,
    }, nil
}

Communication et Temps Réel

Passerelle API (API Gateway)

Elle sert de point d'entrée unique, expose une API REST/JSON et achemine les requêtes vers les services internes en utilisant gRPC.

// Exemple de handler pour l'envoi de message
func (gw *Gateway) PostMessageHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    convID := vars["id"]
    
    userCtx := r.Context().Value(auth.UserIDKey).(string)

    var payload struct {
        Body string `json:"body"`
        Kind string `json:"kind"`
    }
    json.NewDecoder(r.Body).Decode(&payload)

    msg, err := gw.chatClient.Send(r.Context(), &pb.NewMessage{
        ConversationId: convID,
        AuthorId:       userCtx,
        Content:        payload.Body,
        ContentType:    payload.Kind,
    })
    if err != nil {
        // Gestion des erreurs gRPC -> HTTP
        handleGrpcError(w, err)
        return
    }

    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(msg)
}

Service de Diffusion (Push Service)

Ce service maintient des connexions WebSocket ouvertes avec les clients. Il écoute les événements sur NATS et pousse les notifications pertinentes.

type WsConnection struct {
    conn     *websocket.Conn
    userID   string
    sendChan chan []byte
}

type WsServer struct {
    connections sync.Map // userID -> *WsConnection
    natsSub     *nats.Conn
}

func (ws *WsServer) OnNewMessage(event *nats.Msg) {
    var notification struct {
        RecipientID string `json:"recipient_id"`
        MsgID       string `json:"message_id"`
        ConvID      string `json:"conversation_id"`
    }
    json.Unmarshal(event.Data, &notification)

    if client, ok := ws.connections.Load(notification.RecipientID); ok {
        conn := client.(*WsConnection)
        payload := fmt.Sprintf(`{"type":"new_message","id":"%s","conv":"%s"}`, notification.MsgID, notification.ConvID)
        conn.sendChan <- []byte(payload)
    }
}

Déploiement et Orchestration

Chaque microservice est conteneurisé avec Docker. Une configuration Docker Compose orchestre l'ensemble pour le développement.

version: '3.8'
services:
  postgres_db:
    image: postgres:15
    environment:
      POSTGRES_DB: messenger
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: secret
      
  redis_cache:
    image: redis:7-alpine

  nats_mq:
    image: nats:2

  user-service:
    build: ./services/user
    environment:
      DB_URL: postgres://admin:secret@postgres_db/messenger
      NATS_URL: nats://nats_mq:4222
    depends_on: [postgres_db, nats_mq]

  auth-service:
    build: ./services/auth
    environment:
      USER_SVC: user-service:50051
      REDIS_ADDR: redis_cache:6379
      JWT_KEY: "a-very-secret-key"
    depends_on: [user-service, redis_cache]

  api-gateway:
    build: ./gateway
    ports:
      - "8080:8080"
    environment:
      USER_SVC: user-service:50051
      AUTH_SVC: auth-service:50051
      CHAT_SVC: chat-service:50051
    depends_on: [user-service, auth-service]

Pour la production, un outil comme Kubernetes permet de gérer le scaling, la découverte de services et la résilience. La supervision est assurée par Prometheus pour les métriques, Grafana pour la visualisation et Jaeger pour le traçage distribué.

Optimisation et Aspects Avancés

Pour supporter une charge élevée, plusieurs optimisations sont apppliquées : utilisation de pools de connexions pour les bases de données et gRPC, mise en cache agressive des profils utilisateurs et des métadonnées de conversation dans Redis, indexation judicieuse des tables de messages et pagination stricte des historiques. Le service WebSocket implémente un mécanisme de keep-alive et de reconnexion automatique côté client pour une robustesse accrue.

Étiquettes: Microservices grpc Go WebSocket PostgreSQL

Publié le 10 juin à 00h53