Communication IoT : Implémentation du protocole MQTT avec Elixir

L'écosystème Elixir est particulièrement bien adapté au développement de solutions pour l'Internet des objets (IoT), notamment grâce à des frameworks comme Nerves. Bien que l'utilisation du protocole HTTP soit courante pour les échanges réseau, le protocole MQTT s'impoce souvent comme un choix plus performant et robuste pour les environnements de production.

Comprendre le protocole MQTT

MQTT (Message Queuing Telemetry Transport) est un protocole de messagerie binaire extrêmement léger, conçu spécifiquement pour les appareils aux ressources limitées et les réseaux peu fiables. Contrairement au modèle classique client-serveur, MQTT repose sur un modèle Publication/Abonnement.

  • Léger : Minimise la surcharge des paquets réseau, idéal pour les connexions bas débit.
  • Robuste : Conçu pour gérer les pertes de connexion fréquentes.
  • Découplé : Les expéditeurs (publishers) et les destinataires (subscribers) ne se connaissent pas directement ; ils communiquent via un broker (courtier).

Configuration du simulateur de capteur

Pour illustrer l'intégration de MQTT, nous allons créer une application Elixir simple simulant un capteur environnemental. Nous utiliserons la bibliothèque emqtt, un client Erlang performant.

Générez d'abord un nouveau projet avec une arborescence de supervision :

mix new --sup station_meteo
cd station_meteo

Ajoutez la dépendance dans votre fichier mix.exs :

defp deps do
  [
    {:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]}
  ]
end

L'implémentation du capteur repose sur un GenServer qui gère à la fois l'envoi périodique de données et la réception de commandes de configuration.

defmodule StationMeteo.Sensor do
  use GenServer
  require Logger

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    config = Application.get_env(:station_meteo, :mqtt_settings)
    topic_data = "telemetrie/#{config[:client_id]}/temp"
    
    {:ok, pid} = :emqtt.start_link(config)
    
    state = %{
      conn: pid,
      delay: 2000,
      timer_ref: nil,
      data_topic: topic_data,
      client_id: config[:client_id]
    }

    {:ok, state, {:continue, :connect_broker}}
  end

  def handle_continue(:connect_broker, state) do
    case :emqtt.connect(state.conn) do
      {:ok, _} ->
        # S'abonner aux commandes de configuration
        :emqtt.subscribe(state.conn, {"config/#{state.client_id}/set_delay", 1})
        {:noreply, schedule_next_run(state)}
      {:error, reason} ->
        Logger.error("Échec de connexion : #{inspect(reason)}")
        {:noreply, state}
    end
  end

  def handle_info(:publish_data, state) do
    valeur_temp = 20.0 + :rand.uniform() * 5
    payload = :erlang.term_to_binary({System.system_time(:second), valeur_temp})
    
    :emqtt.publish(state.conn, state.data_topic, payload)
    {:noreply, schedule_next_run(state)}
  end

  def handle_info({:publish, %{topic: _t, payload: p}}, state) do
    nouveau_delai = String.to_integer(p)
    Logger.info("Mise à jour de l'intervalle : #{nouveau_delai}ms")
    {:noreply, %{state | delay: nouveau_delai} |> schedule_next_run()}
  end

  defp schedule_next_run(state) do
    if state.timer_ref, do: Process.cancel_timer(state.timer_ref)
    ref = Process.send_after(self(), :publish_data, state.delay)
    %{state | timer_ref: ref}
  end
end

Interface de contrôle avec Phoenix LiveView

Pour visualiser les données en temps réel, nous utilisons Phoenix LiveView. Le tableau de bord agira comme un client MQTT miroir : il s'abonne aux mesures et publie des configurations.

Dans le contrôleur LiveView, nous établissons une connexion directe avec le broker lors du montage du composant :

defmodule DashboardWeb.MeteoLive do
  use DashboardWeb, :live_view
  
  def mount(_params, _session, socket) do
    if connected?(socket) do
      settings = Application.get_env(:dashboard, :mqtt)
      {:ok, pid} = :emqtt.start_link(settings)
      :emqtt.connect(pid)
      :emqtt.subscribe(pid, "telemetrie/+/temp")
      
      {:ok, assign(socket, mqtt_pid: pid, mesures: [], current_delay: 2000)}
    else
      {:ok, assign(socket, mesures: [], current_delay: 2000)}
    end
  end

  def handle_event("update_config", %{"delay" => val}, socket) do
    # Envoi d'une commande avec le flag 'retain' pour persistance
    :emqtt.publish(
      socket.assigns.mqtt_pid,
      "config/station_01/set_delay",
      val,
      [retain: true]
    )
    {:noreply, assign(socket, current_delay: val)}
  end

  def handle_info({:publish, packet}, socket) do
    {ts, temp} = :erlang.binary_to_term(packet.payload)
    nouvelle_mesure = %{timestamp: ts, valeur: temp}
    
    {:noreply, assign(socket, mesures: [nouvelle_mesure | Enum.take(socket.assigns.mesures, 19)])}
  end
end

Le rôle crucial du Broker et de la Persistance

L'utilisation d'un broker comme EMQX permet de gérer des milliers de connexions simultanées. Une fonctionnalité clé démontrée ici est le flag Retain.

Lorsqu'un message est publié avec l'option retain: true, le broker stocke la dernière valeur connue pour ce sujet. Si un capteur se déconnecte puis revient en ligne, il recevra immédiatement la dernière configuration publiée, même si le tableau de bord est alors hors ligne. Cela simplifie considérablement la gestion d'état des terminaux IoT distribués.

Exploitatoin des données

En couplant MQTT et Elixir, on obtient une architecture réactive où :

  • La latence est minimale grâce au transport binaire et aux processus légers d'Erlang.
  • La tolérance aux pannes est native via les arbres de supervision.
  • L'interopérabilité est totale : n'importe quel client MQTT (en Python, C++ ou JS) peut interagir avec vos services Elixir.

Étiquettes: elixir MQTT iot phoenix-liveview nerves

Publié le 23 juin à 01h20