Skip to content

fix: use original event with new payload #414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 86 additions & 123 deletions doc/2/concepts/measures-ingestion/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ class AbeewayDecoder extends Decoder {

The `decode` method is in charge of transforming the raw data into standardized measures.

It receives two arguments:
It receives three arguments:

- `decodedPayload`: instance of `DecodedPayload` used to extract measures
- `payload`: raw data
- `request`: Kuzzle request

Each measure must be extracted using the `addMeasurement` method of the `decodedPayload` object.

Expand All @@ -170,7 +171,8 @@ export class AbeewayDecoder extends Decoder {

async decode(
decodedPayload: DecodedPayload<AbeewayDecoder>,
payload: JSONObject
payload: JSONObject,
request: KuzzleRequest
) {
decodedPayload.addMeasurement<TemperatureMeasurement>(
payload.deviceEUI, // device reference
Expand All @@ -188,6 +190,7 @@ export class AbeewayDecoder extends Decoder {
}
}
```
- `request` can be use to interract with [kuzzle request](https://docs.kuzzle.io/core/2/framework/classes/kuzzle-request/properties/) as documented onto kuzzle documentation. (ex: configure response format using [`request.response.configure`](https://docs.kuzzle.io/core/2/framework/classes/request-response/configure/).)

### Registration on the framework

Expand Down Expand Up @@ -252,164 +255,124 @@ For each measure contained in the Kuzzle IoT Platform, it is possible to go back

The `payloadUuids` property contained in the measures allows you to search the `payloads` collection to find the corresponding data frames.

# Ingestion pipeline events
## Measures Sources

It is possible to execute additional processing within the ingestion pipeline by using one of the events provided for this purpose.
Measures can originate from different sources:

Adding new business rules is done using the Kuzzle pipe mechanism (See [Event System](/core/2/guides/develop-on-kuzzle/event-system/)).
### Device Measure Source

Depending on the treatments, it is better to choose one or the other and that is what we are going to see now.
Represents measures coming directly from a physical device.

![Ingestion pipeline](./ingestion-pipeline.png)
- **Properties:**
- `type`: Always "device"
- `id`: Unique identifier of the source
- `reference`: Device reference
- `deviceMetadata`: Metadata of the device
- `model`: Device model
- `lastMeasuredAt`: (optional) Timestamp of the last measurement

## Data enrichment processing (before)

It is best to enrich data at this level of the ingestion pipeline to limit the number of document versions.

Also, the measures will be propagated to the different entities of the Kuzzle IoT Platform, so it is more difficult to modify them afterwards.
### API Measure Source

The ingest pipeline offers a mechanism to modify them before they are propagated and then persisted, so it is possible to modify their contents before this step.
Represents measures coming from an API call.

The `device-manager:measures:process:before` event is triggered with an object containing 3 properties:
- **Properties:**
- `type`: Always "api"
- `id`: Unique identifier of the source

- `device`: the last state of the device associated with the measures
- `measures`: table of measures
- `asset`: (optional) the last state of the asset linked to the device
## Measures Targets

::: info
An isolated version of the event is also available: `engine:<engine-id>:device-manager:measures:process:before`
:::
Measures are directed towards specific targets:

Another event is triggered after updating the asset and the device with the latest measures but before being persisted: `device-manager:measures:persist:after`.
### Device Measure Target

The object passed to the event is the same as for the previous event.
Targets a device entity within the platform.

### Enrich existing measures
- **Properties:**
- `type`: Always "device"
- `assetId`: (optional) linked Asset identifier
- `indexId`: Index identifier

It is possible to modify the fields of existing measures directly by manipulating the table of measures.
### API Measure Target

- **_Example: calculation of power in watts from volts and amps_**
Targets an API entity or external systems.

```jsx
app.pipe.register <
EventMeasureProcessBefore >
("device-manager:measures:process:before",
async ({ measures, device, asset }) => {
for (const measure of measures) {
if (measure.type === "power") {
measure.values.watt = measure.values.volt * measure.values.ampere;
}
}
- **Properties:**
- `type`: Always "api"
- `assetId`: Associated asset identifier
- `engineGroup`: (optional) Specific engine group targeted
- `indexId`: Index identifier

return { measures, device, asset };
});
```

### Add new measures
## Events Emitted During Measure Processing

New measures can be created and added to the measure table.
To enrich or modify measures during ingestion, the Kuzzle IoT Platform provides events emitted at various stages. Below are the details on events triggered during measure processing:

If these measures are present in the device or device, then they must also be added to them.
![Ingestion pipeline](./ingestion-pipeline.png)

- **_Example: retrieving the temperature from an API from the current position_**
### Event Before process : `device-manager:measures:process:before`

```jsx
app.pipe.register <
EventMeasureProcessBefore >
("device-manager:measures:process:before",
async ({ measures, device, asset }) => {
const measuresCopy = [...measures];
Triggered before measures are processed and persisted. Ideal for enriching measures or adjusting metadata prior to saving.

// Iterate on a copy because we are mutating the original array
for (const measure of measuresCopy) {
if (measure.type === "position") {
const temperature = await weatherApi.getTemperature(
measure.values.position
);
- **Event properties:**
- `source`: Origin of the measures (Device or API).
- `target`: Intended target for the measures (Device or API).
- `asset`: (optional) Asset associated with the device.
- `measures`: Array of measures to be processed.

const temperatureMeasure: MeasureContent = {
measuredAt: Date.now(),
type: "temperature",
asset: {
_id: asset._id,
measureName: "temperature",
metadata: asset._source.metadata,
model: asset._source.model,
reference: asset._source.reference,
},
origin: {
type: "computed",
measureName: "temperature",
_id: "weather-api",
payloadUuids: measure.origin.payloadUuids,
},
values: { temperature },
};
_Example of enriching a measure:_

// Add the new measure to the array so it will be persisted
measures.push(temperatureMeasure);
```javascript
app.pipe.register('device-manager:measures:process:before', async ({ measures, source, target, asset }) => {
for (const measure of measures) {
if (measure.type === "temperature" && measure.values.celsius) {
measure.values.fahrenheit = measure.values.celsius * 9/5 + 32;
}
}

// Embed the new measure in the asset so it will be persisted
asset._source.measures.temperature = {
name: "temperature",
type: "temperature",
measuredAt: Date.now(),
originId: "weather-api",
values: { temperature },
payloadUuids: measure.origin.payloadUuids,
};
}
}
return { measures, source, target, asset };
});
```

return { measures, device, asset };
});
```
### Tenant-specific event

### Edit metadata

The metadata of the device and the asset can also be modified according to the information received in the measures.

** Example: automatic switch to low battery mode**
An isolated tenant-specific event variant is also available:

```jsx
app.pipe.register <
EventMeasureProcessBefore >
("device-manager:measures:process:before",
async ({ measures, device, asset }) => {
for (const measure of measures) {
if (measure.type === "battery" && measure.values.volts < 1.5) {
device._source.metadata.mode = "saving";
}
}

return { measures, device, asset };
});
```typescript
engine:<engine-id>:device-manager:measures:process:before
```

## Processing in reaction to the data (after)
### Event After Process(`device-manager:measures:process:after`)

Once the data has been enriched and persisted, it is possible to trigger additional processing.
Triggered after measures have been processed and persisted. Suitable for triggering further actions based on new data.

These treatments are not intended to modify the existing data (measure, device and asset).
- **Event properties:**
- `source`: Origin of the measures.
- `target`: Destination for the measures.
- `asset`: (optional) Updated asset state associated with the measures.
- `measures`: Array of persisted measures.

The `device-manager:measures:process:after` event is triggered with an object containing 3 properties:
_Example: Trigger an alert after processing measures_

- `device`: the new state of the device associated with the measures
- `measures`: table of measures
- `asset`: (optional) the new state of the asset linked to the device
```javascript
app.pipe("device-manager:measures:process:after", async ({ measures, source, asset }) => {
for (const measure of measures) {
if (measure.type === "security_alert" && measure.values.alertType === "forced_entry") {
notifySecurityTeam(asset, measure);
}
}

::: info
An isolated version of the event is also available: `engine:<engine-id>:device-manager:measures:process:after`
:::
return { measures, source, target, asset };
});
```

## Ingestion Pipeline Concurrency
### Tenant-specific events

In order to avoid race conditions in the pipeline, a Mutex ensures that the measures of a device are processed one after the other.
Tenant-specific events are also available for isolated measure processing:

This Mutex is related to the device ID of the processed measures.
- **Before processing:**
- `engine:<engine-id>:device-manager:measures:process:before`
- **After processing:**
- `engine:<engine-id>:device-manager:measures:process:after`

Examples:
These events carry the same properties as their global counterparts and allow for tenant-specific customizations and workflows.

- reception of 1 data frame containing 4 measures for 4 devices ⇒ execution of 4 pipelines in parallel,
- reception of 1 data frame containing 2 measures for 1 device ⇒ execution of a pipeline processing the two measures for the device
- reception of 2 data frames containing 1 measure for 1 device ⇒ execution of 2 pipelines sequentially for the device
Binary file modified doc/2/concepts/measures-ingestion/ingestion-pipeline.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
24 changes: 12 additions & 12 deletions lib/modules/measure/MeasureService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import { BaseService } from "../shared";
import { DecodedMeasurement, MeasureContent } from "./types/MeasureContent";
import {
AskMeasureSourceIngest,
EventMeasureProcessSourceAfter,
EventMeasureProcessSourceBefore,
TenantEventMeasureProcessSourceAfter,
TenantEventMeasureProcessSourceBefore,
EventMeasureProcessAfter,
EventMeasureProcessBefore,
TenantEventMeasureProcessAfter,
TenantEventMeasureProcessBefore,
} from "./types/MeasureEvents";
import {
ApiMeasureSource,
Expand Down Expand Up @@ -113,14 +113,14 @@ export class MeasureService extends BaseService {
*
* Useful to enrich measures before they are saved.
*/
await this.app.trigger<EventMeasureProcessSourceBefore>(
"device-manager:measures:process:sourceBefore",
await this.app.trigger<EventMeasureProcessBefore>(
"device-manager:measures:process:before",
{ asset, measures, source, target },
);

if (indexId) {
await this.app.trigger<TenantEventMeasureProcessSourceBefore>(
`engine:${indexId}:device-manager:measures:process:sourceBefore`,
await this.app.trigger<TenantEventMeasureProcessBefore>(
`engine:${indexId}:device-manager:measures:process:before`,
{ asset, measures, source, target },
);
}
Expand Down Expand Up @@ -153,14 +153,14 @@ export class MeasureService extends BaseService {
* Useful to trigger business rules like alerts
*
*/
await this.app.trigger<EventMeasureProcessSourceAfter>(
"device-manager:measures:process:sourceAfter",
await this.app.trigger<EventMeasureProcessAfter>(
"device-manager:measures:process:after",
{ asset, measures, source, target },
);

if (indexId) {
await this.app.trigger<TenantEventMeasureProcessSourceAfter>(
`engine:${indexId}:device-manager:measures:process:sourceAfter`,
await this.app.trigger<TenantEventMeasureProcessAfter>(
`engine:${indexId}:device-manager:measures:process:after`,
{ asset, measures, source, target },
);
}
Expand Down
16 changes: 8 additions & 8 deletions lib/modules/measure/types/MeasureEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ export type AskMeasureSourceIngest = {
*
* Useful to enrich measures before they are saved.
*/
export type EventMeasureProcessSourceBefore = {
name: "device-manager:measures:process:sourceBefore";
export type EventMeasureProcessBefore = {
name: "device-manager:measures:process:before";

args: [
{
Expand All @@ -45,8 +45,8 @@ export type EventMeasureProcessSourceBefore = {
*
* Useful to enrich measures before they are saved.
*/
export type TenantEventMeasureProcessSourceBefore = {
name: `engine:${string}:device-manager:measures:process:sourceBefore`;
export type TenantEventMeasureProcessBefore = {
name: `engine:${string}:device-manager:measures:process:before`;

args: [
{
Expand All @@ -61,8 +61,8 @@ export type TenantEventMeasureProcessSourceBefore = {
/**
* Event after processing new measures from data source.
*/
export type EventMeasureProcessSourceAfter = {
name: "device-manager:measures:process:sourceAfter";
export type EventMeasureProcessAfter = {
name: "device-manager:measures:process:after";

args: [
{
Expand All @@ -78,8 +78,8 @@ export type EventMeasureProcessSourceAfter = {
* Tenant event after processing new measures from data source.
*
*/
export type TenantEventMeasureProcessSourceAfter = {
name: `engine:${string}:device-manager:measures:process:sourceAfter`;
export type TenantEventMeasureProcessAfter = {
name: `engine:${string}:device-manager:measures:process:after`;

args: [
{
Expand Down
9 changes: 3 additions & 6 deletions tests/application/tests/pipes.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { Backend } from "kuzzle";

import {
MeasureContent,
EventMeasureProcessSourceBefore,
} from "../../../index";
import { MeasureContent, EventMeasureProcessBefore } from "../../../index";

import { TemperatureMeasurement } from "../measures";

Expand Down Expand Up @@ -82,8 +79,8 @@ function addTemperatureWeatherMeasure(
}

export function registerTestPipes(app: Backend) {
app.pipe.register<EventMeasureProcessSourceBefore>(
"device-manager:measures:process:sourceBefore",
app.pipe.register<EventMeasureProcessBefore>(
"device-manager:measures:process:before",
async ({ source, target, asset, measures }) => {
if (source.id === "DummyTemp-enrich_me_master") {
enrichTemperatureMeasures(measures);
Expand Down