Protocole MQTT : Déploiement via Docker et intégration dans Spring Boot

Principes fondamentaux de MQTT

MQTT est un protocole de communication léger basé sur le modèle publication-abonnement, conçu pour fonctionner sur des réseaux à faible bande passante ou instables. Il exploite la pile TCP/IP pour offrir une messagerie asynchrone, ce qui le rend adapté aux scénarios où les ressources matérielles des appareils ou la connectivité réseau sont limitées. Ce protocole permet une interaction flexible entre émetteurs et récepteurs, sans contrainte de synchronisation temporelle ou spatiale.

Déploiement du broker MQTT avec Docker

Pour déployer un serveur MQTT, on peut utiliser Docker avec une configuration personnalisée. Voici un exemple de fichier docker-compose.yml :

version: "3"
services:
    mqtt-broker:
        image: eclipse-mosquitto
        container_name: mosquitto-server
        privileged: true
        ports:
            - "1883:1883"
            - "9001:9001"
        volumes:
            - ./mosquitto-config:/mosquitto/config
            - ./mosquitto-data:/mosquitto/data
            - ./mosquitto-logs:/mosquitto/log

Créez ensuite un fichier de configuration mosquitto-config/mosquitto.conf avec le contenu suivant :

persistence true
listener 1883
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log
password_file /mosquitto/config/passwd.conf

Lancez le conteneur en mode détaché :

docker-compose up -d

Pour configurer un accès authentifié, exécutez ces commandes dans le conteneur :

docker exec -it mosquitto-server sh
touch /mosquitto/config/passwd.conf
chmod 755 /mosquitto/config/passwd.conf
mosquitto_passwd -b /mosquitto/config/passwd.conf utilisateur motdepasse123

Redémarrez ensuite le service pour appliquer les modifications :

docker-compose restart mqtt-broker

Intégration dans une application Spring Boot

Dépendances Maven

Ajoutez les dépendances nécessaires dans votre fichier pom.xml :

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.0</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

Fichier de configuration

Dans application.properties, définissez les paramètres de connexion :

mqtt.broker.url=tcp://localhost:1883
mqtt.client.identifier=client_spring_001
mqtt.user.name=utilisateur
mqtt.user.password=motdepasse123
mqtt.default.topic=sensor/data
mqtt.connection.timeout=30000
mqtt.keep.alive.interval=60

Classe de configuration MQTT

@Configuration
@Slf4j
public class MqttBrokerConfig {

    @Value("${mqtt.broker.url}")
    private String brokerUrl;
    @Value("${mqtt.client.identifier}")
    private String clientIdentifier;
    @Value("${mqtt.user.name}")
    private String username;
    @Value("${mqtt.user.password}")
    private String password;
    @Value("${mqtt.connection.timeout}")
    private int connectionTimeout;
    @Value("${mqtt.keep.alive.interval}")
    private int keepAliveInterval;
    @Value("${mqtt.default.topic}")
    private String defaultTopic;

    @Bean
    public MqttService mqttClientService() {
        MqttService service = new MqttService(brokerUrl, username, password, clientIdentifier, connectionTimeout, keepAliveInterval);
        try {
            service.establishConnection();
            service.registerSubscription(defaultTopic, 1);
        } catch (MqttException e) {
            log.error("Échec de la connexion initiale au broker MQTT", e);
        }
        return service;
    }
}

Service client MQTT

@Slf4j
public class MqttService {

    private MqttAsyncClient mqttClient;
    private final String broker;
    private final String user;
    private final String pass;
    private final String clientId;
    private final int timeout;
    private final int keepAlive;

    public MqttService(String broker, String user, String pass, String clientId, int timeout, int keepAlive) {
        this.broker = broker;
        this.user = user;
        this.pass = pass;
        this.clientId = clientId;
        this.timeout = timeout;
        this.keepAlive = keepAlive;
    }

    private MqttConnectionOptions buildConnectionOptions() {
        MqttConnectionOptions opts = new MqttConnectionOptions();
        opts.setUserName(this.user);
        opts.setPassword(this.pass.toCharArray());
        opts.setConnectionTimeout(this.timeout);
        opts.setKeepAliveInterval(this.keepAlive);
        opts.setAutomaticReconnect(true);
        opts.setCleanStart(true);
        return opts;
    }

    public void establishConnection() throws MqttException {
        if (this.mqttClient == null || !this.mqttClient.isConnected()) {
            this.mqttClient = new MqttAsyncClient(this.broker, this.clientId, new MemoryPersistence());
            this.mqttClient.setCallback(new MessageEventProcessor(this));
            MqttConnectionOptions options = buildConnectionOptions();
            this.mqttClient.connect(options).waitForCompletion();
            log.info("Connexion au broker MQTT réussie");
        }
    }

    public void terminateConnection() throws MqttException {
        if (this.mqttClient != null && this.mqttClient.isConnected()) {
            this.mqttClient.disconnect().waitForCompletion();
            log.info("Déconnexion du broker MQTT");
        }
    }

    public void registerSubscription(String topic, int qos) throws MqttException {
        if (this.mqttClient != null && this.mqttClient.isConnected()) {
            this.mqttClient.subscribe(topic, qos);
            log.info("Abonnement au sujet '{}' effectué", topic);
        }
    }

    public void removeSubscription(String topic) throws MqttException {
        if (this.mqttClient != null && this.mqttClient.isConnected()) {
            this.mqttClient.unsubscribe(topic);
            log.info("Désabonnement du sujet '{}'", topic);
        }
    }

    public void transmitMessage(String content, String topic, int qos, boolean retain) throws MqttException {
        if (this.mqttClient != null && this.mqttClient.isConnected()) {
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            message.setRetained(retain);
            this.mqttClient.publish(topic, message);
            log.debug("Message publié sur le sujet '{}'", topic);
        } else {
            log.warn("Impossible de publier : client non connecté");
        }
    }
}

Processus d'événements de message

@Slf4j
public class MessageEventProcessor implements MqttCallback {

    private final MqttService mqttService;

    public MessageEventProcessor(MqttService service) {
        this.mqttService = service;
    }

    @Override
    public void connectionLost(Throwable cause) {
        log.warn("Connexion perdue : {}", cause.getMessage());
        // La reconnexion automatique est gérée par MqttConnectionOptions
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String payload = new String(message.getPayload());
        log.info("Message reçu sur '{}' : {}", topic, payload);
        processIncomingData(topic, payload);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.debug("Livraison du message terminée : {}", token.isComplete());
    }

    private void processIncomingData(String topic, String data) {
        // Logique de traitement des messages entrants
        log.info("Traitement des données du sujet '{}'", topic);
    }
}

Ce code illustre une intégration typique de MQTT dans Spring Boot, avec une gestion propre de la connexion et des messages. Les classes et méthodes ont été restructurées pour une meilleure séparation des responsabilités.

Étiquettes: MQTT Docker Spring Boot Java iot

Publié le 19 juin à 16h06