How does it work?
In the following sections we will walk through the important code sections of the different components.
The full code is available at https://github.com/drogue-iot/quarkus-mqtt-integration-starter/tree/main/src/main, but here we will only show the most important concepts.
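A brief overview of the flow of events is: Drogue Cloud delivers device events through its MQTT integration, the Receiver bean transforms them into internal events on the telemetry channel, and the CurrentState and EventsResource beans consume them, with EventsResource forwarding them to the dashboard over WebSockets.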
Receiving events
Receiving events from Drogue IoT is done using the MQTT integration. This endpoint of Drogue Cloud is an MQTT server to which an application can connect and subscribe to specific topics in order to receive events from the system. It is not a full MQTT broker though, and only provides some dedicated topics and operations required for this use case.
The core logic for receiving events is in the file Receiver.java.
MQTT Events
@Startup (1)
@ApplicationScoped
public class Receiver {

    @Incoming(Channels.DROGUE_INBOUND)
    @Outgoing(Channels.TELEMETRY)
    @Broadcast
    public DeviceEvent process(Message<byte[]> rawMessage) { (2)
        // ...
    }
}
1 | The receiver is a bean annotated with @Startup, so that it starts processing events even if no user has accessed the web frontend yet. This is required for this example, as events are already being generated and need to be processed by our logic, so that when a user connects to our application, there is already some data to show.
This pattern also works in cases where you don’t have a web frontend at all, only an application which processes events in the background. |
2 | The message you will receive is a raw byte[] message. |
Processing as a cloud event
As Drogue IoT sends out Cloud Events, we need to parse the raw MQTT payload as a cloud event, and then map the payload of the cloud event to the actual payload we expect:
var format = EventFormatProvider
        .getInstance()
        .resolveFormat(JsonFormat.CONTENT_TYPE); (1)
var message = format.deserialize(rawMessage.getPayload()); (2)
var mappedMessage = mapData(
        message,
        PojoCloudEventDataMapper.from(this.objectMapper, Payload.class)
); (3)
var payload = mappedMessage.getValue(); (4)
1 | Get the format provider for the Cloud Events JSON format. As we are using MQTT 3.1.1, we can be sure that events arrive in structured content mode, which encodes the full cloud event as JSON. |
2 | Deserialize the message using the format provider. |
3 | Map the payload of the cloud event to the Java class Payload. |
4 | Extract the mapped payload from the payload mapper. |
The actual code is a bit more complex, as it properly handles error cases. That is why your locally cloned code, or the code in the git repository, looks different from the excerpt above.
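To make the structured content mode more concrete, the following sketch shows what such an MQTT payload might look like and where the values used later come from. It is an illustration only: all attribute values and the JSON field names "temperature" and "geoloc" are assumptions, and the naive regular-expression lookup is a stand-in for the Cloud Events SDK calls shown above, not a replacement for them.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class StructuredEventSketch {

    // Hypothetical structured-mode cloud event: metadata and data share one JSON document.
    // All values and the "temperature"/"geoloc" field names are assumptions for illustration.
    static final String SAMPLE = "{"
            + "\"specversion\": \"1.0\", "
            + "\"id\": \"example-id-1\", "
            + "\"source\": \"example-source\", "
            + "\"type\": \"example.type\", "
            + "\"time\": \"2021-01-01T12:00:00Z\", "
            + "\"device\": \"device-1\", "
            + "\"datacontenttype\": \"application/json\", "
            + "\"data\": {\"temperature\": 21.5, \"geoloc\": {\"lat\": 48.1, \"lon\": 11.5}}"
            + "}";

    // Naive lookup of a string-valued attribute. It does not understand nesting
    // or escaping, so it is only suitable for this illustration.
    static String stringAttribute(String json, String name) {
        Matcher m = Pattern
                .compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"")
                .matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Converts the "time" attribute to an Instant, similar in spirit to
    // message.getTime().toInstant() in the excerpt above.
    static Instant timestamp(String json) {
        return OffsetDateTime.parse(stringAttribute(json, "time")).toInstant();
    }
}
```

The point of the sketch is the shape of the document: the "device" extension and the "time" attribute sit next to the standard cloud event attributes, while the application payload is nested under "data".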
Assembling the outbound event
The Receiver bean is actually an event transformer. It transforms the incoming MQTT messages into system-internal events. The target event is created by the following section:
var device = new DeviceEvent();
var deviceId = message.getExtension("device"); (1)
device.setDeviceId(deviceId.toString());
var timestamp = message.getTime(); (1)
device.setTimestamp(timestamp.toInstant());
device.setTemperature(payload.getTemperature()); (2)
device.setLocation(payload.getGeoloc()); (2)
return device; (3)
1 | Extracts information from the cloud event’s metadata |
2 | Extracts information from the payload |
3 | The resulting, internal, event |
As the process method is annotated with both the @Incoming and @Outgoing annotations, it will receive events and also forward the events which are returned by the method. The returned event is sent on internally to the event channel defined by Channels.TELEMETRY.
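The combination of receiving, transforming, and broadcasting can be sketched in plain Java, without any messaging framework. The class and method names below are hypothetical and only mimic the idea: a function is wired between an inbound source and an outbound channel, and every subscriber of that channel sees every returned item. The framework does this wiring for you based on the annotations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a broadcast channel: every subscriber receives every item.
class BroadcastChannelSketch<T> {

    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(T item) {
        subscribers.forEach(s -> s.accept(item));
    }

    // Returns a consumer for inbound items which applies the processing
    // function and publishes its result on the outbound channel.
    static <I, O> Consumer<I> transformer(Function<I, O> process, BroadcastChannelSketch<O> out) {
        return in -> out.publish(process.apply(in));
    }
}
```

With two subscribers registered on the outbound channel, a single inbound message results in both of them receiving the transformed event, which is the situation the two consumers of the telemetry channel are in.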
Understanding the channel mappings
We saw before that events flow in from the channel Channels.DROGUE_INBOUND and leave to a channel named Channels.TELEMETRY. There is a bit more to the channels than just their names.
Let’s take a look at the application configuration: application.yaml
mp:
  messaging:
    #
    # Configure the MQTT source (we read from it)
    #
    incoming:
      drogue-inbound: (1)
        type: smallrye-mqtt (2)
        topic: app/${drogue.integration.application} (3)
        host: ${drogue.integration.mqtt.host}
        port: ${drogue.integration.mqtt.port}
    #
    # Configure the MQTT sink (we send commands to)
    #
    outgoing:
      drogue-outbound: (1)
        type: smallrye-mqtt (2)
        host: ${drogue.integration.mqtt.host}
        port: ${drogue.integration.mqtt.port}
1 | The names of the channels align with the constants in Channels. |
2 | Declares the channels as "backed by MQTT". |
3 | The inbound topic. For the outbound channel it is possible to provide this "per message", and so it may be omitted. |
Although this example configures an outbound channel, it is not actually used in this starter. It is left in for completeness.
As the internal telemetry channel does not have any configuration, it is an internal channel, not backed by any transport technology.
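A constants holder that lines up with this configuration could look like the sketch below. The constant names DROGUE_INBOUND and TELEMETRY and the channel names come from the excerpts above. The DROGUE_OUTBOUND constant is an assumption, and the actual class in the repository may differ.

```java
// Sketch of a channel name holder; the real Channels class may look different.
final class Channels {

    // Must match the key under mp.messaging.incoming in application.yaml.
    static final String DROGUE_INBOUND = "drogue-inbound";

    // Assumed constant name; matches the key under mp.messaging.outgoing.
    static final String DROGUE_OUTBOUND = "drogue-outbound";

    // No configuration entry exists for this name, so the channel stays internal.
    static final String TELEMETRY = "telemetry";

    private Channels() {
        // constants only, no instances
    }
}
```

Keeping the names in one place means the annotations and the configuration only have to agree on a single string per channel.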
Consuming events
There are actually two internal components which consume these events:
Current state
The bean CurrentState simply records the "last known event":
public class CurrentState {

    private DeviceEvent lastEvent;

    @Incoming(Channels.TELEMETRY) (1)
    public void telemetryChange(final DeviceEvent event) {
        this.lastEvent = event; (2)
    }

    public DeviceEvent getLastEvent() {
        return this.lastEvent; (3)
    }
}
1 | Bind the method to receive events from the internal "telemetry" channel, the one we are feeding from the Receiver bean. |
2 | Simply remember the last known event. |
3 | Return the last known event when we need it. |
This example doesn’t make use of the state stored by this bean. In the next section you will see why. But still, this pattern might come in handy for you in other cases.
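If you reuse this pattern where the state is read from a different thread than the one delivering events, you may want the stored value to be safely visible across threads. The following is a minimal sketch of one way to do that, using a hypothetical generic holder class which is not part of the starter:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical "last known value" holder, safe to update and read from different threads.
class LastValueHolder<T> {

    private final AtomicReference<T> last = new AtomicReference<>();

    // Called for each incoming event; only the most recent value is kept.
    void update(T value) {
        last.set(value);
    }

    // Empty until the first value has arrived.
    Optional<T> last() {
        return Optional.ofNullable(last.get());
    }
}
```

Returning an Optional makes the "no event received yet" case explicit for callers, instead of handing out null.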
UI event stream
The dashboard is connected to the backend using WebSockets. When the dashboard is loaded, it connects to the backend.
Those connections are handled by the EventsResource bean:
@ServerEndpoint("/ws")
@ApplicationScoped
public class EventsResource {

    private final Map<String, Session> sessions = new ConcurrentHashMap<>(); (1)
    private Object lastEvent; (2)

    @OnOpen
    public void onOpen(Session session) {
        if (lastEvent != null) {
            session.getAsyncRemote().sendObject(lastEvent); (3)
        }
        sessions.put(session.getId(), session);
    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session.getId());
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        sessions.remove(session.getId());
    }

    @Incoming(Channels.TELEMETRY)
    void telemetryEvent(DeviceEvent event) { (4)
        Object nextEvent = new JsonObject()
                .put("type", "telemetry")
                .put("payload", JsonObject.mapFrom(event)).toString();
        this.lastEvent = nextEvent; (5)
        sessions.values().forEach(s -> { (6)
            s.getAsyncRemote().sendObject(nextEvent);
        });
    }
}
1 | The map used to keep track of all connected clients. As the bean is annotated with @ApplicationScoped there will
only be one instance of it, and we can track all sessions. |
2 | The last known state, preformatted for directly sending out to clients. |
3 | When a new client connects, and we have a previous state, we send it out before anything else. |
4 | The method which will receive the internal events from the Receiver bean. |
5 | Remembers the last event formatted for the client. |
6 | Send out the event to all known clients. |
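The "replay the last event, then fan out" behavior described in the callouts can be tried out without a WebSocket container. In the sketch below, a hypothetical Consumer<String> stands in for a client session. The class and method names are assumptions made for illustration and do not exist in the starter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for the session handling: clients are plain consumers.
class ReplayingFanOut {

    private final Map<String, Consumer<String>> clients = new ConcurrentHashMap<>();
    private volatile String lastMessage;

    // A new client first receives the last known message, if there is one.
    void connect(String id, Consumer<String> client) {
        String last = lastMessage;
        if (last != null) {
            client.accept(last);
        }
        clients.put(id, client);
    }

    void disconnect(String id) {
        clients.remove(id);
    }

    // Remember the message for future clients, then send it to all current ones.
    void publish(String message) {
        lastMessage = message;
        clients.values().forEach(c -> c.accept(message));
    }
}
```

As in the original bean, a client that connects at the same moment an event is published might miss that one event or see it twice. For a dashboard which only shows the latest state, that is usually acceptable.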
What’s next?
We walked through the flow of events and learned how events get processed, converted, and consumed in the application.
You might want to take a look at the web frontend too. This is just a small HTML page, with some CSS and JavaScript to subscribe to the backend and receive data.
The content is located at: main/src/main/resources/META-INF/resources/index.html.
Maybe you already have some ideas to tweak this. Making changes is easy: take your editor of choice, and go ahead.
In some cases, when modifying application-global beans, you need to restart the application, as hot-reloading doesn’t work. Press Ctrl+C and re-run mvn quarkus:dev.