MQTT

MQTT is a light-weight publish-subscribe messaging protocol particularly suited for IoT communication. This add-on provides an integration of the MQTT protocol in SeedStack. It uses the Eclipse PAHO implementation to automatically manage brokers, connections and message consumers/publishers. Automatic connection recovery is done after an MQTT connection failure.

Dependencies

To add the MQTT add-on to your project, add the following dependency:

<dependency>
    <groupId>org.seedstack.addons.mqtt</groupId>
    <artifactId>mqtt</artifactId>
</dependency>
Show version
dependencies {
    compile("org.seedstack.addons.mqtt:mqtt:2.0.1")
}

You must also add the Eclipse PAHO implementation:

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.0.2</version>
</dependency>
dependencies {
    compile("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2")
}

Configuration

Configuration is done by declaring one or more MQTT clients:

mqtt:
  # Configured clients with the name of the client as key
  clients:
    client1:
      # The URI of the MQTT broker to connect to
      serverUri: (String)
      # The client identifier to use (a default one will be generated if not specified)
      clientId: (String)
      # Connection options based on the PAHO MqttConnectOptions class
      connection:
        ... 
      # Reconnection mode (defaults to ALWAYS)
      reconnectionMode: (NONE|ALWAYS|CUSTOM)
      # The time to wait in seconds before reconnecting to the broker after a connection failure (defaults to 2)
      reconnectionInterval: (int)
      # Connection pool configuration
      pool:
        # If true, connection pooling is enabled (defaults to false)
        enabled: (boolean)
        # The minimal number of connections in the pool (defaults to 1)
        coreSize: (int)
        # The maximum number of connections in the pool (defaults to 2)
        maxSize: (int)
        # The size of the local buffer queue for messages (defaults to 500)
        queueSize: (int)
        # The keep alive interval in seconds (defaults to 60)
        keepAlive: (int)
        # The policy to apply when the local queue is full
        rejectedExecutionPolicy: (ABORT|DISCARD|CALLER_RUNS|DISCARD_OLDEST)

To dump the mqtt configuration options:

mvn -q -Dargs="mqtt" seedstack:config

Consuming messages

To receive MQTT messages, create a listener class which implements the MqttCallback
interface and is annotated with @MqttListener:

@MqttListener(clients = "client1", topics = "someTopic", qos="1")
public class SomeMqttListener implements MqttCallback {
    @Override
    public void connectionLost(Throwable cause) {
        // handle loss of connection if necessary
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // handle message
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // not used in listeners
    }
}

The @MqttListener annotation takes the following parameters:

  • The clients parameter specifying the clients used to receive messages. Multiple clients can be specified.
  • The topics parameter specifying which topics it will listen to. Multiple topics can be specified.
  • The qos parameter specifying the QOS level for each topic (in the same order than the topics parameter).

Configuration placeholders can be used in this annotation:

@MqttListener(clients = "${myapp.mqtt.clients}", topics = "${myapp.mqtt.topics}", qos="${myapp.mqtt.qos")
public class TestMqttListener implements MqttCallback {
    // ...
}

If you don’t know in advance how many clients, topics or qos you must specify you can use a comma-delimited string in configuration:

myapp:
  mqtt:
    clients: client1, client2
    topics: topic1, topic2, topic3
    qos: 1, 1, 1

Publishing messages

In any class, just inject an MQTT client with the IMqttClient interface and the corresponding name:

public class SomeClass {
    @Inject
    @Named("client1")
    private IMqttClient mqttClient;
    
}

To publish a message, use the publish() method:

public class SomeClass {
    @Inject
    @Named("client1")
    private IMqttClient mqttClient;
    
    public void someMethod() {
        mqttClient.publish(
                "topicName", 
                "message".getBytes(Charset.forName("UTF-8")), 
                1, 
                false);
    }
}

Publication handler

You can define a publication handler for any MQTT client creating a class implementing the MqttCallback interface and annotating it with @MqttPublishHandler:

@MqttPublishHandler(clients = "client1")
public class SomeClass {
    @Override
    public void connectionLost(Throwable cause) {
        // handle loss of connection
    }
    
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // not used in publication handlers
    }
    
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
       // handle delivery completion
    } 
}

On this page


Edit this page