Add a write correlator to group all attributes of a single write across TRoE and MongoDB #1965

Open
cfreyfh wants to merge 24 commits into FIWARE:develop from cfreyfh:develop

cfreyfh commented Jun 5, 2026

Problem

TRoE rows have no key that links all attributes written by a single operation
("entity snapshot"):

  • instanceId is unique per attribute
  • observedAt is a business timestamp, sometimes NULL and not unique
  • ts (request time) is shared within a request but not guaranteed unique

So a query like "give me all attributes written together with
OriginSource = OPC_Server_01"
cannot be answered reliably.

Solution

Introduce a write correlator: a single value per write operation, shared by
every row that write produces.

  • The correlator is taken from the NGSILD-Correlator request header.
  • If the header is absent, one is generated per request
    (urn:ngsi-ld:correlator:<uuid>), so the grouping key always exists.
  • It is resolved exactly once per request (correlatorGet()) and then written
    into every row of that write.
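
A minimal Python sketch of these resolution rules, for illustration only (the broker implements this in C as correlatorGet()). The function name `resolve_correlator` and the dict-based header access are assumptions; the fallback to Fiware-Correlator and the removal of a "; cbnotif=N" suffix follow the first commit message of this PR.

```python
import re
import uuid

# Matches a trailing "; cbnotif=N" suffix, which is not part of the correlator.
_CBNOTIF_SUFFIX = re.compile(r"\s*;\s*cbnotif=\d+\s*$")


def resolve_correlator(headers: dict) -> str:
    """Return the correlator for one request (hypothetical helper)."""
    # HTTP header names are case-insensitive, so normalise before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    for name in ("ngsild-correlator", "fiware-correlator"):
        value = _CBNOTIF_SUFFIX.sub("", lowered.get(name, "").strip())
        if value:
            return value
    # No usable header: generate one so the grouping key always exists.
    return f"urn:ngsi-ld:correlator:{uuid.uuid4()}"
```

The caller would resolve the value once at the start of the request and pass the same string to every row it writes, rather than calling the function per row.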

Where it is stored

  • TRoE / PostgreSQL — a new correlator column on entities, attributes
    and subAttributes. All rows of one write share the same value.
  • MongoDB — the entity document stores the correlator of the last write in
    the lastCorrelator field. It is updated by every write path: entity create,
    attribute append, attribute patch/replace and attribute delete.

This provides a key, guaranteed unique when the broker generates it, that groups all attributes of one snapshot:

SELECT a.* FROM attributes a
WHERE a.correlator IN (
  SELECT correlator FROM attributes
  WHERE id = '<expanded OriginSource name>' AND text = 'OPC_Server_01'
);
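
The grouping query can be tried against a toy table. The sketch below uses Python's built-in sqlite3 as a stand-in for PostgreSQL; the reduced column set, the example.org attribute names and the correlator values are assumptions made for the demonstration, not the real TRoE layout.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the TRoE attributes table (assumed columns).
conn.execute(
    "CREATE TABLE attributes (instanceId TEXT, id TEXT, text TEXT, correlator TEXT)"
)
conn.executemany(
    "INSERT INTO attributes VALUES (?, ?, ?, ?)",
    [
        # First write: three attributes sharing one correlator.
        ("i1", "https://example.org/OriginSource", "OPC_Server_01", "urn:ngsi-ld:correlator:w1"),
        ("i2", "https://example.org/temperature", "21.5", "urn:ngsi-ld:correlator:w1"),
        ("i3", "https://example.org/pressure", "1013", "urn:ngsi-ld:correlator:w1"),
        # Second write: a different correlator.
        ("i4", "https://example.org/OriginSource", "OPC_Server_02", "urn:ngsi-ld:correlator:w2"),
        ("i5", "https://example.org/temperature", "19.0", "urn:ngsi-ld:correlator:w2"),
    ],
)

rows = conn.execute(
    """
    SELECT a.instanceId FROM attributes a
    WHERE a.correlator IN (
      SELECT correlator FROM attributes
      WHERE id = ? AND text = ?
    )
    ORDER BY a.instanceId
    """,
    ("https://example.org/OriginSource", "OPC_Server_01"),
).fetchall()

co_written = [row[0] for row in rows]
print(co_written)  # ['i1', 'i2', 'i3']
```

Only the rows of the first write come back, including the attributes that do not mention OPC_Server_01 themselves.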

claude and others added 13 commits June 5, 2026 09:58
Every write now carries a correlator that identifies the write operation,
taken from the new "NGSILD-Correlator" request header (falling back to
"Fiware-Correlator"); if neither is present one is generated as
"urn:ngsi-ld:correlator:<uuid>". Any "; cbnotif=N" suffix is stripped.

The correlator is resolved once per request (correlatorGet) and stored:
  - on the entity in MongoDB ("lastCorrelator"), on every write path
    (full rebuild via dbModelFromApiEntity, and the partial $set updates
    for PATCH entity, add/replace/delete attribute);
  - on every TRoE row (entities, attributes, subAttributes) in a new
    "correlator" column.

This gives a guaranteed-unique key (for generated correlators) linking all
attributes of one write - something instanceId (per attribute), observedAt
and ts cannot provide. As the correlator is client-influenced, it is stripped
of quotes and SQL-escaped before being embedded in the TRoE INSERTs.

uuid is added to ftClient's link libraries since the correlator resolver
(which uses uuidGenerate) is now reachable from the entity write paths.

https://claude.ai/code/session_01BBQ1eEcF8F3EFEDrUhetpw
pgCommands only checked for a NULL PGresult, but PQexec returns a non-NULL
result with an error status when the statement itself fails (e.g. a missing
column on an un-migrated schema). Such failures were silently dropped: no log,
the transaction aborted and COMMIT acted as ROLLBACK, so the temporal write was
lost without a trace.

Check PQresultStatus explicitly and, on error, log the status, the Postgres
error message and the offending SQL, then roll back.

https://claude.ai/code/session_01BBQ1eEcF8F3EFEDrUhetpw
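
The log-and-roll-back behaviour described in this commit can be illustrated with a small Python sketch. It uses sqlite3 as a stand-in, and the helper name `run_in_transaction` is hypothetical. Note the difference from libpq: Python drivers raise an exception on a failed statement, whereas PQexec returns a result object whose status has to be checked, so the sketch mirrors only the handling of the failure, not the original bug.

```python
import logging
import sqlite3

log = logging.getLogger("troe")


def run_in_transaction(conn: sqlite3.Connection, statements: list) -> bool:
    """Run all statements atomically; log and roll back on the first failure."""
    sql = "BEGIN"
    try:
        conn.execute(sql)
        for sql in statements:
            conn.execute(sql)
        conn.execute("COMMIT")
        return True
    except sqlite3.Error as exc:
        # Log the error message and the offending SQL instead of dropping them.
        log.error("TRoE statement failed: %s; SQL: %s", exc, sql)
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        return False


# isolation_level=None puts the connection in autocommit mode, so the
# explicit BEGIN/COMMIT/ROLLBACK above are the only transaction control.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE attributes (id TEXT)")  # old layout: no correlator column

ok = run_in_transaction(
    conn,
    [
        "INSERT INTO attributes (id) VALUES ('a1')",
        "INSERT INTO attributes (id, correlator) VALUES ('a2', 'c1')",  # fails
    ],
)
remaining = conn.execute("SELECT COUNT(*) FROM attributes").fetchone()[0]
```

Here `ok` is False and `remaining` is 0: the failing statement is reported and the first insert is rolled back with it.
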
Functional test (deterministic via client-supplied NGSILD-Correlator):
all attributes of a write share the correlator in TRoE; entities, attributes
and subAttributes share it; two writes get two correlators; the entity stores
it in MongoDB lastCorrelator; the grouping query returns the co-written
attributes; and a header-less write gets a generated urn:ngsi-ld:correlator:
value.

https://claude.ai/code/session_01BBQ1eEcF8F3EFEDrUhetpw
Add write correlator stored on the entity and TRoE rows
Covers all four write paths (create, append, patch, delete) plus the
auto-generated correlator, asserting the entity's MongoDB 'lastCorrelator'
field after each write. Runs without TRoE/Postgres.
Add functional test verifying the write correlator is stored in MongoDB
mongoCmd2 prints 'MongoDB shell version', 'connecting to:', 'MongoDB
server version:' and a trailing 'bye' around the queried value. Add these
(as REGEX) to the expected output of both correlator tests so they pass.
Match mongoCmd2 shell wrapper lines in correlator test expectations
The 201 response from orionCurl emits a trailing blank line, so there are
three blank lines (not two) before the postgresCmd 'prefix' output.
Fix blank-line count before TRoE step 08 prefix output

github-actions Bot commented Jun 5, 2026

CLA Assistant Lite bot:
Thank you for your submission, we really appreciate it. Like many open-source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution. You can sign the CLA by just posting a Pull Request Comment same as the below format.


I have read the CLA Document and I hereby sign the CLA


2 out of 3 committers have signed the CLA.
@carstenfrey-de
@cfreyfh
@claude
You can retrigger this bot by commenting recheckcla in this Pull Request

carstenfrey-de and others added 7 commits June 5, 2026 14:27
The 'lastCorrelator' entity field already existed (NGSIv2 compat) and was empty
in NGSI-LD. Storing a generated correlator there changed it to a non-deterministic
UUID, breaking ~140 functional tests that dump the entity and expect an empty
lastCorrelator.

Fix: only mirror the correlator to MongoDB when the client actually supplied one
(NGSILD-Correlator / Fiware-Correlator). Generated correlators stay only on the
TRoE rows, so the snapshot grouping is unchanged and existing NGSI-LD behaviour
is preserved when no correlator header is sent.

- correlatorClientProvided() helper
- gate the create path (dbModelFromApiEntity) and the four mongoc update paths
- update the MongoDB functional test + docs for the no-header case
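
A rough Python sketch of the gating idea, assuming a MongoDB-style `$set` update document. The function name `build_entity_update` and the `modDate` example field are illustrative; the broker does this in C on its mongoc update paths.

```python
def build_entity_update(changes: dict, correlator: str, client_provided: bool) -> dict:
    """Build a MongoDB-style $set update for one entity write (hypothetical helper)."""
    fields = dict(changes)
    # Generated correlators stay on the TRoE rows only, so entity dumps
    # remain deterministic when no correlator header was sent.
    if client_provided:
        fields["lastCorrelator"] = correlator
    return {"$set": fields}


with_header = build_entity_update({"modDate": 1780000000}, "my-correlator", True)
without_header = build_entity_update(
    {"modDate": 1780000000}, "urn:ngsi-ld:correlator:generated", False
)
```

With a client-supplied correlator the update carries `lastCorrelator`; without one the update is identical to what the broker sent before this PR.
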
Mirror correlator to MongoDB lastCorrelator only when client-supplied
Mirror correlator to MongoDB lastCorrelator only when client-supplied
Merge 2b4034c accidentally dropped 6 lines from
ngsild_write_correlator_mongodb.test: the '===' underlines of steps 09/10
and the MongoDB shell-version wrapper + 'true' of step 10. This restores
them so the test's expected output matches the broker output again.
Restore test lines dropped by a bad merge resolution

kzangeli (Collaborator) commented Jun 8, 2026

One very important detail.
The Postgres DB layout is changed in this PR, so all existing DBs need a migration.
It would be great to automate this inside orion-ld (the broker starts, sees that the DB layout is old, and migrates automatically).
This will not be the last change in DB layout, so an automatic mechanism like that would be quite useful.

kzangeli (Collaborator) left a comment

So, all tests pass, all good.
BUT, as I said in a comment, we need to merge the DB model. Any other deployment out there that uses TRoE will fail, probably at startup, once this modification is in, so we need an automatic "merge" (whatever it's called) of the DB layout.
Can't ship this PR without it.

The broker now versions each TRoE database (a 'metadata' table with a
'schemaVersion' row) and applies pending, idempotent migration steps on
startup and for every tenant database - so existing databases are upgraded
automatically when the layout changes, with no manual/external step.

- pgSchemaMigrate(): versioned, idempotent migration registry guarded by a
  Postgres advisory lock (safe under concurrent broker instances)
- step v1->v2 adds the write 'correlator' column (+ index) to the three tables
- hooked into pgDatabasePrepare (runs after table creation, per database)
- functional test simulating an old database that gets migrated on restart
- docs: new 'Automatic schema migration on startup' section

To add a future layout change: bump PG_SCHEMA_VERSION and append a step.

cfreyfh (Author) commented Jun 8, 2026

What do you think about this?

The new commit adds an automatic, versioned schema-migration mechanism for the TRoE PostgreSQL database. On startup (and for every tenant database), the broker detects an outdated DB layout and migrates it automatically, with no external tooling or manual step required.

This is built as a general mechanism (not a one-off for the correlator column), since the DB layout will keep evolving: future changes just bump a version and append a step.

How it works

  • Each TRoE database carries a metadata table with a schemaVersion row.
  • A database created before this mechanism existed has no such row and is treated as the baseline (1).
  • An ordered, idempotent migration registry lives in code (pgSchemaMigrate). Each step uses ADD COLUMN IF NOT EXISTS / CREATE INDEX IF NOT EXISTS, bumps schemaVersion, and runs in its own transaction (rollback on failure).
  • A PostgreSQL advisory lock wraps the whole migration, so multiple broker instances pointing at the same database never migrate in parallel.
  • Hooked into pgDatabasePrepare, which already runs per database at startup and per tenant.

| schemaVersion | Change |
|---|---|
| 1 | Baseline (released TRoE layout) |
| 2 | Adds the write correlator column to entities, attributes, subAttributes (+ index) |

Freshly created databases already carry the latest layout, so the steps run as no-ops on them; only pre-existing databases are actually upgraded.
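
The mechanism can be sketched in a few lines of Python, using sqlite3 as a stand-in for PostgreSQL. Only `PG_SCHEMA_VERSION`, the `metadata` table and the `schemaVersion` row are names from this PR; the function names, the `name`/`value` column layout and the index names are assumptions. SQLite has neither `ADD COLUMN IF NOT EXISTS` nor advisory locks, so the column check is done by hand and the lock (in PostgreSQL something like `pg_advisory_lock`, which is an assumption about what the PR uses) is left out.

```python
import sqlite3

PG_SCHEMA_VERSION = 2  # bump this when appending a step
TABLES = ("entities", "attributes", "subAttributes")


def has_column(conn, table, column):
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    return any(row[1] == column for row in conn.execute(f"PRAGMA table_info({table})"))


def step_1_to_2(conn):
    # Idempotent: safe to run on a database that already has the column.
    for table in TABLES:
        if not has_column(conn, table, "correlator"):
            conn.execute(f"ALTER TABLE {table} ADD COLUMN correlator TEXT")
        conn.execute(
            f"CREATE INDEX IF NOT EXISTS {table}_correlator_ix ON {table} (correlator)"
        )


STEPS = {1: step_1_to_2}  # key = schema version the step upgrades from


def schema_version(conn):
    conn.execute("CREATE TABLE IF NOT EXISTS metadata (name TEXT PRIMARY KEY, value TEXT)")
    row = conn.execute("SELECT value FROM metadata WHERE name = 'schemaVersion'").fetchone()
    return int(row[0]) if row else 1  # no row: treat as the baseline


def migrate(conn):
    version = schema_version(conn)
    while version < PG_SCHEMA_VERSION:
        conn.execute("BEGIN")  # each step runs in its own transaction
        try:
            STEPS[version](conn)
            conn.execute(
                "INSERT OR REPLACE INTO metadata (name, value) VALUES ('schemaVersion', ?)",
                (str(version + 1),),
            )
            conn.execute("COMMIT")
        except sqlite3.Error:
            conn.execute("ROLLBACK")
            raise
        version += 1
    return version


conn = sqlite3.connect(":memory:", isolation_level=None)
for table in TABLES:
    conn.execute(f"CREATE TABLE {table} (instanceId TEXT)")  # simulated old layout

first_run = migrate(conn)   # upgrades 1 -> 2
second_run = migrate(conn)  # nothing pending, returns immediately
```

Both calls return 2: the first one adds the column and index and records the new version, the second finds nothing to do. Adding a future layout change in this sketch means raising `PG_SCHEMA_VERSION` and registering one more entry in `STEPS`.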

kzangeli (Collaborator) commented Jun 9, 2026

That's perfect. Exactly what I had in mind

carstenfrey-de and others added 3 commits June 9, 2026 10:39

sonarqubecloud Bot commented Jun 9, 2026

Quality Gate failed

Failed conditions
7 Security Hotspots
B Maintainability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

cfreyfh requested a review from kzangeli June 11, 2026 13:11