Skip to content

Commit 2a33ff3

Browse files
enriquephl and claude committed
feat(memory): dreaming-lite session-end classifier
Background tokio sweeper picks idle, unclassified chat sessions and runs the new memory_extraction LLM task (Haiku 4.5, grok-4-mini fallback) on their chat_messages. Extracted candidates are written to companion_memories as profile-layer rows with category set to one of fact / preference / event / emotion / relation. Sessions get classified_at stamped after a graceful pass to suppress re-sweeps; network errors skip the stamp so they retry on the next tick. Poison-pill sessions (LLM returns garbage) are still stamped to avoid looping. Knobs (env): DREAMING_TICK_SECS (default 300), DREAMING_IDLE_SECS (default 1800), DREAMING_DISABLED=1 to opt out. Migration 0007 adds engine.chat_sessions.classified_at TIMESTAMPTZ. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 36c1412 commit 2a33ff3

9 files changed

Lines changed: 444 additions & 0 deletions

File tree

crates/eros-engine-server/src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,12 @@ async fn run_server() -> Result<()> {
221221
.merge(routes::router(state.clone()))
222222
.split_for_parts();
223223

224+
// Spawn the dreaming-lite sweeper alongside the HTTP service. Returns
225+
// immediately when DREAMING_DISABLED=1 (or tick=0) is set, so unit
226+
// tests and self-hosters who want only synchronous behaviour can opt out.
227+
// Cloned because the next line moves `state` into the router.
228+
tokio::spawn(crate::pipeline::dreaming::sweeper(state.clone()));
229+
224230
let app: Router = open_router
225231
.with_state(state)
226232
.merge(Scalar::with_url("/docs", api))
Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
// SPDX-License-Identifier: AGPL-3.0-only
2+
//! Dreaming-lite: session-end memory extraction sweeper.
3+
//!
4+
//! Background tokio task that scans `engine.chat_sessions` for idle,
5+
//! unclassified sessions and runs the `memory_extraction` LLM task on
6+
//! their `chat_messages`. Extracted candidates with category tags are
7+
//! written to `engine.companion_memories` as profile-layer rows; the
8+
//! session's `classified_at` is then stamped to suppress re-sweeps.
9+
//!
10+
//! Single-instance assumption: with multiple replicas the picker query
11+
//! would need `FOR UPDATE SKIP LOCKED` to avoid double-classifying the
12+
//! same session. OSS v1 ships single-instance.
13+
//!
14+
//! Failure handling:
15+
//! - Network/DB error during pick or stamp → propagate, retry next tick.
16+
//! - LLM call error → propagate (no stamp), retry next tick.
17+
//! - LLM returns garbage / empty parse → stamp anyway so a poison-pill
18+
//! session can't loop the sweeper forever.
19+
20+
use std::time::Duration;
21+
22+
use chrono::Utc;
23+
use serde::Deserialize;
24+
use uuid::Uuid;
25+
26+
use eros_engine_llm::openrouter::{ChatMessage, ChatRequest};
27+
use eros_engine_store::memory::{MemoryLayer, MemoryRepo};
28+
29+
use crate::state::AppState;
30+
31+
const MEMORY_TASK: &str = "memory_extraction";
32+
const PICK_BATCH: i64 = 10;
33+
34+
#[derive(Debug, Deserialize)]
35+
struct MemoryCandidate {
36+
content: String,
37+
category: String,
38+
}
39+
40+
/// Run forever. Spawn this once at server startup. Returns immediately
41+
/// (and never spawns the loop) if `state.config.dreaming_tick` is zero.
42+
pub async fn sweeper(state: AppState) {
43+
let interval = state.config.dreaming_tick;
44+
let idle = state.config.dreaming_idle_threshold;
45+
if interval.is_zero() {
46+
tracing::info!("dreaming sweeper disabled (DREAMING_DISABLED=1 or tick=0)");
47+
return;
48+
}
49+
tracing::info!(?interval, ?idle, "dreaming sweeper starting");
50+
51+
let mut tick = tokio::time::interval(interval);
52+
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
53+
loop {
54+
tick.tick().await;
55+
match scan_and_classify(&state, idle).await {
56+
Ok(0) => {} // quiet — common case on a low-traffic instance
57+
Ok(n) => tracing::info!(processed = n, "dreaming: sessions classified"),
58+
Err(e) => tracing::warn!("dreaming scan failed: {e}"),
59+
}
60+
}
61+
}
62+
63+
/// One sweep tick: pick eligible sessions, classify each in turn.
64+
async fn scan_and_classify(state: &AppState, idle: Duration) -> Result<usize, sqlx::Error> {
65+
let cutoff = Utc::now() - chrono::Duration::from_std(idle).unwrap_or_default();
66+
let sessions: Vec<(Uuid, Uuid, Option<Uuid>)> = sqlx::query_as(
67+
"SELECT id, user_id, instance_id FROM engine.chat_sessions \
68+
WHERE classified_at IS NULL AND last_active_at < $1 \
69+
ORDER BY last_active_at \
70+
LIMIT $2",
71+
)
72+
.bind(cutoff)
73+
.bind(PICK_BATCH)
74+
.fetch_all(&state.pool)
75+
.await?;
76+
77+
let mut count = 0;
78+
for (session_id, user_id, instance_id) in sessions {
79+
match classify_session(state, session_id, user_id, instance_id).await {
80+
Ok(written) => {
81+
tracing::info!(%session_id, written, "dreaming: session classified");
82+
count += 1;
83+
}
84+
Err(e) => tracing::warn!(%session_id, "dreaming: classify failed: {e}"),
85+
}
86+
}
87+
Ok(count)
88+
}
89+
90+
/// Classify one session. Returns the number of memory rows written.
91+
/// Stamps `classified_at` on a graceful pass (including empty extraction)
92+
/// so the picker doesn't see this session again. Network-level errors
93+
/// propagate and skip the stamp so they retry next tick.
94+
async fn classify_session(
95+
state: &AppState,
96+
session_id: Uuid,
97+
user_id: Uuid,
98+
instance_id: Option<Uuid>,
99+
) -> Result<usize, String> {
100+
// 1. Pull the conversation log. We use chat_messages (the canonical
101+
// turn record) rather than the formatted companion_memories rows so
102+
// the LLM doesn't see the "用户:X\nAI:Y" wrapper twice.
103+
let rows: Vec<(String, String)> = sqlx::query_as(
104+
"SELECT role, content FROM engine.chat_messages \
105+
WHERE session_id = $1 AND role IN ('user', 'assistant') \
106+
ORDER BY sent_at",
107+
)
108+
.bind(session_id)
109+
.fetch_all(&state.pool)
110+
.await
111+
.map_err(|e| format!("load chat_messages failed: {e}"))?;
112+
113+
if rows.is_empty() {
114+
mark_classified(&state.pool, session_id)
115+
.await
116+
.map_err(|e| format!("mark classified (empty session): {e}"))?;
117+
return Ok(0);
118+
}
119+
120+
let turns: Vec<String> = rows
121+
.into_iter()
122+
.map(|(role, content)| {
123+
let label = if role == "user" { "用户" } else { "AI" };
124+
format!("{label}:{content}")
125+
})
126+
.collect();
127+
128+
// 2. Single LLM call, structured-JSON output.
129+
let prompt = crate::prompt::extract_memories_prompt(&turns);
130+
let resolved = state.model_config.resolve(MEMORY_TASK, None);
131+
let req = ChatRequest {
132+
model: resolved.model,
133+
fallback_model: resolved.fallback_model,
134+
messages: vec![ChatMessage {
135+
role: "user".into(),
136+
content: prompt,
137+
}],
138+
temperature: resolved.temperature as f32,
139+
max_tokens: resolved.max_tokens,
140+
};
141+
let raw = state
142+
.openrouter
143+
.execute(req)
144+
.await
145+
.map_err(|e| format!("memory_extraction LLM call failed: {e}"))?;
146+
147+
let candidates = parse_memory_candidates(&raw.reply);
148+
149+
// 3. Embed + insert each candidate as a profile-layer row.
150+
// Profile layer is the right home for these — they're stable facts
151+
// about the user that should be visible across persona instances.
152+
let repo = MemoryRepo { pool: &state.pool };
153+
let mut written = 0;
154+
for cand in &candidates {
155+
let trimmed = cand.content.trim();
156+
if trimmed.is_empty() {
157+
continue;
158+
}
159+
let category = normalise_category(&cand.category);
160+
match state.voyage.embed_document(trimmed).await {
161+
Ok(embedding) => {
162+
if let Err(e) = repo
163+
.upsert(
164+
MemoryLayer::Profile,
165+
session_id,
166+
user_id,
167+
instance_id,
168+
trimmed,
169+
&embedding,
170+
Some(&category),
171+
)
172+
.await
173+
{
174+
tracing::warn!(%session_id, "dreaming: insert failed: {e}");
175+
} else {
176+
written += 1;
177+
}
178+
}
179+
Err(e) => {
180+
tracing::warn!(%session_id, "dreaming: voyage embed failed: {e}");
181+
}
182+
}
183+
}
184+
185+
// 4. Stamp on graceful completion. Even when `written == 0` we stamp,
186+
// because either the session genuinely had nothing memorable or the
187+
// model returned junk — both cases should not loop the sweeper.
188+
mark_classified(&state.pool, session_id)
189+
.await
190+
.map_err(|e| format!("mark classified (post-success): {e}"))?;
191+
Ok(written)
192+
}
193+
194+
/// Set `classified_at = now()` on the given `engine.chat_sessions` row so
/// the picker query (which filters on `classified_at IS NULL`) no longer
/// selects it. Any database error is returned unchanged.
async fn mark_classified(pool: &sqlx::PgPool, session_id: Uuid) -> Result<(), sqlx::Error> {
    let stamp = sqlx::query("UPDATE engine.chat_sessions SET classified_at = now() WHERE id = $1")
        .bind(session_id);
    // The query result (affected-row count) is not needed by callers.
    stamp.execute(pool).await.map(|_| ())
}
201+
202+
/// Return the first brace-balanced `{ ... }` span in `raw`, if any.
///
/// Scanning starts at the first `{` and ends at the `}` that brings the
/// nesting depth back to zero. Braces inside double-quoted string literals
/// are ignored, and a backslash inside a string escapes the next byte (so
/// `\"` does not close the string). Returns `None` when `raw` has no `{`
/// or the opening brace is never balanced.
///
/// Mirrors the helper in `post_process.rs`; kept private to this module so
/// the extraction and classification parsers stay decoupled.
fn find_json_block(raw: &str) -> Option<&str> {
    let open = raw.find('{')?;
    let tail = &raw[open..];

    let mut nesting = 0_i32;
    let mut quoted = false;
    let mut escaped = false;

    // Byte-wise scan is safe for UTF-8 input: every byte we react to is
    // ASCII, and ASCII bytes never occur inside a multi-byte sequence.
    for (offset, byte) in tail.bytes().enumerate() {
        if quoted {
            match byte {
                _ if escaped => escaped = false,
                b'\\' => escaped = true,
                b'"' => quoted = false,
                _ => {}
            }
        } else {
            match byte {
                b'"' => quoted = true,
                b'{' => nesting += 1,
                b'}' => {
                    nesting -= 1;
                    if nesting == 0 {
                        return Some(&tail[..=offset]);
                    }
                }
                _ => {}
            }
        }
    }
    None
}
237+
238+
fn parse_memory_candidates(raw: &str) -> Vec<MemoryCandidate> {
239+
if let Ok(v) = serde_json::from_str::<serde_json::Value>(raw) {
240+
return extract_memory_array(&v);
241+
}
242+
if let Some(block) = find_json_block(raw) {
243+
if let Ok(v) = serde_json::from_str::<serde_json::Value>(block) {
244+
return extract_memory_array(&v);
245+
}
246+
}
247+
vec![]
248+
}
249+
250+
fn extract_memory_array(v: &serde_json::Value) -> Vec<MemoryCandidate> {
251+
v.get("memories")
252+
.and_then(|a| a.as_array())
253+
.map(|arr| {
254+
arr.iter()
255+
.filter_map(|x| serde_json::from_value::<MemoryCandidate>(x.clone()).ok())
256+
.collect()
257+
})
258+
.unwrap_or_default()
259+
}
260+
261+
/// Map a model-supplied category onto the supported vocabulary.
///
/// Surrounding whitespace is trimmed and ASCII case is ignored. The result
/// is always one of `fact`, `preference`, `event`, `emotion` or `relation`
/// in lowercase; anything else — including the empty string — becomes
/// `fact`. The prompt asks for one of these five, but the model sometimes
/// invents its own labels, and a coarse valid tag is preferable to an
/// open-ended set of categories.
fn normalise_category(raw: &str) -> String {
    const KNOWN: [&str; 5] = ["fact", "preference", "event", "emotion", "relation"];

    let wanted = raw.trim();
    KNOWN
        .iter()
        .copied()
        .find(|known| known.eq_ignore_ascii_case(wanted))
        .unwrap_or("fact")
        .to_owned()
}
271+
272+
#[cfg(test)]
mod tests {
    use super::*;

    /// The `content` of every candidate, in order.
    fn contents(cands: &[MemoryCandidate]) -> Vec<&str> {
        cands.iter().map(|c| c.content.as_str()).collect()
    }

    // A bare JSON reply parses directly, keeping order and both fields.
    #[test]
    fn parse_memory_candidates_handles_clean_json() {
        let raw = r#"{"memories":[{"content":"住在上海","category":"fact"},
{"content":"喜欢咖啡","category":"preference"}]}"#;
        let cands = parse_memory_candidates(raw);
        let pairs: Vec<(&str, &str)> = cands
            .iter()
            .map(|c| (c.content.as_str(), c.category.as_str()))
            .collect();
        assert_eq!(pairs, [("住在上海", "fact"), ("喜欢咖啡", "preference")]);
    }

    // Prose plus a fenced code block: falls back to the balanced-brace scan.
    #[test]
    fn parse_memory_candidates_handles_fenced_block() {
        let raw = "Sure, here you go:\n```json\n\
                   {\"memories\":[{\"content\":\"养了一只猫\",\"category\":\"fact\"}]}\n\
                   ```";
        let cands = parse_memory_candidates(raw);
        assert_eq!(contents(&cands), ["养了一只猫"]);
    }

    // No JSON at all, or JSON without a `memories` key, yields nothing.
    #[test]
    fn parse_memory_candidates_returns_empty_on_garbage() {
        for raw in ["nope, no json", r#"{"facts":[]}"#] {
            assert!(parse_memory_candidates(raw).is_empty());
        }
    }

    #[test]
    fn parse_memory_candidates_skips_malformed_items() {
        // Missing `category` on second item — should drop just that one.
        let raw = r#"{"memories":[
{"content":"a","category":"fact"},
{"content":"b"},
{"content":"c","category":"event"}
]}"#;
        let cands = parse_memory_candidates(raw);
        assert_eq!(contents(&cands), ["a", "c"]);
    }

    #[test]
    fn normalise_category_passes_known_values() {
        let cases = [
            ("fact", "fact"),
            ("PREFERENCE", "preference"),
            ("  Event ", "event"),
            ("emotion", "emotion"),
            ("relation", "relation"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalise_category(input), expected);
        }
    }

    #[test]
    fn normalise_category_collapses_unknowns_to_fact() {
        for input in ["opinion", "", "分类"] {
            assert_eq!(normalise_category(input), "fact");
        }
    }

    // A `}` inside a string literal must not terminate the block early.
    #[test]
    fn find_json_block_balanced_with_string_braces() {
        let raw = r#"prefix {"a": "b}c", "d": 1} trailing"#;
        let block = find_json_block(raw).unwrap();
        let v: serde_json::Value = serde_json::from_str(block).unwrap();
        assert_eq!(v["a"], "b}c");
        assert_eq!(v["d"], 1);
    }
}

crates/eros-engine-server/src/pipeline/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//! through `persist_with_event`.
1313
//! - `state.chat_engine` → `state.openrouter`.
1414
15+
pub mod dreaming;
1516
pub mod handlers;
1617
pub mod post_process;
1718

0 commit comments

Comments
 (0)