102 changes: 72 additions & 30 deletions docs/subsystems/conflict_resolution.md
@@ -1,6 +1,7 @@
# Conflict Resolution — Merge Policies and Field Semantics

**Related docs:**

- [Reducer engine](reducer.md) — how merge strategies are executed
- [Schema registry](schema_registry.md) — how to register a schema with custom policies
- [Observation architecture](observation_architecture.md) — `source_priority` and `observed_at`
@@ -163,20 +164,20 @@ it silently.
"schema_version": "1.0",
"schema_definition": {
"fields": {
"sensor_id": { "type": "string", "required": true },
"value": { "type": "number", "required": true },
"unit": { "type": "string" },
"recorded_at": { "type": "date" },
"tags": { "type": "array" }
}
},
"reducer_config": {
"merge_policies": {
"sensor_id": { "strategy": "last_write" },
"value": { "strategy": "highest_priority", "tie_breaker": "observed_at" },
"unit": { "strategy": "last_write" },
"recorded_at": { "strategy": "highest_priority", "tie_breaker": "observed_at" },
"tags": { "strategy": "merge_array" }
}
},
"activate": true
@@ -203,20 +204,20 @@ Or via the MCP tool:
"schema_version": "1.0",
"schema_definition": {
"fields": {
"sensor_id": { "type": "string", "required": true },
"value": { "type": "number", "required": true },
"unit": { "type": "string" },
"recorded_at": { "type": "date" },
"tags": { "type": "array" }
}
},
"reducer_config": {
"merge_policies": {
"sensor_id": { "strategy": "last_write" },
"value": { "strategy": "highest_priority", "tie_breaker": "observed_at" },
"unit": { "strategy": "last_write" },
"recorded_at": { "strategy": "highest_priority", "tie_breaker": "observed_at" },
"tags": { "strategy": "merge_array" }
}
},
"activate": true
@@ -229,10 +230,10 @@ regardless of which was written more recently.

### `tie_breaker` options

| Value                     | Behavior within the same `source_priority` tier    |
| ------------------------- | -------------------------------------------------- |
| `"observed_at"` (default) | Most recent observation wins                       |
| `"source_priority"`       | Falls through to `id ASC` (stable, not time-based) |
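The table can be read as a two-level comparison. The following is a minimal TypeScript sketch of that reading, not the reducer's actual code: the `Obs` shape, the `pickWinner` helper, and the use of same-format ISO timestamp strings are assumptions made for illustration. Only `source_priority`, `observed_at`, `id`, and the two `tie_breaker` values come from this document.

```typescript
// Hypothetical observation shape; only the field names come from the docs.
type Obs = { id: string; value: unknown; source_priority: number; observed_at: string };
type TieBreaker = "observed_at" | "source_priority";

// Sketch of `highest_priority` selection followed by a tie-breaker.
function pickWinner(observations: Obs[], tieBreaker: TieBreaker = "observed_at"): Obs | undefined {
  return [...observations].sort((a, b) => {
    // 1. A higher source_priority always wins.
    if (a.source_priority !== b.source_priority) return b.source_priority - a.source_priority;
    // 2. Same tier with "observed_at": the most recent observation wins.
    //    (Assumes ISO-8601 UTC strings, which compare correctly as text.)
    if (tieBreaker === "observed_at" && a.observed_at !== b.observed_at) {
      return a.observed_at < b.observed_at ? 1 : -1;
    }
    // 3. Otherwise fall through to id ASC (stable, not time-based).
    //    Using id ASC for equal timestamps as well is an assumption.
    return a.id < b.id ? -1 : a.id > b.id ? 1 : 0;
  })[0];
}

const sameTier: Obs[] = [
  { id: "obs_1", value: "older", source_priority: 50, observed_at: "2024-01-01T00:00:00Z" },
  { id: "obs_2", value: "newer", source_priority: 50, observed_at: "2024-06-01T00:00:00Z" },
];

const byTime = pickWinner(sameTier, "observed_at"); // obs_2: most recent
const byId = pickWinner(sameTier, "source_priority"); // obs_1: lowest id
```

With both observations in the same priority tier, the choice of `tie_breaker` alone decides the winner, which is why a time-based expectation can fail under `"source_priority"`.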

---

@@ -286,8 +287,8 @@ const obs = {
entity_type: "invoice",
fields: {
vendor_name: fetchedName,
amount_due: fetchedAmount ?? 0, // ← 0 is now a real observation
},
};

// Correct — omit the field when the read failed
@@ -344,18 +345,58 @@ This footgun is tracked in
**Mitigation today:** register an explicit schema (`register_schema`) for any entity type
where source priority must be honored.

### The default `source_priority = 100` footgun

`source_priority` **defaults to `100`** (`z.number().optional().default(100)` in
`src/shared/action_schemas.ts`). The default is unchanged for backward compatibility, but
it has a sharp edge on any field whose reducer policy is `highest_priority` (or
`most_specific` with `tie_breaker: "source_priority"`):

> An **unprioritized** write — one that omits `--source-priority` / `source_priority` —
> silently inherits priority `100`, which **outranks** any prior observation written with
> an explicit priority **≤ 99** (e.g. a trusted import at `50`). The unprioritized write
> wins the field. This is a silent **trust-escalation**: a caller who never thought about
> priority at all can override a value that was deliberately ranked lower.

Concretely, with a `highest_priority` schema on `value`:

```
obs_a: { value: "trusted-import", source_priority: 50 } # explicit, deliberately low
obs_b: { value: "casual-write" } # omitted → defaults to 100
# Reducer winner: obs_b ("casual-write"), because 100 > 50.
```
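The same scenario can be made runnable. This is a sketch under stated assumptions: `Write`, `effectivePriority`, and `reduceHighestPriority` are hypothetical names, the default is applied with `??` as a stand-in for schema parsing, and tie-breaking is left out. It also shows the effect of passing an explicit priority.

```typescript
// Hypothetical write shape: the caller may omit source_priority.
type Write = { value: string; source_priority?: number };

const DEFAULT_SOURCE_PRIORITY = 100; // the documented default

// Stand-in for schema parsing: fills in the default when the caller omitted it.
function effectivePriority(w: Write): number {
  return w.source_priority ?? DEFAULT_SOURCE_PRIORITY;
}

// Sketch of a `highest_priority` reduction (tie-breaking omitted for brevity).
function reduceHighestPriority(writes: Write[]): Write {
  return writes.reduce((best, w) => (effectivePriority(w) > effectivePriority(best) ? w : best));
}

const obsA: Write = { value: "trusted-import", source_priority: 50 }; // explicit, deliberately low
const obsB: Write = { value: "casual-write" }; // omitted, so treated as 100

// The unprioritized write wins because 100 > 50.
const winner = reduceHighestPriority([obsA, obsB]);
console.log(winner.value); // "casual-write"

// With an explicit lower priority on the casual write, the import keeps the field.
const fixed = reduceHighestPriority([obsA, { value: "casual-write", source_priority: 10 }]);
console.log(fixed.value); // "trusted-import"
```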

**Why the default isn't being changed:** lowering it (e.g. to `0`) would silently flip the
winner of existing reductions across every deployment, which is a breaking semantic change.
The mitigation is detection plus discipline, not a new default.

**Limitation — default vs. explicit `100` are indistinguishable at write time:** zod
applies `.default(100)` before the request handler runs, so by the time the write is
processed there is no way to tell "caller omitted `source_priority`" from "caller
explicitly passed `100`". The advisory below therefore fires on **both** cases (framed as
"default/non-explicit 100"); a caller who genuinely intends priority `100` can ignore it.

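To illustrate the limitation, here is a small sketch in which `parseWrite` is a hypothetical stand-in for the zod default (it is not zod itself, and `RawWrite` / `ParsedWrite` are assumed shapes). After parsing, an omitted priority and an explicit `100` produce the same value, so nothing downstream can tell them apart.

```typescript
type RawWrite = { source_priority?: number };
type ParsedWrite = { source_priority: number };

// Stand-in for `.default(100)`: runs before any handler sees the write.
function parseWrite(raw: RawWrite): ParsedWrite {
  return { source_priority: raw.source_priority ?? 100 };
}

const omitted = parseWrite({}); // caller never mentioned priority
const explicit = parseWrite({ source_priority: 100 }); // caller asked for 100

// Both parsed writes are identical, so the advisory has to fire on both.
const indistinguishable = JSON.stringify(omitted) === JSON.stringify(explicit);
console.log(indistinguishable); // true
```
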
**Detection:** a write at priority `100` into a field whose policy honors `source_priority`
(`highest_priority`, or `most_specific` with `tie_breaker: "source_priority"`) emits a
non-blocking `SOURCE_PRIORITY_ESCALATION` `store_warning` (issue
[#1838](https://github.com/markmhendrickson/neotoma/issues/1838)) naming the affected
field(s). It is advisory only: nothing is rejected and no merge semantics change.

**Mitigation today:** **always pass an explicit `--source-priority` / `source_priority`**
for any priority-semantic write (one that targets a `highest_priority` field), reflecting
how that write should rank against others — rather than relying on the implicit `100`.

---

## Quick Reference

| I want...                           | Strategy                | Notes                                                           |
| ----------------------------------- | ----------------------- | --------------------------------------------------------------- |
| Latest write wins                   | `last_write`            | Default for auto-discovered schemas                             |
| Trusted source wins                 | `highest_priority`      | Requires a registered schema; `source_priority` on observations |
| Most specific value wins            | `most_specific`         | Requires `specificity_score` on observations                    |
| Accumulate all values               | `merge_array`           | Field must be array type; corrections replace                   |
| Field cleared                       | Write `null` explicitly | Use `correct()` at high priority                                |
| Field kept from earlier observation | Omit the field          | Do not write `0` or `null` as sentinel                          |

## Related Documents

@@ -364,3 +405,4 @@ where source priority must be honored.
- [observation_architecture.md](observation_architecture.md) — `source_priority`, `observed_at`, `specificity_score`
- [issue #1755](https://github.com/markmhendrickson/neotoma/issues/1755) — LWW-default footgun (source-priority silently ignored for auto-discovered schemas)
- [issue #1756](https://github.com/markmhendrickson/neotoma/issues/1756) — write-time value constraints (range/enum/required-enforcement)
- [issue #1838](https://github.com/markmhendrickson/neotoma/issues/1838) — default `source_priority = 100` footgun (unprioritized write silently outranks an explicit lower priority; `SOURCE_PRIORITY_ESCALATION` advisory)
16 changes: 16 additions & 0 deletions src/actions.ts
@@ -1663,7 +1663,7 @@
// Encryption off: local requests can use no-auth. HTTP (insecure) defaults to anonymous 000... user; HTTPS/localhost can use dev-local.
if (!authHeader?.startsWith("Bearer ") && !connectionIdHeader) {
if (isLocalRequest(req)) {
const isInsecure = req.protocol === "http" || !(req as any).secure;

Check warning on line 1666 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
if (isInsecure) {
req.headers["x-connection-id"] = "dev-local-http";
connectionIdHeader = "dev-local-http";
@@ -1953,7 +1953,7 @@
},
() => transport!.handleRequest(req, res, req.body)
);
} catch (error: any) {

Check warning on line 1956 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
logger.error("[MCP HTTP] Request error:", error);
if (!res.headersSent) {
res.status(500).json({
@@ -2316,7 +2316,7 @@
const result = await initiateOAuthFlow(connection_id, client_name, finalRedirectUri);

return res.json(result);
} catch (error: any) {

Check warning on line 2319 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
logError("MCPOAuthInitiate", req, error);

// Check if it's a structured OAuthError
@@ -2326,7 +2326,7 @@
message: string;
statusCode: number;
retryable?: boolean;
details?: Record<string, any>;

Check warning on line 2329 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
};

return res.status(oauthError.statusCode).json({
@@ -2393,7 +2393,7 @@
process.env.NEOTOMA_FRONTEND_URL || process.env.FRONTEND_URL || "http://localhost:5195";
const successUrl = `${frontendBase}/oauth?connection_id=${encodeURIComponent(connectionId)}&status=success`;
return res.redirect(successUrl);
} catch (error: any) {

Check warning on line 2396 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
logError("MCPOAuthCallback", req, error);

// Extract structured error information
@@ -2690,7 +2690,7 @@
});

return res.redirect(result.authUrl);
} catch (error: any) {

Check warning on line 2693 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
logError("MCPOAuthAuthorize", req, error);
return res.status(500).send(error.message ?? "Authorization failed");
}
@@ -2780,7 +2780,7 @@
return res.redirect(
`${frontendOauth}?connection_id=${encodeURIComponent(connectionId)}&status=success`
);
} catch (error: any) {

Check warning on line 2783 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
logError("MCPLocalLoginDevStub", req, error);
const status = error?.code === "OAUTH_STATE_INVALID" || error?.statusCode === 400 ? 400 : 401;
return res.status(status).send(
@@ -2857,7 +2857,7 @@

res.setHeader("Content-Type", "application/json");
return res.json(token);
} catch (error: any) {

Check warning on line 2860 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
logError("MCPOAuthToken", req, error);
return res.status(400).json({
error: "invalid_grant",
@@ -2885,7 +2885,7 @@
});
res.setHeader("Content-Type", "application/json");
return res.status(201).json(reg);
} catch (error: any) {

Check warning on line 2888 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
logError("MCPOAuthRegister", req, error);
if (error instanceof OAuthError) {
const oauth = error as { code?: string; message?: string; statusCode?: number };
@@ -2921,7 +2921,7 @@
const status = await getConnectionStatus(connection_id);

return res.json({ status, connection_id });
} catch (error: any) {

Check warning on line 2924 in src/actions.ts (GitHub Actions / baseline): Unexpected any. Specify a different type
logError("MCPOAuthStatus", req, error);
return sendError(res, 500, "DB_QUERY_FAILED", error.message);
}
@@ -7482,6 +7482,22 @@
entityId: r.entity_id,
});
if (spWarn) schemaStoreWarnings.push(spWarn);

// Issue #1838: SOURCE_PRIORITY_ESCALATION — mirror-image advisory. A
// write at the DEFAULT source_priority (100) into a `highest_priority`
// field silently OUTRANKS any prior observation written with an
// explicit lower priority. Non-blocking; reuses the same import.
const { buildSourcePriorityEscalationWarning } =
await import("./services/source_priority_warning.js");
const spEsc = buildSourcePriorityEscalationWarning({
sourcePriority,
writtenFields: r.fields,
mergePolicies: schemaEntry?.reducer_config?.merge_policies,
observationIndex: r.observation_index,
entityType: r.entity_type,
entityId: r.entity_id,
});
if (spEsc) schemaStoreWarnings.push(spEsc);
}
}
}
16 changes: 14 additions & 2 deletions src/cli/index.ts
@@ -14272,7 +14272,13 @@ program
.option("--file-path <path>", "Path to any file to store (unstructured pipeline)")
.option("--file-content <content>", "Inline file content to store")
.option("--user-id <id>", "User ID for the operation")
.option("--source-priority <level>", "Source priority level (default: 100)")
.option(
"--source-priority <level>",
"Source priority level (default: 100). Only affects fields whose reducer policy is " +
"highest_priority (or most_specific with tie_breaker source_priority); ignored elsewhere. " +
"The default 100 silently outranks an explicit lower priority on such fields — always pass " +
"an explicit value for priority-semantic writes. See docs/subsystems/conflict_resolution.md."
)
.option(
"--observation-source <kind>",
"Kind of write: sensor, llm_summary (default), workflow_state, human, or import"
@@ -14519,7 +14525,13 @@ program
"--file-idempotency-key <key>",
"Optional idempotency key for the source file (default: derived from contents)"
)
.option("--source-priority <level>", "Source priority level (default: 100)")
.option(
"--source-priority <level>",
"Source priority level (default: 100). Only affects fields whose reducer policy is " +
"highest_priority (or most_specific with tie_breaker source_priority); ignored elsewhere. " +
"The default 100 silently outranks an explicit lower priority on such fields — always pass " +
"an explicit value for priority-semantic writes. See docs/subsystems/conflict_resolution.md."
)
.option(
"--observation-source <kind>",
"Kind of write: sensor, llm_summary (default), workflow_state, human, or import"
16 changes: 16 additions & 0 deletions src/server.ts
@@ -6255,6 +6255,22 @@ export class NeotomaServer {
entityId: e.entityId,
});
if (spWarn) schemaStoreWarnings.push(spWarn);

// Issue #1838: SOURCE_PRIORITY_ESCALATION — mirror-image advisory. A
// write at the DEFAULT source_priority (100) into a `highest_priority`
// field silently OUTRANKS any prior observation written with an
// explicit lower priority. Non-blocking; reuses the same import.
const { buildSourcePriorityEscalationWarning } =
await import("./services/source_priority_warning.js");
const spEsc = buildSourcePriorityEscalationWarning({
sourcePriority,
writtenFields: entityFields,
mergePolicies: schemaEntry?.reducer_config?.merge_policies,
observationIndex: i,
entityType: e.entityType,
entityId: e.entityId,
});
if (spEsc) schemaStoreWarnings.push(spEsc);
}
}

118 changes: 118 additions & 0 deletions src/services/source_priority_warning.ts
@@ -172,3 +172,121 @@ export function buildSourcePriorityIgnoredWarning(opts: {
entity_id: entityId,
};
}

// ─── SOURCE_PRIORITY_ESCALATION (#1838) ──────────────────────────────────────
//
// The mirror-image footgun of SOURCE_PRIORITY_IGNORED. `source_priority`
// defaults to 100 (see action_schemas.ts). When a caller writes to a field
// governed by a `highest_priority` reducer WITHOUT passing an explicit
// `--source-priority`, the write silently inherits priority 100 — which
// OUTRANKS any prior observation that was written with an explicit lower
// priority (e.g. a trusted import at priority 50). The result is a silent
// trust-escalation: the unprioritized write wins the field.
//
// LIMITATION (#1838): zod applies `.default(100)` before the handler runs, so
// at the warning call site we cannot distinguish "caller omitted
// source_priority" from "caller explicitly passed 100". We therefore warn
// whenever priority === 100 AND a highest_priority field is written, framed as
// "default/non-explicit 100". A caller who genuinely wants priority 100 can
// ignore the advisory; the warning is non-blocking.

/**
* Returns the list of written field names whose merge policy honours
* source_priority (`highest_priority`, or `most_specific` with
* `tie_breaker: "source_priority"`). These are the fields where a default-100
* write can silently outrank an explicit lower priority.
*/
export function priorityHonouringFields(opts: {
writtenFields: Record<string, unknown>;
mergePolicies: ReducerConfig["merge_policies"] | undefined | null;
}): Array<{ field: string; strategy: string }> {
const { writtenFields, mergePolicies } = opts;
const result: Array<{ field: string; strategy: string }> = [];
for (const fieldName of Object.keys(writtenFields)) {
const policy = mergePolicies?.[fieldName];
if (fieldHonorsPriority(policy)) {
result.push({ field: fieldName, strategy: policy?.strategy ?? "highest_priority" });
}
}
return result;
}

/**
* Returns true when a write at the default priority (100) participates in at
* least one field that honours source_priority — i.e. the write could silently
* outrank a prior observation written with an explicit lower priority.
*
* Conditions (all must hold):
* 1. `sourcePriority === 100` — the (possibly-defaulted) value. Because zod
* applies the default before this runs, we cannot tell an omitted value
* from an explicit 100; we warn on both.
* 2. At least one written field's merge policy honours source_priority.
*/
export function sourcePriorityMayEscalate(opts: {
sourcePriority: number;
writtenFields: Record<string, unknown>;
mergePolicies: ReducerConfig["merge_policies"] | undefined | null;
}): boolean {
const { sourcePriority, writtenFields, mergePolicies } = opts;
if (sourcePriority !== 100) return false;
return priorityHonouringFields({ writtenFields, mergePolicies }).length > 0;
}

/**
* The shape of one store_warnings[] entry emitted by the
* SOURCE_PRIORITY_ESCALATION code path. Mirrors SourcePriorityIgnoredWarning.
*/
export type SourcePriorityEscalationWarning = {
code: "SOURCE_PRIORITY_ESCALATION";
message: string;
observation_index: number;
entity_type: string;
entity_id: string;
};

/**
* Returns a structured store_warnings[] entry when a default-priority (100)
* write participates in a `highest_priority` field and could silently outrank
* an explicit lower priority, or `null` when no warning is warranted.
*
* Mitigation surfaced in the message: pass an explicit `--source-priority`
* (CLI) / `source_priority` (API) for any write whose value should be ranked,
* so the relative trust is intentional rather than inherited from the default.
*/
export function buildSourcePriorityEscalationWarning(opts: {
sourcePriority: number;
writtenFields: Record<string, unknown>;
mergePolicies: ReducerConfig["merge_policies"] | undefined | null;
observationIndex: number;
entityType: string;
entityId: string;
}): SourcePriorityEscalationWarning | null {
const { sourcePriority, writtenFields, mergePolicies, observationIndex, entityType, entityId } =
opts;

if (!sourcePriorityMayEscalate({ sourcePriority, writtenFields, mergePolicies })) {
return null;
}

const honouring = priorityHonouringFields({ writtenFields, mergePolicies });
const fieldSummary = honouring
.map(({ field, strategy }) => `'${field}' uses ${strategy}`)
.join(", ");

return {
code: "SOURCE_PRIORITY_ESCALATION",
message:
`${entityType} written at the default source_priority ${sourcePriority} into ` +
`field(s) that rank by priority — ${fieldSummary}. A default/non-explicit ` +
`priority ${sourcePriority} OUTRANKS any prior observation written with an ` +
"explicit lower priority (e.g. a trusted import at 50), silently escalating " +
"this write's trust. Note: an explicit source_priority of 100 is " +
"indistinguishable from the default at write time, so this advisory fires on " +
"both. To make ranking intentional, pass an explicit --source-priority " +
"(CLI) / source_priority (API) reflecting how this write should rank against " +
"others — see docs/subsystems/conflict_resolution.md.",
observation_index: observationIndex,
entity_type: entityType,
entity_id: entityId,
};
}
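
As a rough illustration of when the advisory fires, the sketch below condenses the check into one self-contained function. It is not the code from this file: `MergePolicy`, `MergePolicies`, and `mayEscalate` are hypothetical stand-ins, and the body of `fieldHonorsPriority` is an assumption inferred from the doc comment on `priorityHonouringFields`, since the real helper is not shown in this diff.

```typescript
// Hypothetical, simplified policy shape; the real ReducerConfig type is not shown here.
type MergePolicy = { strategy: string; tie_breaker?: string };
type MergePolicies = Record<string, MergePolicy>;

// Assumed behaviour of fieldHonorsPriority, inferred from the doc comment.
function fieldHonorsPriority(policy: MergePolicy | undefined): boolean {
  if (!policy) return false;
  if (policy.strategy === "highest_priority") return true;
  return policy.strategy === "most_specific" && policy.tie_breaker === "source_priority";
}

// Condensed stand-in for sourcePriorityMayEscalate.
function mayEscalate(
  sourcePriority: number,
  writtenFields: Record<string, unknown>,
  mergePolicies: MergePolicies | undefined | null
): boolean {
  if (sourcePriority !== 100) return false; // only the (possibly defaulted) 100 is flagged
  return Object.keys(writtenFields).some((f) => fieldHonorsPriority(mergePolicies?.[f]));
}

const policies: MergePolicies = {
  value: { strategy: "highest_priority", tie_breaker: "observed_at" },
  unit: { strategy: "last_write" },
};

const cases = {
  defaultIntoRanked: mayEscalate(100, { value: 21.5 }, policies), // true: advisory fires
  explicitLow: mayEscalate(50, { value: 21.5 }, policies), // false: priority is not 100
  defaultIntoLastWrite: mayEscalate(100, { unit: "C" }, policies), // false: field ignores priority
  noSchema: mayEscalate(100, { value: 21.5 }, undefined), // false: no merge policies
};
console.log(cases);
```

The four cases correspond to the two conditions listed in the `sourcePriorityMayEscalate` doc comment: the priority must equal `100`, and at least one written field must rank by priority.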