Event pre-processing

One core functionality of Drogue Cloud is to take device events and send deliver them to a Kafka topic, so that interested application can consume them. In that process, Drogue Cloud takes care of device connectivity, authentication & authorization, and some very basic validation steps. However, most of that processing is payload agnostic. By default, payload and metadata just forwarded as is.

In some cases it would be helpful though, if metadata or payload could be altered of verified with additional logic during the reception of the event.

Scope

What might come in handy in situation like this, is a rule based engine, which allows inspecting metadata and payload and decide how to process the event further. Rejecting, diverting, or altering it in the process.

However, such a system can get out of hand pretty quickly, and we do not want to re-invent the wheel. A lot of systems like that already exists: Apache Camel for example, to mention just one of them.

Also, allowing a user to configure a complex piece of logic for a cloud side, "as a service" platform can be rather tricky. Just imagine someone would validate "what the next matching bitcoin hash" is. It would be difficult to measure and project the costs caused by user provided validations.

So Drogue Cloud tries to strike a balance. Support very simple and stateless operations, while allowing to integrate for more complex operations, hosted by the user.

Just two quick examples: Drogue Cloud does offer a way to overwrite the "content type" attribute for an event if its channels matches a specific value. But for letting an AI/ML model categorize values of an even, we allow passing the event into an external endpoint, and consume it back as the response, before passing it along the chain. This way you can spin up your own (optionally Knative serving based) endpoint, and do complex calculations there, but do all of this as part of the Drogue Cloud device facing endpoints.

Configuration

The functionality is configured as part of the "application" resource in the .spec.publish path.

The configuration consists of two main parts:

  • rules which select when operations get triggered

  • what operations should be performed when a rule matches

Overall, even pre-processing will iterate over all rules, as defined in the order of the configuration and check if a rule matches. If it does, its operations will be executed. The outcome of an operation can one of: continue, accept, reject, drop.

When the outcome is to continue, the event might have been modified, and the processing will continue with the next rule to check.

When the outcome is to accept, the event might have been modified, the processing will stop, and the event will directly be delivered to the final destination.

When the outcome is to reject, the event will be rejected as invalid, responding this condition to the device that sent the event, in case the device waits for a response.

When the outcome is to drop, the event will be silently dropped. Towards the device, the operation is still successful.

spec:
  publish:
    rules:
      - when: {} (1)
        then: [] (2)
1 One object which defines if the rule should be activated
2 An array of operations which should be executed when the rule matches

Rules

Checking for a channel

You can use the isChannel matcher. For example:

when:
  isChannel: my-channel (1)
1 The name of the channel which the event must match

Inverting

You can invert the outcome of a check using the not operation:

when:
  not:
    isChannel: my-channel

This checks if "the channel is not `my-channel`".

Always matching

A rule that always matches is always.

when: always

You can also turn this into "never" using the following:

when:
  not: always

And/Or

It is also possible to use multiple checks and combined them with and or or:

when:
  or:
    - isChannel: foo
    - isChannel: bar
    - isChannel: baz

Which would match if the channel is either foo, bar, or baz.

When an or or and check has no sub-checks, it evaluates to false.

Operations

Operations are part of the then field, which is an array. All operations will be executed in order. The outcome of an operation may influence the processing of the next. If the array is empty, then simply no operations will be executed.

Drop / Accept / Reject

You can silently drop the event using drop:

then: drop

Or accept the event directly, and stop processing the following operations and rules:

then:
  - break

Rejecting the event will also stop processing, but return the provided reason back to the sender:

then:
  - reject: This is not right

Setting/Removing attributes/extensions

It also is possible to set or remove cloud event attributes or extensions.

Cloud event attributes are similar to extensions, they are just extensions that are part of the official specification and thus are treated differently.

You can set or remove arbitrary extensions using:

then:
  - setExtension: (1)
      name: my-ext
      value: my-value
  - removeExtension: my-other-ext (2)
1 Set extension my-ext to value my-value
2 Remove extension my-other-ext

Setting an extension which already exists will overwrite the value. Removing an extension that does not exist is a no-op.

As attributes are somewhat special it is only possible to set some of them. The following attributes are supported and can be set similar to the setExtension operation:

  • datacontenttype - The content type of the event, e.g. application/json

  • dataschema - The schema of the data, e.g. some JSON schema if the content type is JSON

  • subject - The subject of the event, originally the "channel" information

  • type - The type of the event, originally io.drogue.event.v1

Externally validate an event

This will send an event to an external endpoint and wait for the response.

HTTP status code Outcome Description

200, 204

Continue

The event can continue processing

202

Accept

The event gets directly accepted. No further procesing is performed.

400..=499

Reject

The event gets rejected. If the response payload is JSON, and contains a .reason field, that value of that field is used as rejection cause.

any other

This will the processing with a server side error. Devices are encouraged to re-try later.

then:
  - validate:
      endpoint: {} (1)
      request: (2)
        type: cloudEvent (3)
        mode: binary # or structured (4)
1 Endpoint configuration as described in External endpoints
2 Parameters for the outgoing request. Defaults to binary encoded cloud event.
3 Selects the type of the encoding
4 The cloud events mode: binary or structured

Externally enrich an event

This will send an event to an external endpoint and wait for the response. The headers and body of the response will be used as the metadata and payload of the new event.

then:
  - enrich:
      endpoint: {} (1)
      request: (2)
        type: cloudEvent (3)
        mode: binary # or structured (4)
      response: (5)
        type: cloudEvent (6)
1 Endpoint configuration as described in External endpoints
2 Parameters for the outgoing request. Defaults to binary encoded cloud event.
3 Selects the type of the encoding
4 The cloud events mode: binary or structured
5 Parameters for the incoming response. Defaults to binary encoded cloud event.
6 The response type

The response can be one of:

  • cloudEvent - A cloud event (binary of structured).

  • raw - Only use the response body as payload, keep the metadata.

  • assumeStructuredCloudEvent - Assume the response body contains a structured cloud event, with attributes/extensions as part of the root level. However, the response content type is ignored, although it normally must be application/cloudevents+json; charset=UTF-8. This can be used for broken cloud events serialization.