How does it work?

In the following sections we will walk through the important code sections of the different components.

The full code is available at: https://github.com/drogue-iot/quarkus-mqtt-integration-starter/tree/main/src/main. Here, we only show the most important concepts.

A brief overview of the flow of events is:

(Figure: Architecture)

Receiving events

Events from Drogue IoT are received through the MQTT integration. This endpoint of Drogue Cloud is an MQTT server to which an application can connect and subscribe to specific topics in order to receive events from the system. It is not a full MQTT broker, though: it only provides the dedicated topics and operations required for this use case.

The core logic for receiving events is in the file Receiver.java.

MQTT Events

@Startup (1)
@ApplicationScoped
public class Receiver {
    @Incoming(Channels.DROGUE_INBOUND)
    @Outgoing(Channels.TELEMETRY)
    @Broadcast
    public DeviceEvent process(Message<byte[]> rawMessage) { (2)
        // ...
    }
}
1 The receiver is a bean annotated with @Startup, so that it starts processing events even if no user has accessed the web frontend yet. This is required for this example: events are already being generated and need to be processed by our logic, so that we have existing data to show when a user connects to the application.

This pattern also works in cases where you don’t even have a web frontend, but an application which only processes events in the background.

2 The incoming message carries a raw byte[] payload.

Processing as a cloud event

As Drogue IoT sends out Cloud Events, we need to parse the raw MQTT payload as a cloud event, and then map the payload of the cloud event to the actual payload we expect:

var format = EventFormatProvider
        .getInstance()
        .resolveFormat(JsonFormat.CONTENT_TYPE); (1)

var message = format.deserialize(rawMessage.getPayload()); (2)

var mappedMessage = mapData(
        message,
        PojoCloudEventDataMapper.from(this.objectMapper, Payload.class)
); (3)

var payload = mappedMessage.getValue(); (4)
1 Get the event format for the Cloud Events JSON format. As we are using MQTT 3.1.1, we can be sure that events arrive in structured mode, which encodes the full cloud event as JSON.
2 Deserialize the message using that event format.
3 Map the payload of the cloud event to the Java class Payload.
4 Extract the mapped payload from the payload mapper.
The actual code is a bit more complex because it properly handles error cases. That is why the code in your local clone or in the Git repository looks different from the excerpts shown here.
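The error handling mentioned above could take roughly the following shape. This is a minimal sketch using only the JDK, not the code from the repository: SafeParse and tryParse are hypothetical names, and the stand-in parser in main takes the place of the real deserialization step.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper, not part of the starter: runs one parsing step and
// turns a runtime failure into an empty result instead of an exception.
final class SafeParse {

    private SafeParse() {
    }

    static <T> Optional<T> tryParse(byte[] raw, Function<byte[], T> parser) {
        if (raw == null || raw.length == 0) {
            return Optional.empty(); // nothing to parse
        }
        try {
            return Optional.ofNullable(parser.apply(raw));
        } catch (RuntimeException e) {
            // A real application would log the failure here.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Stand-in parser: a placeholder for the real deserialization call.
        Function<byte[], Integer> parser =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));

        System.out.println(tryParse("42".getBytes(StandardCharsets.UTF_8), parser));
        System.out.println(tryParse("oops".getBytes(StandardCharsets.UTF_8), parser));
    }
}
```

With this shape, a malformed message results in an empty value that the caller can drop, rather than an exception that escapes the processing method.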

Assembling the outbound event

The Receiver bean is actually an event transformer: it transforms the incoming MQTT messages into system-internal events. The target event is created by the following section:

var device = new DeviceEvent();

var deviceId = message.getExtension("device"); (1)
device.setDeviceId(deviceId.toString());

var timestamp = message.getTime(); (1)
device.setTimestamp(timestamp.toInstant());
device.setTemperature(payload.getTemperature()); (2)
device.setLocation(payload.getGeoloc()); (2)

return device; (3)
1 Extracts information from the cloud event’s metadata
2 Extracts information from the payload
3 Returns the resulting internal event

As the process method is annotated with both @Incoming and @Outgoing, it receives events and also forwards the events it returns. The returned event is sent internally to the channel defined by Channels.TELEMETRY.
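The excerpt above assumes that the device extension and the timestamp are always present. A defensive variant of the mapping might look like the following minimal sketch. It uses only the JDK: DeviceEventSketch and MetadataMapper are hypothetical stand-ins, the device parameter plays the role of the value returned by message.getExtension("device"), and the time parameter plays the role of message.getTime().

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Optional;

// Simplified stand-in for the starter's DeviceEvent; only two fields are kept.
final class DeviceEventSketch {
    final String deviceId;
    final Instant timestamp;

    DeviceEventSketch(String deviceId, Instant timestamp) {
        this.deviceId = deviceId;
        this.timestamp = timestamp;
    }
}

// Hypothetical helper, not part of the starter.
final class MetadataMapper {

    private MetadataMapper() {
    }

    // Either "device" or "time" may be null when the attribute is missing.
    static Optional<DeviceEventSketch> map(Object device, OffsetDateTime time, Instant fallback) {
        if (device == null) {
            return Optional.empty(); // cannot attribute the event to a device
        }
        // Fall back to the supplied instant when the event has no time.
        Instant timestamp = (time != null) ? time.toInstant() : fallback;
        return Optional.of(new DeviceEventSketch(device.toString(), timestamp));
    }
}
```

Whether a missing timestamp should be replaced by a fallback or should cause the event to be dropped is a design choice; the sketch only shows one option.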

Understanding the channel mappings

We saw before that events flow in from the channel Channels.DROGUE_INBOUND and leave on the channel Channels.TELEMETRY. There is a bit more to the channels than just their names.

Let’s take a look at the application configuration: application.yaml

mp:
  messaging:
    #
    # Configure the MQTT source (we read from it)
    #
    incoming:
      drogue-inbound: (1)
        type: smallrye-mqtt (2)
        topic: app/${drogue.integration.application} (3)
        host: ${drogue.integration.mqtt.host}
        port: ${drogue.integration.mqtt.port}
    #
    # Configure the MQTT sink (we send commands to)
    #
    outgoing:
      drogue-outbound: (1)
        type: smallrye-mqtt (2)
        host: ${drogue.integration.mqtt.host}
        port: ${drogue.integration.mqtt.port}
1 The names of the channels align with the constants in Channels.
2 Declares the channels as "backed by MQTT".
3 The inbound topic. For the outbound channel, the topic can be provided per message, so it may be omitted.
Although this example configures an outbound channel, the starter does not actually use it. It is left in for completeness.

As the telemetry channel does not have any configuration, it is a purely internal channel, not backed by any transport technology.
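To make the mapping between the configuration and the code concrete, the Channels class could look like the following sketch. The values for the two MQTT channels are taken from the configuration above; the value "telemetry" and the overall shape of the class are assumptions, so check the repository for the actual definition.

```java
// Assumed shape of the Channels class, for illustration only.
final class Channels {

    // Must match the key under mp.messaging.incoming in application.yaml.
    static final String DROGUE_INBOUND = "drogue-inbound";

    // Must match the key under mp.messaging.outgoing in application.yaml.
    static final String DROGUE_OUTBOUND = "drogue-outbound";

    // Assumed name; it has no entry in application.yaml, so it stays internal.
    static final String TELEMETRY = "telemetry";

    private Channels() {
    }
}
```

Using compile-time constants keeps the names in the annotations consistent across all beans, since annotation values must be constant expressions.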

Consuming events

Two internal components consume these events:

Current state

The bean CurrentState simply records the "last known event":

public class CurrentState {

    private DeviceEvent lastEvent;

    @Incoming(Channels.TELEMETRY) (1)
    public void telemetryChange(final DeviceEvent event) {
        this.lastEvent = event; (2)
    }

    public DeviceEvent getLastEvent() {
        return this.lastEvent; (3)
    }
}
1 Binds the method to the internal "telemetry" channel, the one we are feeding from the Receiver bean.
2 Simply remember the last known event.
3 Return the last known event when we need it.
This example doesn’t make use of the state stored by this bean. In the next section you will see why. But still, this pattern might come in handy for you in other cases.
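If the stored state is read from a different thread than the one delivering the events, for example by a REST endpoint, visibility between threads starts to matter. The following is a minimal sketch of a thread-safe "last known value" holder using only the JDK; LastValue and its methods are hypothetical names, not part of the starter.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder for the most recent value of any type.
final class LastValue<T> {

    // AtomicReference makes every update visible to all reading threads.
    private final AtomicReference<T> last = new AtomicReference<>();

    void update(T value) {
        last.set(value);
    }

    // Empty until the first value has been recorded.
    Optional<T> get() {
        return Optional.ofNullable(last.get());
    }
}
```

Returning an Optional makes the "no event received yet" case explicit for callers.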

UI event stream

The dashboard is connected to the backend using WebSockets. When the dashboard is loaded, it connects to the backend. Those connections are handled by the EventsResource bean:

@ServerEndpoint("/ws")
@ApplicationScoped
public class EventsResource {

    private final Map<String, Session> sessions = new ConcurrentHashMap<>(); (1)

    private Object lastEvent; (2)

    @OnOpen
    public void onOpen(Session session) {
        if (lastEvent != null) {
            session.getAsyncRemote().sendObject(lastEvent); (3)
        }
        sessions.put(session.getId(), session);
    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session.getId());
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        sessions.remove(session.getId());
    }

    @Incoming(Channels.TELEMETRY)
    void telemetryEvent(DeviceEvent event) { (4)
        Object nextEvent = new JsonObject()
                .put("type", "telemetry")
                .put("payload", JsonObject.mapFrom(event)).toString();
        this.lastEvent = nextEvent; (5)
        sessions.values().forEach(s -> { (6)
            s.getAsyncRemote().sendObject(nextEvent);
        });
    }
}
1 The map used to keep track of all connected clients. As the bean is annotated with @ApplicationScoped, there will only be one instance of it, and we can track all sessions.
2 The last known state, preformatted for directly sending out to clients.
3 When a new client connects, and we have a previous state, we send it out before anything else.
4 The method which will receive the internal events from the Receiver bean.
5 Remembers the last event formatted for the client.
6 Send out the event to all known clients.
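The pattern used by EventsResource, replaying the last known state to a new client and then broadcasting every new event, can be looked at in isolation. The following is a minimal sketch without WebSockets, using only the JDK: ReplayingBroadcaster is a hypothetical class, and each client is modeled as a plain callback instead of a Session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Transport-free model of the pattern: each client is just a callback.
final class ReplayingBroadcaster {

    private final Map<String, Consumer<String>> clients = new ConcurrentHashMap<>();

    // volatile, as connect and publish may run on different threads.
    private volatile String lastEvent;

    void connect(String id, Consumer<String> client) {
        String last = lastEvent;
        if (last != null) {
            client.accept(last); // replay the last known state first
        }
        clients.put(id, client);
    }

    void disconnect(String id) {
        clients.remove(id);
    }

    void publish(String event) {
        lastEvent = event; // remember for clients that connect later
        clients.values().forEach(c -> c.accept(event));
    }

    public static void main(String[] args) {
        ReplayingBroadcaster broadcaster = new ReplayingBroadcaster();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        broadcaster.connect("early", early::add);
        broadcaster.publish("e1");
        broadcaster.connect("late", late::add); // receives "e1" as replay
        broadcaster.publish("e2");

        System.out.println(early + " " + late);
    }
}
```

In this run both clients end up with the same two events, although the second one connected after the first event was published. The sketch does not guard against an event arriving between the replay and the registration of a client.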

What’s next?

We walked through the flow of events and learned how events get processed, converted, and consumed in the application.

You might want to take a look at the web frontend too. This is just a small HTML page, with some CSS and JavaScript to subscribe to the backend and receive data.

Maybe you already have some ideas to tweak this. Making changes is easy: open your editor of choice and go ahead. In some cases, for example when modifying application-global beans, you need to restart the application, as hot reloading doesn’t work. Press Ctrl+C and re-run mvn quarkus:dev.