feat(app): add 514-wasm-msg-to-dss WASM map operator with DSS enrichment pattern #354
Description
Summary
Add src/500-application/514-wasm-msg-to-dss — a reusable WASM map operator that writes any incoming JSON message to the AIO Distributed State Store (DSS) under a configurable key, then passes the message through unchanged to downstream nodes. This operator enables no-code DSS enrichment in any downstream dataflow graph using AIO's built-in transform capabilities.
Value
Without this component, populating the DSS from a live message stream requires either custom WASM development or manual CLI tooling. This operator removes that barrier by providing a single, configurable WASM module that:
- Works with any JSON schema: key extraction uses RFC 6901 JSON Pointer (/id, /data/record_id, /items/0/id)
- Inserts into any existing pipeline without disrupting data flow: the original message is always returned unchanged
- Enables full DSS enrichment in a second pipeline using only AIO's built-in map, filter, and branch transforms, with no custom WASM required on the read side
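
As an illustration of the JSON Pointer handling mentioned above, the following is a minimal sketch of splitting a pointer into its reference tokens per RFC 6901. The function name `pointer_tokens` is hypothetical and is not taken from the operator's source; the operator may well rely on a library lookup instead.

```rust
/// Split an RFC 6901 JSON Pointer into its unescaped reference tokens.
/// Hypothetical helper for illustration only.
/// Returns None when a non-empty pointer does not start with '/'.
fn pointer_tokens(pointer: &str) -> Option<Vec<String>> {
    if pointer.is_empty() {
        // The empty pointer refers to the whole document.
        return Some(Vec::new());
    }
    let rest = pointer.strip_prefix('/')?;
    Some(
        rest.split('/')
            // Order matters: decode "~1" before "~0" (RFC 6901, section 4).
            .map(|token| token.replace("~1", "/").replace("~0", "~"))
            .collect(),
    )
}
```

With this sketch, `/items/0/id` yields the tokens `items`, `0`, `id`, and a value such as `id` (no leading slash) is rejected. In a crate that already depends on `serde_json`, `serde_json::Value::pointer` performs the equivalent lookup directly on a parsed value.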
Scope
| Deliverable | Location |
|---|---|
| WASM operator (msg-to-dss-key) | src/500-application/514-wasm-msg-to-dss/operators/msg-to-dss-key/ |
| Graph definition OCI artifact | src/500-application/514-wasm-msg-to-dss/resources/graphs/graph-msg-to-dss-key.yaml |
| Build and push scripts | src/500-application/514-wasm-msg-to-dss/scripts/ |
| Component README | src/500-application/514-wasm-msg-to-dss/README.md |
| Blueprint tfvars example | blueprints/full-single-node-cluster/terraform/dataflow-graphs-msg-to-dss.tfvars.example |
Operator Configuration
| Parameter | Required | Default | Description |
|---|---|---|---|
| keyPath | Yes | — | RFC 6901 JSON Pointer to the field used as the DSS key, e.g. /deviceId |
| ttlSeconds | Yes | — | TTL in seconds; 0 for no expiration |
| keyPrefix | No | "" | Prefix prepended to the extracted key, e.g. device: |
| onMissing | No | skip | skip (passthrough + warning) or error (drop message) |
Init returns false, preventing dataflow startup, if keyPath or ttlSeconds is absent or invalid.
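
The validation rules in the table can be sketched as follows. This is an assumption-laden illustration, not the operator's actual code: the type and function names (`OperatorConfig`, `OnMissing`, `parse_config`) are hypothetical, the configuration is modelled as a plain string map, and two choices go beyond what is stated above: treating a keyPath without a leading `/` as invalid, and rejecting unknown onMissing values.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum OnMissing {
    Skip,
    Error,
}

#[derive(Debug, PartialEq)]
struct OperatorConfig {
    key_path: String,
    ttl_seconds: Option<u64>, // None means "no expiration" (ttlSeconds = 0)
    key_prefix: String,
    on_missing: OnMissing,
}

/// Hypothetical parser: an Err here corresponds to init returning false.
fn parse_config(props: &HashMap<String, String>) -> Result<OperatorConfig, String> {
    let key_path = props.get("keyPath").ok_or("keyPath is required")?;
    // Assumption: a usable keyPath is a non-empty pointer starting with '/'.
    if !key_path.starts_with('/') {
        return Err(format!("keyPath must start with '/': {key_path}"));
    }
    let ttl_raw = props.get("ttlSeconds").ok_or("ttlSeconds is required")?;
    let ttl: u64 = ttl_raw
        .parse()
        .map_err(|_| format!("ttlSeconds is not a non-negative integer: {ttl_raw}"))?;
    let on_missing = match props.get("onMissing").map(String::as_str) {
        None | Some("skip") => OnMissing::Skip,
        Some("error") => OnMissing::Error,
        // Assumption: unknown values are rejected rather than defaulted.
        Some(other) => return Err(format!("unknown onMissing value: {other}")),
    };
    Ok(OperatorConfig {
        key_path: key_path.clone(),
        ttl_seconds: if ttl == 0 { None } else { Some(ttl) },
        key_prefix: props.get("keyPrefix").cloned().unwrap_or_default(),
        on_missing,
    })
}
```

Mapping `ttlSeconds = 0` to `None` keeps the "no expiration" case explicit in the type instead of relying on a sentinel value at the write site.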
End-to-End Pattern: Write with WASM, Enrich with Built-in Transforms
This is the primary use case. Two independent dataflow graphs handle the write and read sides.
Pipeline A — Index incoming messages into DSS
The WASM operator writes each message to the DSS as a named key while forwarding the message unchanged:
MQTT: devices/+/config
→ [msg_to_dss_key: keyPath=/deviceId, keyPrefix=device-configs, ttlSeconds=3600]
→ MQTT: devices/config/ack
For a message {"deviceId":"sensor-001","calibration":1.05,"unit":"celsius"}, this writes the full JSON to DSS key device-configssensor-001.
Note: With keyPrefix=device-configs and no separator, the key is device-configssensor-001. To get device-configs:sensor-001, set keyPrefix to device-configs:.
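
The key composition described in the note amounts to plain concatenation. A minimal sketch, assuming a hypothetical helper named `build_dss_key` (not taken from the operator's source):

```rust
/// Concatenate the configured prefix and the extracted key.
/// No separator is inserted; any separator must be part of the prefix itself.
/// Hypothetical helper for illustration only.
fn build_dss_key(key_prefix: &str, extracted: &str) -> String {
    format!("{key_prefix}{extracted}")
}
```

Calling `build_dss_key("device-configs", "sensor-001")` gives `device-configssensor-001`, while `build_dss_key("device-configs:", "sensor-001")` gives `device-configs:sensor-001`.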
Pipeline B — Enrich telemetry using DSS context (no custom WASM)
This feature does not create Pipeline B, but the pattern must be documented for users of this operator.
A second DataflowGraph uses AIO's built-in map transform with a datasets entry to look up the stored record. The key field in datasets is the DSS key written by Pipeline A. Use the as keyword to assign a short alias:
# Built-in map transform — no custom WASM required
datasets:
  - key: "device-configs:sensor-001 as sensor"
    inputs:
      - "$source.deviceId"   # field from the incoming telemetry message
      - "$context.deviceId"  # field from the stored DSS record
    expression: "$1 == $2"

Matched fields from the stored record are referenced in rules as $context(<alias>).<field>:
map:
  # Copy a single field from the stored configuration into the output
  - inputs:
      - "$context(sensor).calibration"
      - "$source.rawValue"
    output: "calibratedValue"
    expression: "$1 * $2"
  # Copy ALL top-level fields from the stored record into the output (map only)
  - inputs:
      - "$context(sensor).*"
    output: "*"
  # Copy only the nested 'location' sub-object fields
  - inputs:
      - "$context(sensor).location.*"
    output: "location.*"

Enrichment Behavior Reference
Sourced from Enrich data with external datasets in data flow graphs:
| Behavior | Detail |
|---|---|
| Supported transforms | Map, filter, branch — not window (accumulate) transforms |
| Dataset format | NDJSON: one JSON object per line at the configured DSS key |
| Match semantics | First record in the dataset where the expression evaluates to true |
| No match | Rules referencing $context(alias) fields are skipped — message is not dropped |
| State store unreachable | Transformation fails for that message (error propagates) |
| Wildcard inputs | $context(alias).* supported in map rules only; not in dataset definitions themselves |
| Change propagation | Runtime caches records and receives DSS change notifications — updates reflect without redeploy |
| Alias syntax | key: "long-dss-key as short" → reference as $context(short).<field> |
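
The "first record where the expression evaluates to true" and "no match" rows can be modelled with a small sketch. This is a simplified model for intuition, not the AIO runtime: records are represented as flat string maps, the match expression is fixed to field equality, and the names `Record` and `first_match` are hypothetical.

```rust
use std::collections::HashMap;

type Record = HashMap<String, String>;

/// Return the first dataset record whose `field` equals `source_value`.
/// None models the "no match" case: dependent rules are skipped and the
/// message itself is not dropped.
fn first_match<'a>(dataset: &'a [Record], field: &str, source_value: &str) -> Option<&'a Record> {
    dataset
        .iter()
        .find(|record| record.get(field).map(String::as_str) == Some(source_value))
}
```

If two records in the dataset share the same `deviceId`, only the earlier one is ever returned, which is why record order matters in a multi-record dataset.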
Scalability Note: Lookup Table vs. Per-Entity Keys
This operator writes one JSON object per DSS key (the per-entity snapshot pattern). This is optimally paired with downstream graphs that reference a known, fixed set of DSS keys in their datasets configuration.
For a scalable lookup table pattern — where a single DSS key holds records for many entities and the enrichment matches by expression — the DSS key should hold NDJSON content covering multiple records. This can be populated using the AIO state store CLI. The msg-to-dss-key operator's per-entity output is not directly suitable for this pattern without an aggregation step.
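
The aggregation step mentioned above could be as simple as joining per-entity objects into one NDJSON payload. The sketch below assumes each input is a single, compactly serialized JSON object; the function name `to_ndjson` is hypothetical, and how the result is written to the DSS is out of scope here.

```rust
/// Join per-entity JSON objects into one NDJSON payload (one object per line).
/// Hypothetical helper: inputs are assumed to be compact, single-line JSON.
fn to_ndjson<'a, I>(records: I) -> Result<String, String>
where
    I: IntoIterator<Item = &'a str>,
{
    let mut out = String::new();
    for record in records {
        let line = record.trim();
        // A pretty-printed object would break the one-record-per-line format.
        if line.contains('\n') {
            return Err("record spans multiple lines; serialize it compactly first".to_string());
        }
        out.push_str(line);
        out.push('\n');
    }
    Ok(out)
}
```

Rejecting multi-line input keeps the output valid NDJSON without requiring a JSON parser in the aggregation step; a real implementation would likely parse and re-serialize each record instead.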
Acceptance Criteria
- 514-wasm-msg-to-dss/ directory structure mirrors 512-avro-to-json
- Operator init returns false for missing/invalid keyPath or ttlSeconds
- State store write errors are logged without dropping the message
- Original message is returned unchanged in all code paths
- 18 unit tests pass with cargo test (native host, no WASM target required)
- WASM binary builds successfully with ./scripts/build-wasm.sh
- dataflow-graphs-msg-to-dss.tfvars.example provides a copy-paste-ready pipeline
- README documents the enrichment stitching pattern with $context syntax examples
- src/500-application/README.md updated with the new component entry