1818
1919use async_trait:: async_trait;
2020use serde_json:: Value ;
21+ use sqlx:: PgPool ;
2122use uuid:: Uuid ;
2223
2324use eros_engine_core:: affinity:: AffinityDeltas ;
@@ -101,10 +102,11 @@ fn assemble_chat_request(
101102
102103// ─── Memory recall + insight injection helpers ────────────────────
103104
104- /// Embed `query_text` once, then fan out to profile + relationship layers
105- /// in parallel. All errors degrade silently to empty vecs — recall failure
106- /// must never block a chat reply (the persona just looks slightly less
107- /// "with it" for that turn).
105+ /// Embed `query_text` once, then delegate to `recall_memory_with_embedding`.
106+ /// Empty query → returns (empty, empty) without hitting Voyage. Voyage
107+ /// failure also degrades silently to (empty, empty) — recall failure must
108+ /// never block a chat reply (the persona just looks slightly less "with
109+ /// it" for that turn).
108110async fn recall_memory (
109111 state : & AppState ,
110112 user_id : Uuid ,
@@ -121,15 +123,23 @@ async fn recall_memory(
121123 return ( vec ! [ ] , vec ! [ ] ) ;
122124 }
123125 } ;
124- let repo = MemoryRepo { pool : & state. pool } ;
126+ recall_memory_with_embedding ( & state. pool , user_id, instance_id, & embedding) . await
127+ }
128+
129+ /// Pure-DB inner half of memory recall. Takes a pre-computed embedding,
130+ /// fans out to profile + relationship layers in parallel via `tokio::join!`,
131+ /// and returns each layer's hits as `Vec<String>`. Split out from
132+ /// `recall_memory` so integration tests don't need a live Voyage client.
133+ async fn recall_memory_with_embedding (
134+ pool : & PgPool ,
135+ user_id : Uuid ,
136+ instance_id : Uuid ,
137+ embedding : & [ f32 ] ,
138+ ) -> ( Vec < String > , Vec < String > ) {
139+ let repo = MemoryRepo { pool } ;
125140 let ( profile_res, rel_res) = tokio:: join!(
126- repo. search( user_id, None , & embedding, PROFILE_RECALL_K ) ,
127- repo. search(
128- user_id,
129- Some ( instance_id) ,
130- & embedding,
131- RELATIONSHIP_RECALL_K ,
132- ) ,
141+ repo. search( user_id, None , embedding, PROFILE_RECALL_K ) ,
142+ repo. search( user_id, Some ( instance_id) , embedding, RELATIONSHIP_RECALL_K ) ,
133143 ) ;
134144 let profile = match profile_res {
135145 Ok ( rows) => rows. into_iter ( ) . map ( |r| r. content ) . collect ( ) ,
@@ -150,9 +160,11 @@ async fn recall_memory(
150160
151161/// Load `companion_insights` for the user and render the structured fields
152162/// as Chinese-language bullets that fit naturally into the
153- /// `【你对他的了解(通用画像)】` prompt section.
154- async fn load_insight_bullets ( state : & AppState , user_id : Uuid ) -> Vec < String > {
155- let repo = InsightRepo { pool : & state. pool } ;
163+ /// `【你对他的了解(通用画像)】` prompt section. Takes `&PgPool` directly
164+ /// (not `&AppState`) so it's reachable from sqlx integration tests without
165+ /// constructing the full state.
166+ async fn load_insight_bullets ( pool : & PgPool , user_id : Uuid ) -> Vec < String > {
167+ let repo = InsightRepo { pool } ;
156168 let row = match repo. load ( user_id) . await {
157169 Ok ( Some ( row) ) => row,
158170 Ok ( None ) => return vec ! [ ] ,
@@ -243,7 +255,7 @@ impl<'a> ActionHandler for ReplyHandler<'a> {
243255 // T14: prepend structured insights so the LLM sees both the JSONB
244256 // profile (e.g. city/MBTI) and the pgvector profile-layer recalls
245257 // in the same `【你对他的了解(通用画像)】` section.
246- let insight_bullets = load_insight_bullets ( self . state , self . user_id ) . await ;
258+ let insight_bullets = load_insight_bullets ( & self . state . pool , self . user_id ) . await ;
247259 if !insight_bullets. is_empty ( ) {
248260 let mut combined = insight_bullets;
249261 combined. append ( & mut profile_facts) ;
@@ -354,7 +366,7 @@ impl<'a> ActionHandler for GiftHandler<'a> {
354366 let ( mut profile_facts, relationship_facts) =
355367 recall_memory ( self . state , self . user_id , self . instance_id , query_text) . await ;
356368
357- let insight_bullets = load_insight_bullets ( self . state , self . user_id ) . await ;
369+ let insight_bullets = load_insight_bullets ( & self . state . pool , self . user_id ) . await ;
358370 if !insight_bullets. is_empty ( ) {
359371 let mut combined = insight_bullets;
360372 combined. append ( & mut profile_facts) ;
@@ -515,4 +527,225 @@ mod tests {
515527 ]
516528 ) ;
517529 }
530+
531+ // ─── Integration tests: recall_memory_with_embedding + load_insight_bullets ───
532+ //
533+ // These exercise the pure-DB halves of the recall pipeline against a
534+ // live Postgres (via `#[sqlx::test]`). The Voyage-dependent outer
535+ // wrapper `recall_memory` is intentionally not tested here — it would
536+ // either need a live Voyage key or a trait-mock indirection that
537+ // doesn't justify its weight for a single thin function.
538+
539+ use eros_engine_store:: insight:: InsightRepo ;
540+ use eros_engine_store:: memory:: { MemoryLayer , MemoryRepo } ;
541+ use sqlx:: PgPool ;
542+
543+ /// Deterministic 512-dim "unit" vector with a single hot index. Two
544+ /// different seeds produce orthogonal vectors → cosine distance = 1.0;
545+ /// same seed → distance = 0.0. Lets us prove nearest-neighbour ordering
546+ /// without floating-point fuzz.
547+ fn unit_embedding ( seed : usize ) -> Vec < f32 > {
548+ let mut v = vec ! [ 0.0_f32 ; 512 ] ;
549+ v[ seed % 512 ] = 1.0 ;
550+ v
551+ }
552+
553+ async fn make_session ( pool : & PgPool , user_id : Uuid , instance_id : Option < Uuid > ) -> Uuid {
554+ sqlx:: query_scalar :: < _ , Uuid > (
555+ "INSERT INTO engine.chat_sessions (user_id, instance_id) \
556+ VALUES ($1, $2) RETURNING id",
557+ )
558+ . bind ( user_id)
559+ . bind ( instance_id)
560+ . fetch_one ( pool)
561+ . await
562+ . unwrap ( )
563+ }
564+
565+ #[ sqlx:: test( migrations = "../eros-engine-store/migrations" ) ]
566+ async fn recall_memory_with_embedding_empty_db_returns_empty ( pool : PgPool ) {
567+ let user_id = Uuid :: new_v4 ( ) ;
568+ let instance_id = Uuid :: new_v4 ( ) ;
569+ let ( profile, relationship) =
570+ recall_memory_with_embedding ( & pool, user_id, instance_id, & unit_embedding ( 7 ) ) . await ;
571+ assert ! ( profile. is_empty( ) ) ;
572+ assert ! ( relationship. is_empty( ) ) ;
573+ }
574+
575+ #[ sqlx:: test( migrations = "../eros-engine-store/migrations" ) ]
576+ async fn recall_memory_with_embedding_isolates_layers ( pool : PgPool ) {
577+ let user_id = Uuid :: new_v4 ( ) ;
578+ let instance_id = Uuid :: new_v4 ( ) ;
579+ let session_id = make_session ( & pool, user_id, Some ( instance_id) ) . await ;
580+ let repo = MemoryRepo { pool : & pool } ;
581+
582+ // Same content text + same seed embedding written to BOTH layers
583+ // — differentiated only by instance_id presence.
584+ repo. upsert (
585+ MemoryLayer :: Profile ,
586+ session_id,
587+ user_id,
588+ None ,
589+ "profile fact" ,
590+ & unit_embedding ( 11 ) ,
591+ )
592+ . await
593+ . unwrap ( ) ;
594+ repo. upsert (
595+ MemoryLayer :: Relationship ,
596+ session_id,
597+ user_id,
598+ Some ( instance_id) ,
599+ "relationship fact" ,
600+ & unit_embedding ( 11 ) ,
601+ )
602+ . await
603+ . unwrap ( ) ;
604+
605+ let ( profile, relationship) =
606+ recall_memory_with_embedding ( & pool, user_id, instance_id, & unit_embedding ( 11 ) ) . await ;
607+ assert_eq ! ( profile, vec![ "profile fact" . to_string( ) ] ) ;
608+ assert_eq ! ( relationship, vec![ "relationship fact" . to_string( ) ] ) ;
609+ }
610+
611+ #[ sqlx:: test( migrations = "../eros-engine-store/migrations" ) ]
612+ async fn recall_memory_with_embedding_respects_top_k ( pool : PgPool ) {
613+ let user_id = Uuid :: new_v4 ( ) ;
614+ let instance_id = Uuid :: new_v4 ( ) ;
615+ let session_id = make_session ( & pool, user_id, Some ( instance_id) ) . await ;
616+ let repo = MemoryRepo { pool : & pool } ;
617+
618+ // Insert 6 profile rows (K=4) and 5 relationship rows (K=3) with
619+ // distinct embeddings so cosine ordering is well-defined.
620+ for i in 0 ..6 {
621+ repo. upsert (
622+ MemoryLayer :: Profile ,
623+ session_id,
624+ user_id,
625+ None ,
626+ & format ! ( "profile-{i}" ) ,
627+ & unit_embedding ( 100 + i) ,
628+ )
629+ . await
630+ . unwrap ( ) ;
631+ }
632+ for i in 0 ..5 {
633+ repo. upsert (
634+ MemoryLayer :: Relationship ,
635+ session_id,
636+ user_id,
637+ Some ( instance_id) ,
638+ & format ! ( "relationship-{i}" ) ,
639+ & unit_embedding ( 200 + i) ,
640+ )
641+ . await
642+ . unwrap ( ) ;
643+ }
644+
645+ let ( profile, relationship) =
646+ recall_memory_with_embedding ( & pool, user_id, instance_id, & unit_embedding ( 100 ) ) . await ;
647+
648+ assert_eq ! ( profile. len( ) , PROFILE_RECALL_K as usize ) ;
649+ assert_eq ! ( relationship. len( ) , RELATIONSHIP_RECALL_K as usize ) ;
650+ }
651+
652+ #[ sqlx:: test( migrations = "../eros-engine-store/migrations" ) ]
653+ async fn recall_memory_with_embedding_picks_nearest_per_layer ( pool : PgPool ) {
654+ let user_id = Uuid :: new_v4 ( ) ;
655+ let instance_id = Uuid :: new_v4 ( ) ;
656+ let session_id = make_session ( & pool, user_id, Some ( instance_id) ) . await ;
657+ let repo = MemoryRepo { pool : & pool } ;
658+
659+ // Profile-layer target at seed 42, with two decoys.
660+ repo. upsert (
661+ MemoryLayer :: Profile ,
662+ session_id,
663+ user_id,
664+ None ,
665+ "profile target" ,
666+ & unit_embedding ( 42 ) ,
667+ )
668+ . await
669+ . unwrap ( ) ;
670+ for i in 0 ..2 {
671+ repo. upsert (
672+ MemoryLayer :: Profile ,
673+ session_id,
674+ user_id,
675+ None ,
676+ & format ! ( "profile decoy-{i}" ) ,
677+ & unit_embedding ( 300 + i) ,
678+ )
679+ . await
680+ . unwrap ( ) ;
681+ }
682+
683+ // Relationship-layer target at seed 99, with one decoy.
684+ repo. upsert (
685+ MemoryLayer :: Relationship ,
686+ session_id,
687+ user_id,
688+ Some ( instance_id) ,
689+ "relationship target" ,
690+ & unit_embedding ( 99 ) ,
691+ )
692+ . await
693+ . unwrap ( ) ;
694+ repo. upsert (
695+ MemoryLayer :: Relationship ,
696+ session_id,
697+ user_id,
698+ Some ( instance_id) ,
699+ "relationship decoy" ,
700+ & unit_embedding ( 400 ) ,
701+ )
702+ . await
703+ . unwrap ( ) ;
704+
705+ // Query embedding hits the profile target seed exactly.
706+ let ( profile, _relationship) =
707+ recall_memory_with_embedding ( & pool, user_id, instance_id, & unit_embedding ( 42 ) ) . await ;
708+ assert_eq ! ( profile. first( ) . map( String :: as_str) , Some ( "profile target" ) ) ;
709+
710+ // Query at the relationship target seed.
711+ let ( _profile2, relationship2) =
712+ recall_memory_with_embedding ( & pool, user_id, instance_id, & unit_embedding ( 99 ) ) . await ;
713+ assert_eq ! (
714+ relationship2. first( ) . map( String :: as_str) ,
715+ Some ( "relationship target" ) ,
716+ ) ;
717+ }
718+
719+ #[ sqlx:: test( migrations = "../eros-engine-store/migrations" ) ]
720+ async fn load_insight_bullets_returns_empty_when_no_row ( pool : PgPool ) {
721+ let bullets = load_insight_bullets ( & pool, Uuid :: new_v4 ( ) ) . await ;
722+ assert ! ( bullets. is_empty( ) ) ;
723+ }
724+
725+ #[ sqlx:: test( migrations = "../eros-engine-store/migrations" ) ]
726+ async fn load_insight_bullets_renders_after_merge ( pool : PgPool ) {
727+ let user_id = Uuid :: new_v4 ( ) ;
728+ let repo = InsightRepo { pool : & pool } ;
729+
730+ repo. merge (
731+ user_id,
732+ json ! ( {
733+ "city" : "上海" ,
734+ "mbti_guess" : "INFP" ,
735+ "interests" : [ "登山" , "精酿" ] ,
736+ } ) ,
737+ )
738+ . await
739+ . unwrap ( ) ;
740+
741+ let bullets = load_insight_bullets ( & pool, user_id) . await ;
742+ assert_eq ! (
743+ bullets,
744+ vec![
745+ "城市:上海" . to_string( ) ,
746+ "MBTI:INFP" . to_string( ) ,
747+ "兴趣:登山、精酿" . to_string( ) ,
748+ ]
749+ ) ;
750+ }
518751}
0 commit comments