Skip to content

Commit beaeccc

Browse files
committed
feat(sensorium): add bounded conscious aperture
1 parent 66fc732 commit beaeccc

20 files changed

Lines changed: 2419 additions & 24 deletions
Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
"""Bounded Conscious aperture over internal conscious_task candidates.
2+
3+
The aperture is a small deterministic state transition: it selects pending
4+
Subconscious advisory candidates that carry ``conscious_task`` payloads and marks
5+
at most one bounded set as currently open for Conscious review. It does not
6+
prepare worker requests, dispatch agents, send messages, or perform the final
7+
Conscious decision itself.
8+
"""
9+
10+
from __future__ import annotations
11+
12+
from datetime import datetime, timezone
13+
from typing import Any
14+
15+
from .schemas import new_id, truncate_text, utc_now_iso
16+
from .store import SensoriumStore
17+
18+
OPEN_STATUS = "in_conscious_aperture"
19+
PENDING_STATUS = "candidate"
20+
CONSCIOUS_KIND = "subconscious_advisory"
21+
DEFAULT_APERTURE_SIZE = 5
22+
DEFAULT_STALE_AFTER_MINUTES = 180
23+
VALID_SETTLEMENT_DECISIONS = {"REVIEWED", "HELD", "SETTLED", "PREPARED_EXTERNAL_WORK"}
24+
SETTLEMENT_STATUS = {
25+
"REVIEWED": "reviewed",
26+
"HELD": "held",
27+
"SETTLED": "reviewed",
28+
"PREPARED_EXTERNAL_WORK": "prepared_external_work",
29+
}
30+
31+
32+
def _parse_iso(ts: str | None) -> datetime | None:
33+
if not isinstance(ts, str) or not ts.strip():
34+
return None
35+
value = ts.strip()
36+
if value.endswith("Z"):
37+
value = value[:-1] + "+00:00"
38+
try:
39+
parsed = datetime.fromisoformat(value)
40+
except ValueError:
41+
return None
42+
if parsed.tzinfo is None:
43+
parsed = parsed.replace(tzinfo=timezone.utc)
44+
return parsed.astimezone(timezone.utc)
45+
46+
47+
def _is_stale_active(candidate: dict, *, now: datetime, stale_after_minutes: int) -> bool:
48+
aperture = candidate.get("conscious_aperture") or {}
49+
opened = _parse_iso(aperture.get("opened_at") or candidate.get("updated_at"))
50+
if opened is None:
51+
return False
52+
age_minutes = (now - opened).total_seconds() / 60.0
53+
return age_minutes >= max(1, stale_after_minutes)
54+
55+
56+
def _is_pending_conscious_task(candidate: dict) -> bool:
57+
return (
58+
candidate.get("status") == PENDING_STATUS
59+
and candidate.get("kind") == CONSCIOUS_KIND
60+
and isinstance(candidate.get("conscious_task"), dict)
61+
)
62+
63+
64+
def _task_type_priority(candidate: dict) -> int:
65+
task = candidate.get("conscious_task") or {}
66+
request_type = str(task.get("request_type") or "").upper()
67+
# Lower number = earlier in the aperture.
68+
return {
69+
"UPDATE_MEMORY_OR_SKILL": 0,
70+
"SAVE": 1,
71+
"CREATE_FOLLOWUP": 2,
72+
"DELEGATE_WORK": 3,
73+
"PRIVATE_EXPRESSION": 4,
74+
"THINK": 5,
75+
}.get(request_type, 6)
76+
77+
78+
def _candidate_sort_key(candidate: dict) -> tuple:
79+
# Prefer high pressure and action-oriented tasks, then older candidates.
80+
try:
81+
pressure = float(candidate.get("pressure") or 0.0)
82+
except (TypeError, ValueError):
83+
pressure = 0.0
84+
return (-pressure, _task_type_priority(candidate), str(candidate.get("created_at") or ""), str(candidate.get("id") or ""))
85+
86+
87+
def _aperture_item(candidate: dict) -> dict:
88+
task = candidate.get("conscious_task") or {}
89+
return {
90+
"candidate_id": candidate.get("id"),
91+
"summary": truncate_text(candidate.get("summary", ""), 220),
92+
"pressure": candidate.get("pressure"),
93+
"created_at": candidate.get("created_at", ""),
94+
"event_ids": list(candidate.get("event_ids") or []),
95+
"source_candidate_ids": list(candidate.get("source_candidate_ids") or []),
96+
"correlation_keys": list(candidate.get("correlation_keys") or []),
97+
"sensitivity": candidate.get("sensitivity", "private"),
98+
"allowed_surfaces": list(candidate.get("allowed_surfaces") or ["local"]),
99+
"conscious_task": {
100+
"id": task.get("id", ""),
101+
"request_type": task.get("request_type", ""),
102+
"title": task.get("title", ""),
103+
"why": task.get("why", ""),
104+
"expected_decision": task.get("expected_decision", ""),
105+
},
106+
"advisory_meta": dict(candidate.get("advisory_meta") or {}),
107+
}
108+
109+
110+
def open_conscious_aperture(
111+
store: SensoriumStore,
112+
*,
113+
aperture_size: int = DEFAULT_APERTURE_SIZE,
114+
max_active_sessions: int = 1,
115+
stale_after_minutes: int = DEFAULT_STALE_AFTER_MINUTES,
116+
dry_run: bool = True,
117+
now: str | None = None,
118+
) -> dict:
119+
"""Open one bounded Conscious aperture over pending internal tasks.
120+
121+
Returns a compact packet for a Conscious session. With ``dry_run=False`` the
122+
selected candidates are marked ``in_conscious_aperture`` and a decision
123+
receipt is appended. No worker request is prepared or dispatched.
124+
"""
125+
store.ensure_dirs()
126+
now_iso = now or utc_now_iso()
127+
now_dt = _parse_iso(now_iso) or datetime.now(timezone.utc)
128+
size = max(1, int(aperture_size or DEFAULT_APERTURE_SIZE))
129+
active_limit = max(1, int(max_active_sessions or 1))
130+
131+
candidates = store.read_jsonl("candidates")
132+
active = [
133+
c for c in candidates
134+
if c.get("status") == OPEN_STATUS
135+
and isinstance(c.get("conscious_task"), dict)
136+
and not _is_stale_active(c, now=now_dt, stale_after_minutes=stale_after_minutes)
137+
]
138+
stale_active = [
139+
c for c in candidates
140+
if c.get("status") == OPEN_STATUS
141+
and isinstance(c.get("conscious_task"), dict)
142+
and _is_stale_active(c, now=now_dt, stale_after_minutes=stale_after_minutes)
143+
]
144+
145+
if len(active) >= active_limit:
146+
return {
147+
"success": True,
148+
"action": "active_aperture_exists",
149+
"dry_run": dry_run,
150+
"active_count": len(active),
151+
"active_candidate_ids": [c.get("id") for c in active],
152+
"stale_active_candidate_ids": [c.get("id") for c in stale_active],
153+
"aperture": [_aperture_item(c) for c in sorted(active, key=_candidate_sort_key)[:size]],
154+
}
155+
156+
pending = sorted([c for c in candidates if _is_pending_conscious_task(c)], key=_candidate_sort_key)
157+
selected = pending[:size]
158+
aperture_id = new_id("cap")
159+
packet = {
160+
"success": True,
161+
"action": "would_open_aperture" if dry_run else "opened_aperture",
162+
"dry_run": dry_run,
163+
"aperture_id": aperture_id,
164+
"opened_at": now_iso,
165+
"aperture_size": size,
166+
"selected_count": len(selected),
167+
"pending_count": len(pending),
168+
"active_count": len(active),
169+
"stale_active_candidate_ids": [c.get("id") for c in stale_active],
170+
"candidate_ids": [c.get("id") for c in selected],
171+
"aperture": [_aperture_item(c) for c in selected],
172+
"instructions": {
173+
"settle_each_item": "Record a conscious.aperture.settled receipt for each item after Conscious decides.",
174+
"worker_requests": "Prepare worker_requests only for decisions that require external/durable execution.",
175+
},
176+
}
177+
if dry_run or not selected:
178+
return packet
179+
180+
selected_ids = set(packet["candidate_ids"])
181+
rewritten: list[dict] = []
182+
for candidate in candidates:
183+
if candidate.get("id") in selected_ids:
184+
updated = dict(candidate)
185+
updated["status"] = OPEN_STATUS
186+
updated["updated_at"] = now_iso
187+
updated["conscious_aperture"] = {
188+
"id": aperture_id,
189+
"opened_at": now_iso,
190+
"state": "open",
191+
}
192+
rewritten.append(updated)
193+
else:
194+
rewritten.append(candidate)
195+
store.rewrite_jsonl("candidates", rewritten)
196+
store.append_jsonl("decisions", {
197+
"ts": now_iso,
198+
"type": "conscious.aperture.opened",
199+
"aperture_id": aperture_id,
200+
"candidate_ids": packet["candidate_ids"],
201+
"selected_count": len(selected),
202+
"pending_count": len(pending),
203+
"max_active_sessions": active_limit,
204+
"aperture_size": size,
205+
})
206+
return packet
207+
208+
209+
def _find_candidate_index(candidates: list[dict], candidate_id: str) -> int | None:
210+
for idx, candidate in enumerate(candidates):
211+
if candidate.get("id") == candidate_id:
212+
return idx
213+
return None
214+
215+
216+
def _existing_settlement(decisions: list[dict], *, candidate_id: str, aperture_id: str, decision: str) -> dict | None:
217+
for receipt in reversed(decisions):
218+
if receipt.get("type") != "conscious.aperture.settled":
219+
continue
220+
if receipt.get("candidate_id") != candidate_id:
221+
continue
222+
if aperture_id and receipt.get("aperture_id") != aperture_id:
223+
continue
224+
if receipt.get("decision") == decision:
225+
return receipt
226+
return None
227+
228+
229+
def settle_conscious_aperture_item(
230+
store: SensoriumStore,
231+
*,
232+
candidate_id: str,
233+
decision: str,
234+
reason: str,
235+
aperture_id: str | None = None,
236+
external_work: dict | None = None,
237+
dry_run: bool = True,
238+
now: str | None = None,
239+
) -> dict:
240+
"""Settle one candidate currently opened in the Conscious aperture.
241+
242+
Settlement is a state/receipt transition only. ``external_work`` is recorded
243+
as a prepared specification for later routing; this function does not append
244+
to ``worker_requests`` and never dispatches.
245+
"""
246+
store.ensure_dirs()
247+
candidate_id = str(candidate_id or "").strip()
248+
normalized_decision = str(decision or "").strip().upper()
249+
if not candidate_id:
250+
return {"success": False, "error": "candidate_id_required"}
251+
if normalized_decision not in VALID_SETTLEMENT_DECISIONS:
252+
return {
253+
"success": False,
254+
"error": "invalid_decision",
255+
"valid_decisions": sorted(VALID_SETTLEMENT_DECISIONS),
256+
}
257+
if not str(reason or "").strip():
258+
return {"success": False, "error": "reason_required"}
259+
260+
candidates = store.read_jsonl("candidates")
261+
idx = _find_candidate_index(candidates, candidate_id)
262+
if idx is None:
263+
return {"success": False, "error": "candidate_not_found", "candidate_id": candidate_id}
264+
candidate = candidates[idx]
265+
current_aperture = candidate.get("conscious_aperture") or {}
266+
actual_aperture_id = str(aperture_id or current_aperture.get("id") or "").strip()
267+
268+
existing = _existing_settlement(
269+
store.read_jsonl("decisions"),
270+
candidate_id=candidate_id,
271+
aperture_id=actual_aperture_id,
272+
decision=normalized_decision,
273+
)
274+
if existing is not None:
275+
return {
276+
"success": True,
277+
"action": "already_settled",
278+
"dry_run": dry_run,
279+
"candidate_id": candidate_id,
280+
"aperture_id": actual_aperture_id,
281+
"receipt": existing,
282+
}
283+
284+
if candidate.get("status") != OPEN_STATUS:
285+
return {
286+
"success": False,
287+
"error": "candidate_not_in_conscious_aperture",
288+
"candidate_id": candidate_id,
289+
"status": candidate.get("status"),
290+
}
291+
if aperture_id and current_aperture.get("id") != aperture_id:
292+
return {
293+
"success": False,
294+
"error": "aperture_id_mismatch",
295+
"candidate_id": candidate_id,
296+
"expected_aperture_id": current_aperture.get("id"),
297+
"aperture_id": aperture_id,
298+
}
299+
300+
now_iso = now or utc_now_iso()
301+
receipt = {
302+
"ts": now_iso,
303+
"type": "conscious.aperture.settled",
304+
"candidate_id": candidate_id,
305+
"aperture_id": actual_aperture_id,
306+
"decision": normalized_decision,
307+
"new_status": SETTLEMENT_STATUS[normalized_decision],
308+
"reason": truncate_text(reason, 500),
309+
"conscious_task_id": (candidate.get("conscious_task") or {}).get("id", ""),
310+
"request_type": (candidate.get("conscious_task") or {}).get("request_type", ""),
311+
}
312+
if external_work:
313+
receipt["external_work"] = {
314+
"title": truncate_text(external_work.get("title", ""), 200),
315+
"summary": truncate_text(external_work.get("summary", ""), 1200),
316+
"worker_type": truncate_text(external_work.get("worker_type", "kanban_task"), 80),
317+
"profile": dict(external_work.get("profile") or {}),
318+
"target": dict(external_work.get("target") or {}),
319+
}
320+
321+
if dry_run:
322+
return {
323+
"success": True,
324+
"action": "would_settle_aperture_item",
325+
"dry_run": True,
326+
"candidate_id": candidate_id,
327+
"aperture_id": actual_aperture_id,
328+
"receipt_preview": receipt,
329+
}
330+
331+
updated = dict(candidate)
332+
updated["status"] = SETTLEMENT_STATUS[normalized_decision]
333+
updated["updated_at"] = now_iso
334+
updated_aperture = dict(current_aperture)
335+
updated_aperture.update({
336+
"state": "settled",
337+
"settled_at": now_iso,
338+
"decision": normalized_decision,
339+
"reason": truncate_text(reason, 240),
340+
})
341+
updated["conscious_aperture"] = updated_aperture
342+
updated.setdefault("conscious_settlements", []).append(receipt)
343+
candidates[idx] = updated
344+
store.rewrite_jsonl("candidates", candidates)
345+
store.append_jsonl("decisions", receipt)
346+
return {
347+
"success": True,
348+
"action": "settled_aperture_item",
349+
"dry_run": False,
350+
"candidate_id": candidate_id,
351+
"aperture_id": actual_aperture_id,
352+
"new_status": updated["status"],
353+
"receipt": receipt,
354+
}

