
Rule Engine

This addon is a local processing engine that lets you apply rules to data produced by any custom service before it is sent to your Connhex Cloud instance.

Use cases

This addon finds use in multiple scenarios. For example, it can be used to:

  • decimate data in a time window;
  • apply comparison operators to data (e.g. create a moving average of the last 10 measurements and discard it if it's lower than 100);
  • conditionally upload data to Connhex Cloud (e.g. only if recorded measurements are above a given threshold). This is particularly useful when edges produce lots of repetitive data, or the application benefits from only logging anomalous conditions;
  • build offline conditional flows, instead of creating dedicated firmware logic (e.g. enable outputs based on some input state).
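
To make the moving-average use case above concrete, here is a minimal Python sketch of the same logic. It is only an illustration of the computation, not the addon's implementation: the function name, the sample readings and the choice to wait for a full window are all assumptions made for this example.

```python
from collections import deque
from typing import Deque, Iterable, Iterator


def moving_average_filter(
    values: Iterable[float], window: int = 10, threshold: float = 100.0
) -> Iterator[float]:
    """Yield the average of the last `window` values, skipping averages below `threshold`."""
    recent: Deque[float] = deque(maxlen=window)
    for value in values:
        recent.append(value)
        if len(recent) < window:
            continue  # assumption: emit nothing until the window is full
        average = sum(recent) / window
        if average >= threshold:
            yield average  # averages lower than the threshold are discarded


# Hypothetical readings: ten low values followed by ten high values.
readings = [90.0] * 10 + [200.0] * 10
kept = list(moving_average_filter(readings))
```

With these sample readings, the first full window averages 90 and is discarded, while every later window averages at least 101 and is kept.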

See the Example section below for a real-world example.

How it works

The Rule Engine addon is a stream-based processing engine. Every time the addon is loaded, it checks the agent.toml Connhex Edge configuration file and creates any rules that are defined in it.

Each user-created rule defines:

  • the computing logic to be applied to the incoming data
  • the input streaming data source
  • the sink or action to be applied to the output.

The logic is expressed through SQL statements, fully compatible with eKuiper's query language.

Stream

Each stream contains information about the data source (i.e. the NATS or Redis subject) from which data is coming. It is defined by the following parameters:

  • name: the name of the stream;
  • source: the NATS or Redis input subject.

Info: each source parameter is automatically prefixed with re. to indicate that it is a subject that will be processed by the Rule Engine addon.

Rule

Each rule object contains information about the processing function and the output subject to which the processed data is sent. It is defined by the following parameters:

  • name: the name of the rule;
  • id: the rule id;
  • sql: the SQL query to be applied to the data coming from the input stream (e.g. SELECT * FROM test-stream);
  • enabled: whether the rule is enabled or not;
  • sink: the output NATS or Redis subject where the processed data will be published.

Example

The following diagram shows a real-world application of the Rule Engine:

Connhex Rule Engine Addon: example.

We have an Edge configured with two custom services: Custom Service 1 and Custom Service 2. The first service is responsible for reading data coming from three external sensors (a temperature sensor, a humidity sensor and a VOC sensor) via the Modbus protocol. This data is then published to three different subjects that are processed by the Rule Engine (note the re. subject prefix).

The second custom service subscribes to the high_temp subject to detect high temperature alarms. When a message is received, the service notifies the Connhex Edge Agent via the events.data subject. The agent then automatically sends the message to the cloud using the MQTTS protocol.

The Rule Engine addon has been configured with three different rules, named:

  • High Temperature: with output subject high_temp;
  • Average Humidity: with output subject avg_humidity;
  • High VOC: with output subject high_voc.

When a rule is triggered, a message is sent to the output subject associated with the rule. In this simulation, only the first two rules have been triggered.
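
The behavior of the two threshold rules can be sketched in plain Python. This is a rough illustration only, not how the addon evaluates rules: the sample records are invented, and the sketch assumes SenML-like messages whose v field carries the measured value. The thresholds (30 and 200) match the SQL statements in the configuration of this example.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]
Rule = Tuple[str, Callable[[Record], bool], str]

# Hypothetical SenML-like samples, keyed by input stream name.
samples: Dict[str, List[Record]] = {
    "temperature": [{"n": "temperature", "v": 28.0}, {"n": "temperature", "v": 31.5}],
    "voc": [{"n": "voc", "v": 120.0}, {"n": "voc", "v": 180.0}],
}

# Each threshold rule: (input stream, condition, output subject).
rules: List[Rule] = [
    ("temperature", lambda record: record["v"] > 30, "high_temp"),
    ("voc", lambda record: record["v"] > 200, "high_voc"),
]


def run_rules(
    samples: Dict[str, List[Record]], rules: List[Rule]
) -> Dict[str, List[Record]]:
    """Return the records each rule would publish, grouped by output subject."""
    published: Dict[str, List[Record]] = {}
    for stream, condition, sink in rules:
        for record in samples.get(stream, []):
            if condition(record):
                published.setdefault(sink, []).append(record)
    return published


published = run_rules(samples, rules)
```

With these samples, only high_temp receives a message: no VOC reading exceeds 200, so nothing is published to high_voc, mirroring the simulation where the High VOC rule is not triggered.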

To create the rules above, the re section of the agent.toml file should be configured as follows:

agent.toml
...

# Rule engine service configuration
[re]
# Streams
[[re.streams]]
name = 'temperature'
source = 'temperature'

[[re.streams]]
name = 'humidity'
source = 'humidity'

[[re.streams]]
name = 'voc'
source = 'voc'

# Rules
[[re.rules]]
name = 'High Temperature'
id = 'high_temp'
# We're assuming that Custom Service 1 is sending data using SenML
sql = 'select * from temperature where v > 30'
enabled = true
sink = 'high_temp'

[[re.rules]]
name = 'Average Humidity'
id = 'avg_humidity'
# We're assuming that Custom Service 1 is sending data using SenML
sql = 'select avg(v) from humidity group by TUMBLINGWINDOW(ss, 5)'
enabled = true
sink = 'avg_humidity'

[[re.rules]]
name = 'High VOC'
id = 'high_voc'
# We're assuming that Custom Service 1 is sending data using SenML
sql = 'select * from voc where v > 200'
enabled = true
sink = 'high_voc'
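
The Average Humidity rule groups readings with TUMBLINGWINDOW(ss, 5), that is, into consecutive, non-overlapping 5-second windows. The Python sketch below illustrates that grouping idea only. It assumes each reading carries a timestamp in seconds and aligns windows to multiples of the window length; the engine's actual window alignment and emission timing may differ, and the function name and sample points are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def tumbling_average(
    points: Iterable[Tuple[float, float]], window_seconds: float = 5.0
) -> List[Tuple[float, float]]:
    """Average (timestamp, value) points over non-overlapping windows.

    Returns a list of (window_start, average) pairs ordered by window start.
    """
    buckets: Dict[float, List[float]] = {}
    for timestamp, value in points:
        # Assumption: windows are aligned to multiples of window_seconds.
        start = (timestamp // window_seconds) * window_seconds
        buckets.setdefault(start, []).append(value)
    return [(start, sum(vs) / len(vs)) for start, vs in sorted(buckets.items())]


# Hypothetical humidity readings: (seconds since start, value).
points = [(0.0, 40.0), (2.0, 50.0), (4.0, 60.0), (5.0, 70.0), (9.0, 90.0)]
averages = tumbling_average(points)
```

Here the first three readings fall into the window starting at 0 s and the last two into the window starting at 5 s, so each reading contributes to exactly one average.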

Requirements

  • Connhex Edge: >= v2.1.0
  • For supported architectures and minimum hardware requirements, see here.