Event pre-processing
One core functionality of Drogue Cloud is to take device events and deliver them to a Kafka topic, so that interested applications can consume them. In that process, Drogue Cloud takes care of device connectivity, authentication & authorization, and some very basic validation steps. However, most of that processing is payload agnostic. By default, payload and metadata are just forwarded as-is.
In some cases it would be helpful, though, if metadata or payload could be altered or verified with additional logic during the reception of the event.
Scope
What might come in handy in situations like this is a rule-based engine which allows inspecting metadata and payload and deciding how to process the event further: rejecting, diverting, or altering it in the process.
However, such a system can get out of hand pretty quickly, and we do not want to re-invent the wheel. A lot of systems like that already exist: Apache Camel, to mention just one of them.
Also, allowing a user to configure a complex piece of logic for a cloud-side, "as a service" platform can be rather tricky. Just imagine someone validating "what the next matching bitcoin hash" is. It would be difficult to measure and project the costs caused by user-provided validations.
So Drogue Cloud tries to strike a balance: support very simple, stateless operations directly, while allowing integration with more complex operations hosted by the user.
Just two quick examples: Drogue Cloud does offer a way to overwrite the "content type" attribute of an event if its channel matches a specific value. But for letting an AI/ML model categorize values of an event, we allow passing the event to an external endpoint and consuming it back as the response, before passing it along the chain. This way you can spin up your own (optionally Knative Serving based) endpoint and do complex calculations there, but do all of this as part of the Drogue Cloud device-facing endpoints.
Configuration
The functionality is configured as part of the "application" resource, in the `.spec.publish` path.
The configuration consists of two main parts:
- rules, which select when operations get triggered
- operations, which should be performed when a rule matches
Overall, event pre-processing will iterate over all rules in the order defined in the configuration and check if a rule matches. If it does, its operations will be executed. The outcome of an operation can be one of: continue, accept, reject, drop.
- continue - the event might have been modified, and processing continues with the next rule to check.
- accept - the event might have been modified, processing stops, and the event is delivered directly to the final destination.
- reject - the event is rejected as invalid, reporting this condition to the device that sent the event, in case the device is waiting for a response.
- drop - the event is silently dropped. Towards the device, the operation still appears successful.
spec:
  publish:
    rules:
      - when: {} (1)
        then: [] (2)
1 | One object which defines if the rule should be activated |
2 | An array of operations which should be executed when the rule matches |
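For illustration, here is a minimal sketch of a complete configuration, combining the `isChannel` matcher and the `setExtension` and `break` operations described below; the channel name, extension name, and value are placeholders:
spec:
  publish:
    rules:
      - when:
          isChannel: my-channel # only process events on this channel
        then:
          - setExtension: # tag matching events with an extension
              name: my-ext
              value: my-value
          - break # accept the event directly, skipping all further rules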
Rules
Checking for a channel
You can use the `isChannel` matcher. For example:
when:
  isChannel: my-channel (1)
1 | The name of the channel which the event must match |
Inverting
You can invert the outcome of a check using the `not` operation:
when:
  not:
    isChannel: my-channel
This checks if "the channel is not `my-channel`".
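For example, a sketch of a rule that silently discards every event which was not published on `my-channel`, using the `drop` operation described in the next section:
when:
  not:
    isChannel: my-channel # matches all events on other channels
then:
  - drop # silently discard them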
Operations
Operations are part of the `then` field, which is an array. All operations will be executed in order. The outcome of an operation may influence the processing of the next. If the array is empty, then simply no operations will be executed.
Drop / Accept / Reject
You can silently drop the event using `drop`:
then:
  - drop
Or accept the event directly, and stop processing the following operations and rules:
then:
  - break
Rejecting the event will also stop processing, but return the provided reason back to the sender:
then:
  - reject: This is not right
Setting/Removing attributes/extensions
It is also possible to set or remove cloud event attributes or extensions.
Cloud event attributes are similar to extensions; they are simply the ones that are part of the official specification, and thus are treated differently.
You can set or remove arbitrary extensions using:
then:
  - setExtension: (1)
      name: my-ext
      value: my-value
  - removeExtension: my-other-ext (2)
1 | Set extension my-ext to value my-value |
2 | Remove extension my-other-ext |
Setting an extension which already exists will overwrite the value. Removing an extension that does not exist is a no-op.
As attributes are somewhat special, it is only possible to set some of them. The following attributes are supported and can be set similar to the `setExtension` operation:
- `datacontenttype` - The content type of the event, e.g. `application/json`
- `dataschema` - The schema of the data, e.g. some JSON schema if the content type is JSON
- `subject` - The subject of the event, originally the "channel" information
- `type` - The type of the event, originally `io.drogue.event.v1`
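As an illustration, the following sketch overwrites the content type for events of a specific channel, as mentioned in the scope section above. The operation name `setAttribute` is an assumption here, chosen by analogy with `setExtension`; consult the API reference for the exact name:
when:
  isChannel: json-channel # placeholder channel name
then:
  - setAttribute: # assumed operation name, analogous to setExtension
      name: datacontenttype
      value: application/json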
Externally validate an event
This will send an event to an external endpoint and wait for the response.
HTTP status code | Outcome | Description |
---|---|---|
200, 204 | Continue | The event can continue processing |
202 | Accept | The event gets directly accepted. No further processing is performed. |
400..=499 | Reject | The event gets rejected. If the response payload is JSON, and contains a |
any other | | This will fail the processing with a server-side error. Devices are encouraged to re-try later. |
then:
  - validate:
      endpoint: {} (1)
      request: (2)
        type: cloudEvent (3)
        mode: binary # or structured (4)
1 | Endpoint configuration as described in External endpoints |
2 | Parameters for the outgoing request. Defaults to binary encoded cloud event. |
3 | Selects the type of the encoding |
4 | The cloud events mode: binary or structured |
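Putting it together, a sketch of a rule which validates all events of one channel against an external HTTP endpoint. The `url` field of the endpoint configuration is an assumption here; see External endpoints for the actual options:
spec:
  publish:
    rules:
      - when:
          isChannel: telemetry # placeholder channel name
        then:
          - validate:
              endpoint:
                url: https://validator.example.com/check # assumed field, placeholder URL
              request:
                type: cloudEvent
                mode: structured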
Externally enrich an event
This will send an event to an external endpoint and wait for the response. The headers and body of the response will be used as the metadata and payload of the new event.
then:
  - enrich:
      endpoint: {} (1)
      request: (2)
        type: cloudEvent (3)
        mode: binary # or structured (4)
      response: (5)
        type: cloudEvent (6)
1 | Endpoint configuration as described in External endpoints |
2 | Parameters for the outgoing request. Defaults to binary encoded cloud event. |
3 | Selects the type of the encoding |
4 | The cloud events mode: binary or structured |
5 | Parameters for the incoming response. Defaults to binary encoded cloud event. |
6 | The response type |
The response can be one of:
- `cloudEvent` - A cloud event (binary or structured).
- `raw` - Only use the response body as payload, keep the metadata.
- `assumeStructuredCloudEvent` - Assume the response body contains a structured cloud event, with attributes/extensions as part of the root level. However, the response content type is ignored, although it normally must be `application/cloudevents+json; charset=UTF-8`. This can be used for broken cloud events serialization.
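For example, a sketch of an enrichment step which keeps the event metadata and only replaces the payload with the response body, using the `raw` response type. Again, the `url` field of the endpoint configuration is an assumption; see External endpoints for the actual options:
then:
  - enrich:
      endpoint:
        url: https://enricher.example.com/transform # assumed field, placeholder URL
      request:
        type: cloudEvent
        mode: binary
      response:
        type: raw # keep metadata, use response body as new payload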