agent_sensorium/settlement.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,21 @@ def _normalize_conscious_task_ref(ref: Any) -> dict:
138138
if not isinstance(ref, dict):
139139
return {}
140140
out: dict[str, str] = {}
141-
for key in ("task_id", "thread_id", "board", "kanban_task_id", "promoted_at"):
141+
# Preserve both legacy Kanban task refs and the newer internal
142+
# conscious_task candidate refs. Conscious promotion is no longer required
143+
# to mean "spawn a conscious:review Kanban worker"; a settlement may point
144+
# at an internal candidate/thread that the bounded Conscious aperture will
145+
# inspect later.
146+
for key in (
147+
"task_id",
148+
"thread_id",
149+
"board",
150+
"kanban_task_id",
151+
"candidate_id",
152+
"conscious_task_id",
153+
"kind",
154+
"promoted_at",
155+
):
142156
value = ref.get(key)
143157
if isinstance(value, str) and value.strip():
144158
out[key] = truncate_text(value, 200)

agent_sensorium/subconscious.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def is_advisory_source_kind(kind: str | None) -> bool:
6363
"default_pressure": 0.66,
6464
"model_enabled": False,
6565
"model_provider": "minimax",
66-
"model": "MiniMax-M2.5",
66+
"model": "MiniMax-M3",
6767
"model_base_url": "https://api.minimax.io/v1",
6868
"model_api_key_env": "MINIMAX_API_KEY",
6969
"model_api_key": None,

0 commit comments

Comments
 (0)