Rule Engine
This addon is a local processing engine that allows you to apply rules on data produced by any custom service before it is sent to your Connhex Cloud instance.
Use cases
This addon finds use in multiple scenarios. For example, it can be used to:
- decimate data in a time-window;
- apply comparison operators to data (e.g. create a moving average of the last
10
measurements and discard it if it's lower than100
); - conditionally upload data to Connhex Cloud (e.g. only if recorded measurements are above a given threshold). This is particularly useful when edges produce lots of repetitive data, or the application benefits from only logging anomalous conditions;
- build offline conditional flows, instead of creating dedicated firmware logic (e.g. enable outputs based on some input state).
See here for a real-world example.
How it works
The Rule Engine addon is a stream-based processing engine. Every time the addon is loaded, it checks the agent.toml
Connhex Edge configuration file and create any rules that are defined in it.
Each user-created rule defines:
- the computing logic to be applied to the incoming data
- the input streaming data source
- the sink or action to be applied to the output.
The logic is expressed through SQL statements, fully compatible with eKuiper's query language.
Stream
Each stream contains the information about the data source (i.e. the NATS
or REDIS
subject) from which data is coming. It is defined by the following parameters:
name
: the name of the stream;source
: the NATS or REDIS input subject.
Each source
parameter is automatically prefixed with .re
to indicate that it is a subject that will be processed by the Rule Engine addon.
Rule
Each rule object contains the information about the processing function and the output subject where the processed data is sent. It is defined by the following parameters:
name
: the name of the rule;id
: the rule id;sql
: the SQL query to be applied to the data coming from the input stream (e.g.SELECT * FROM test-stream
);enabled
: whether the rule is enabled or not;sink
: the output NATS or REDIS subject where the processed data will be published.
Example
Next, we will describe a real-world rule engine application example, as shown in the following diagram:
We have an Edge
configured with two custom services: Custom Service 1
and Custom Service 2
. The first service is responsible for reading data coming from three external sensors (a temperature sensor, a humidity sensor and a VOC sensor) via the Modbus protocol. This data is then published to three different subjects that are processed by the Rule Engine (note the re.
subject prefix).
The second custom service is subscribed to the high_temp
subject in order to detect high temperature alarms. When a message is received, the service will notify the Connhex Edge Agent via the events.data
subject. The message is then automatically sent to the cloud by the agent using the MQTTs protocol.
The Rule Engine addon has been configured with three different rules, named:
High temp
: with output subjecthigh_temp
;Avg Humidity
: with output subjectavg_humidity
;High VOC
: with output subjecthigh_voc
.
When a rule is triggered, a message is sent to the output subject associated to the rule. In this simulation, only the first two rules have been triggered.
In order to create the rules above, the re
section of agent.toml
file should be configured as follows:
...
# Rule engine service configuration
[re]
# Streams
[[re.streams]]
name = 'temperature'
source = 'temperature'
[[re.streams]]
name = 'humidity'
source = 'humidity'
[[re.streams]]
name = 'voc'
source = 'voc'
# Rules
[[re.rules]]
name = 'High Temperature'
id = 'high_temp'
# We're assuming that Custom Service 1 is sending data using senML
sql = 'select * from temperature where v > 30'
enabled = true
sink = 'high_temp'
[[re.rules]]
name = 'Average Humidity'
id = 'avg_humidity'
# We're assuming that Custom Service 1 is sending data using senML
sql = 'select avg(v) from humidity group by TUMBLINGWINDOW(ss, 5)'
enabled = true
sink = 'avg_humidity'
[[re.rules]]
name = 'High VOC'
id = 'high_voc'
# We're assuming that Custom Service 1 is sending data using senML
sql = 'select * from voc where v > 200'
enabled = true
sink = 're.high_voc'
Requirements
- Connhex Edge: >=
v2.1.0
- For supported architectures and minimum hardware requirements, see here.