-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat(memory): host-managed memory lifecycle — two-lane retrieval + after-turn record (mem0 flow) #5326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(memory): host-managed memory lifecycle — two-lane retrieval + after-turn record (mem0 flow) #5326
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,65 @@ impl ProductionMemoryPromptContextService { | |
| pub fn new(memory_service: Arc<dyn MemoryService>) -> Self { | ||
| Self { memory_service } | ||
| } | ||
|
|
||
| /// Fetch a single surfacing lane, degrading a retrieval failure to an empty | ||
| /// lane rather than erroring the whole call. Memory is best-effort: a lane | ||
| /// outage must never break the turn, so the error is swallowed here (logged | ||
| /// at `debug!`, never `info!`/`warn!` in this background path) and the caller | ||
| /// continues with the other lane. | ||
| async fn retrieve_lane( | ||
| &self, | ||
| invocation: MemoryInvocation, | ||
| query: String, | ||
| max_snippets: usize, | ||
| context_profile_id: MemoryContextProfileId, | ||
| lane: MemoryLane, | ||
| ) -> Vec<MemoryServiceContextSnippet> { | ||
| match self | ||
| .memory_service | ||
| .retrieve_context( | ||
| invocation, | ||
| MemoryServiceContextRequest { | ||
| query, | ||
| max_snippets, | ||
| context_profile_id, | ||
| }, | ||
| ) | ||
| .await | ||
| { | ||
| Ok(snippets) => snippets, | ||
| // silent-ok: a single lane's retrieval outage degrades that lane to | ||
| // empty so proactive memory never breaks a turn; only the sanitized | ||
| // error kind is logged for diagnosis. | ||
| Err(error) => { | ||
| tracing::debug!( | ||
| lane = lane.as_str(), | ||
| kind = ?error.kind(), | ||
| "memory context lane retrieval failed; degrading lane to empty" | ||
| ); | ||
| Vec::new() | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Which surfacing lane a `retrieve_context` call serves. Carried only so a lane | ||
| /// degradation log line names the lane that failed. | ||
| #[derive(Debug, Clone, Copy)] | ||
| enum MemoryLane { | ||
| /// Active-thread scratch memory (invocation keeps the thread). | ||
| ShortTerm, | ||
| /// User-general memory (invocation clears the thread). | ||
| LongTerm, | ||
| } | ||
|
|
||
| impl MemoryLane { | ||
| fn as_str(self) -> &'static str { | ||
| match self { | ||
| Self::ShortTerm => "short_term", | ||
| Self::LongTerm => "long_term", | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[async_trait] | ||
|
|
@@ -58,7 +117,6 @@ impl MemoryPromptContextService for ProductionMemoryPromptContextService { | |
| if memory_context_disabled(request.context_profile_id.as_str()) { | ||
| return Ok(Vec::new()); | ||
| } | ||
| let invocation = invocation_for_context_request(&request); | ||
| // Capture the request scope up front (before `request.query` is moved | ||
| // below) so admission can reject any snippet a provider returns outside | ||
| // the requested tenant/user/agent/project. | ||
|
|
@@ -67,29 +125,50 @@ impl MemoryPromptContextService for ProductionMemoryPromptContextService { | |
| // construction won't fail in practice — but propagate rather than unwrap. | ||
| let context_profile_id = MemoryContextProfileId::new(request.context_profile_id.as_str()) | ||
| .map_err(map_memory_service_error)?; | ||
| let snippets = self | ||
| .memory_service | ||
| .retrieve_context( | ||
| invocation, | ||
| MemoryServiceContextRequest { | ||
| query: request.query, | ||
| max_snippets: request.max_snippets, | ||
| context_profile_id, | ||
| }, | ||
|
|
||
| // Two lanes, fetched once each (mem0 `on_run_start` shape): | ||
| // short-term: the active thread's scratch memory — invocation keeps the | ||
| // thread, so the native provider restricts to `threads/<T>/`. | ||
| // long-term : the user's general memory — invocation clears the thread, | ||
| // so the native provider excludes any `threads/*` scratch. | ||
| // Concatenate short-term BEFORE long-term so the active conversation wins | ||
| // under the shared aggregate budget enforced over the combined block below. | ||
| let short_term_invocation = invocation_for_context_request(&request); | ||
| let long_term_invocation = MemoryInvocation { | ||
| scope: short_term_invocation.scope.without_thread_and_mission(), | ||
| correlation_id: CorrelationId::new(), | ||
| }; | ||
|
|
||
| let mut combined = self | ||
| .retrieve_lane( | ||
| short_term_invocation, | ||
| request.query.clone(), | ||
| request.max_snippets, | ||
| context_profile_id.clone(), | ||
| MemoryLane::ShortTerm, | ||
| ) | ||
| .await | ||
| .map_err(map_memory_service_error)?; | ||
| .await; | ||
| combined.extend( | ||
| self.retrieve_lane( | ||
| long_term_invocation, | ||
| request.query, | ||
| request.max_snippets, | ||
| context_profile_id, | ||
| MemoryLane::LongTerm, | ||
| ) | ||
| .await, | ||
| ); | ||
|
Comment on lines
+142
to
+160
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The short-term and long-term lane retrievals are independent I/O-bound operations. Running them sequentially adds unnecessary latency to the prompt generation path. We can run them concurrently using let (mut combined, long_term) = tokio::join!(
self.retrieve_lane(
short_term_invocation,
request.query.clone(),
request.max_snippets,
context_profile_id.clone(),
MemoryLane::ShortTerm,
),
self.retrieve_lane(
long_term_invocation,
request.query,
request.max_snippets,
context_profile_id,
MemoryLane::LongTerm,
)
);
combined.extend(long_term); |
||
|
|
||
| // Host-owned admission: hash the reference, sanitize, and wrap each raw | ||
| // candidate, then enforce the per-snippet + aggregate budgets here so the | ||
| // provider can never shape model-visible content. The aggregate budget | ||
| // mirrors the pre-lift provider's `collect_context_snippets`: stop | ||
| // collecting once the next snippet would exceed the ceiling (break, not | ||
| // skip), keeping the model-visible output byte-identical for the native | ||
| // provider. | ||
| // Host-owned admission over the COMBINED list (short-term first): hash the | ||
| // reference, sanitize, and wrap each raw candidate, then enforce the | ||
| // per-snippet + aggregate budgets here so the provider can never shape | ||
| // model-visible content. The aggregate budget mirrors the pre-lift | ||
| // provider's `collect_context_snippets`: stop collecting once the next | ||
| // snippet would exceed the ceiling (break, not skip). The total aggregate | ||
| // byte budget applies to the combined two-lane block. | ||
| let mut admitted = Vec::new(); | ||
| let mut total_bytes = 0usize; | ||
| for snippet in snippets { | ||
| for snippet in combined { | ||
| if admitted.len() >= request.max_snippets { | ||
| break; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reusing the same
correlation_idis important for distributed tracing and log correlation. Creating a newCorrelationIdfor the long-term lane breaks the correlation of the two lanes under the same request.