L'écosystème Elixir est particulièrement bien adapté au développement de solutions pour l'Internet des objets (IoT), notamment grâce à des frameworks comme Nerves. Bien que l'utilisation du protocole HTTP soit courante pour les échanges réseau, le protocole MQTT s'impoce souvent comme un choix plus performant et robuste pour les environnements de production.
Comprendre le protocole MQTT
MQTT (Message Queuing Telemetry Transport) est un protocole de messagerie binaire extrêmement léger, conçu spécifiquement pour les appareils aux ressources limitées et les réseaux peu fiables. Contrairement au modèle classique client-serveur, MQTT repose sur un modèle Publication/Abonnement.
- Léger : Minimise la surcharge des paquets réseau, idéal pour les connexions bas débit.
- Robuste : Conçu pour gérer les pertes de connexion fréquentes.
- Découplé : Les expéditeurs (publishers) et les destinataires (subscribers) ne se connaissent pas directement ; ils communiquent via un broker (courtier).
Configuration du simulateur de capteur
Pour illustrer l'intégration de MQTT, nous allons créer une application Elixir simple simulant un capteur environnemental. Nous utiliserons la bibliothèque emqtt, un client Erlang performant.
Générez d'abord un nouveau projet avec une arborescence de supervision :
mix new --sup station_meteo
cd station_meteo
Ajoutez la dépendance dans votre fichier mix.exs :
defp deps do
[
{:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]}
]
end
L'implémentation du capteur repose sur un GenServer qui gère à la fois l'envoi périodique de données et la réception de commandes de configuration.
defmodule StationMeteo.Sensor do
use GenServer
require Logger
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
config = Application.get_env(:station_meteo, :mqtt_settings)
topic_data = "telemetrie/#{config[:client_id]}/temp"
{:ok, pid} = :emqtt.start_link(config)
state = %{
conn: pid,
delay: 2000,
timer_ref: nil,
data_topic: topic_data,
client_id: config[:client_id]
}
{:ok, state, {:continue, :connect_broker}}
end
def handle_continue(:connect_broker, state) do
case :emqtt.connect(state.conn) do
{:ok, _} ->
# S'abonner aux commandes de configuration
:emqtt.subscribe(state.conn, {"config/#{state.client_id}/set_delay", 1})
{:noreply, schedule_next_run(state)}
{:error, reason} ->
Logger.error("Échec de connexion : #{inspect(reason)}")
{:noreply, state}
end
end
def handle_info(:publish_data, state) do
valeur_temp = 20.0 + :rand.uniform() * 5
payload = :erlang.term_to_binary({System.system_time(:second), valeur_temp})
:emqtt.publish(state.conn, state.data_topic, payload)
{:noreply, schedule_next_run(state)}
end
def handle_info({:publish, %{topic: _t, payload: p}}, state) do
nouveau_delai = String.to_integer(p)
Logger.info("Mise à jour de l'intervalle : #{nouveau_delai}ms")
{:noreply, %{state | delay: nouveau_delai} |> schedule_next_run()}
end
defp schedule_next_run(state) do
if state.timer_ref, do: Process.cancel_timer(state.timer_ref)
ref = Process.send_after(self(), :publish_data, state.delay)
%{state | timer_ref: ref}
end
end
Interface de contrôle avec Phoenix LiveView
Pour visualiser les données en temps réel, nous utilisons Phoenix LiveView. Le tableau de bord agira comme un client MQTT miroir : il s'abonne aux mesures et publie des configurations.
Dans le contrôleur LiveView, nous établissons une connexion directe avec le broker lors du montage du composant :
defmodule DashboardWeb.MeteoLive do
use DashboardWeb, :live_view
def mount(_params, _session, socket) do
if connected?(socket) do
settings = Application.get_env(:dashboard, :mqtt)
{:ok, pid} = :emqtt.start_link(settings)
:emqtt.connect(pid)
:emqtt.subscribe(pid, "telemetrie/+/temp")
{:ok, assign(socket, mqtt_pid: pid, mesures: [], current_delay: 2000)}
else
{:ok, assign(socket, mesures: [], current_delay: 2000)}
end
end
def handle_event("update_config", %{"delay" => val}, socket) do
# Envoi d'une commande avec le flag 'retain' pour persistance
:emqtt.publish(
socket.assigns.mqtt_pid,
"config/station_01/set_delay",
val,
[retain: true]
)
{:noreply, assign(socket, current_delay: val)}
end
def handle_info({:publish, packet}, socket) do
{ts, temp} = :erlang.binary_to_term(packet.payload)
nouvelle_mesure = %{timestamp: ts, valeur: temp}
{:noreply, assign(socket, mesures: [nouvelle_mesure | Enum.take(socket.assigns.mesures, 19)])}
end
end
Le rôle crucial du Broker et de la Persistance
L'utilisation d'un broker comme EMQX permet de gérer des milliers de connexions simultanées. Une fonctionnalité clé démontrée ici est le flag Retain.
Lorsqu'un message est publié avec l'option retain: true, le broker stocke la dernière valeur connue pour ce sujet. Si un capteur se déconnecte puis revient en ligne, il recevra immédiatement la dernière configuration publiée, même si le tableau de bord est alors hors ligne. Cela simplifie considérablement la gestion d'état des terminaux IoT distribués.
Exploitatoin des données
En couplant MQTT et Elixir, on obtient une architecture réactive où :
- La latence est minimale grâce au transport binaire et aux processus légers d'Erlang.
- La tolérance aux pannes est native via les arbres de supervision.
- L'interopérabilité est totale : n'importe quel client MQTT (en Python, C++ ou JS) peut interagir avec vos services Elixir.