Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ resolver = "2"
members = ["crates/*"]

[workspace.package]
version = "0.4.3-dev"
version = "0.5.1-dev"
edition = "2021"
license = "AGPL-3.0-only"
repository = "https://github.com/etherfunlab/eros-engine"
Expand Down
2 changes: 1 addition & 1 deletion crates/eros-engine-llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ keywords = ["companion", "openrouter", "voyage", "embeddings", "llm"]
categories = ["api-bindings"]

[dependencies]
eros-engine-core = { path = "../eros-engine-core", version = "0.4.3-dev" }
eros-engine-core = { path = "../eros-engine-core", version = "0.5.1-dev" }
serde = { workspace = true }
serde_json = { workspace = true }
reqwest = { workspace = true }
Expand Down
9 changes: 6 additions & 3 deletions crates/eros-engine-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ name = "eros-engine"
path = "src/main.rs"

[dependencies]
eros-engine-core = { path = "../eros-engine-core", version = "0.4.3-dev" }
eros-engine-llm = { path = "../eros-engine-llm", version = "0.4.3-dev" }
eros-engine-store = { path = "../eros-engine-store", version = "0.4.3-dev" }
eros-engine-core = { path = "../eros-engine-core", version = "0.5.1-dev" }
eros-engine-llm = { path = "../eros-engine-llm", version = "0.5.1-dev" }
eros-engine-store = { path = "../eros-engine-store", version = "0.5.1-dev" }
axum = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
Expand Down Expand Up @@ -51,6 +51,9 @@ tokio-stream = { workspace = true }
async-stream = { workspace = true }
ulid = { workspace = true }
rand = "0.8"
# 6-field cron schedule parser (sec min hr dom mon dow). Pure parser; the
# sweeper drives ticks via tokio::time::sleep. No runtime scheduler.
cron = "0.12"

[dev-dependencies]
wiremock = "0.6"
5 changes: 5 additions & 0 deletions crates/eros-engine-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ async fn run_server() -> Result<()> {
// Cloned because the next line moves `state` into the router.
tokio::spawn(crate::pipeline::dreaming::sweeper(state.clone()));

// companion_insights_snapshot sweeper. Returns immediately when
// SNAPSHOT_DISABLED=1 or the cron expression fails to parse, so the
// chat path is unaffected by snapshot misconfig.
tokio::spawn(crate::pipeline::snapshot::sweeper(state.clone()));

if state.marketplace_svc_url.is_some() {
tokio::spawn(crate::pipeline::sync::run(state.clone()));
tracing::info!("marketplace self-heal task spawned");
Expand Down
1 change: 1 addition & 0 deletions crates/eros-engine-server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
pub mod dreaming;
pub mod handlers;
pub mod post_process;
pub mod snapshot;
pub mod stream;
pub mod sync;

Expand Down
89 changes: 89 additions & 0 deletions crates/eros-engine-server/src/pipeline/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// SPDX-License-Identifier: AGPL-3.0-only
//! companion_insights_snapshot sweeper.
//!
//! On a cron schedule (default 23:00 SGT daily), inserts one row per
//! user with a companion_insights record into
//! engine.companion_insights_snapshot, preserving the JSONB and
//! training_level at that instant for downstream time-series consumers.
//! No LLM, no dedupe, no transformation.

use std::str::FromStr;

use chrono::Utc;
use cron::Schedule;

use eros_engine_store::insight::InsightRepo;

use crate::state::AppState;

/// Background task that periodically snapshots companion insights.
///
/// Spawn once at server startup. The future only resolves when the sweeper
/// gives up, which happens in three cases:
///
/// - `state.config.snapshot.disabled` is set (`SNAPSHOT_DISABLED=1`);
/// - the configured cron expression fails to parse;
/// - the schedule yields no further fire time.
///
/// A failed snapshot write is only logged; the loop carries on and tries
/// again at the next scheduled fire. A sweeper failure does not affect the
/// chat path.
pub async fn sweeper(state: AppState) {
    let cfg = &state.config.snapshot;
    if cfg.disabled {
        tracing::info!("snapshot sweeper disabled (SNAPSHOT_DISABLED=1)");
        return;
    }
    let schedule = match Schedule::from_str(&cfg.cron) {
        Ok(s) => s,
        Err(e) => {
            tracing::error!(cron = %cfg.cron, error = %e,
                "snapshot: invalid SNAPSHOT_CRON; sweeper disabled");
            return;
        }
    };
    tracing::info!(cron = %cfg.cron, tz = %cfg.tz, "snapshot sweeper starting");

    loop {
        // Next fire strictly after the current wall-clock time, evaluated in
        // the configured zone and normalised to UTC for arithmetic.
        let next = match schedule.upcoming(cfg.tz).next() {
            Some(n) => n.with_timezone(&Utc),
            None => {
                tracing::error!("snapshot cron yielded no upcoming fire; exiting");
                return;
            }
        };

        // Wait until the wall clock has actually reached `next`. The sleep
        // timer counts elapsed time, whereas the schedule is computed from
        // the wall clock; if the wall clock is stepped backwards while we
        // sleep, we would wake before `next`, fire, be handed the very same
        // `next` again and write a duplicate snapshot. Re-checking here and
        // sleeping off the remainder guarantees one fire per scheduled time.
        loop {
            let now = Utc::now();
            if now >= next {
                break;
            }
            let remaining = (next - now)
                .to_std()
                .unwrap_or(std::time::Duration::from_secs(1));
            tokio::time::sleep(remaining).await;
        }

        // One timestamp per fire, shared by every row written in this pass.
        let fire_at = Utc::now();
        let repo = InsightRepo { pool: &state.pool };
        match repo.snapshot_all_users(fire_at).await {
            Ok(n) => tracing::info!(written = n, %fire_at, "snapshot: fire complete"),
            Err(e) => {
                tracing::warn!(error = %e, "snapshot: fire failed; retrying next tick")
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;
    use chrono_tz::Asia::Singapore;

    /// Six-field expression (sec min hr dom mon dow) exercised as the
    /// default schedule: once a day at 23:00:00.
    const DEFAULT_CRON: &str = "0 0 23 * * *";

    /// The default expression must be accepted by the cron parser.
    #[test]
    fn default_cron_parses() {
        let parsed = Schedule::from_str(DEFAULT_CRON);
        parsed.expect("default cron must parse");
    }

    /// Evaluated in Singapore time, the next fire lands exactly on 23:00:00
    /// local time.
    #[test]
    fn default_cron_next_fire_in_sgt_is_23h_local() {
        let schedule = Schedule::from_str(DEFAULT_CRON).unwrap();
        let mut fires = schedule.upcoming(Singapore);
        let first = fires.next().expect("at least one upcoming fire");

        assert_eq!(first.hour(), 23, "next fire is at 23:00 SGT local");
        assert_eq!(first.minute(), 0);
        assert_eq!(first.second(), 0);
    }

    /// Garbage input is rejected rather than silently accepted.
    #[test]
    fn malformed_cron_returns_err() {
        let outcome = Schedule::from_str("not a cron");
        assert!(outcome.is_err());
    }
}
6 changes: 6 additions & 0 deletions crates/eros-engine-server/src/routes/companion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,12 @@ pub(crate) fn test_state(pool: sqlx::PgPool) -> AppState {
dreaming_idle_threshold: std::time::Duration::from_secs(1800),
dreaming_claim_stale_threshold: std::time::Duration::from_secs(600),
openrouter_usage_hidden_keys: std::collections::HashSet::new(),
// Snapshot sweeper disabled in tests — same rationale as dreaming.
snapshot: crate::state::SnapshotConfig {
disabled: true,
cron: "0 0 23 * * *".into(),
tz: chrono_tz::Asia::Singapore,
},
},
openrouter: Arc::new(eros_engine_llm::openrouter::OpenRouterClient::new(
"stub".into(),
Expand Down
71 changes: 71 additions & 0 deletions crates/eros-engine-server/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,38 @@ pub(crate) fn parse_usage_hidden_keys(raw: Option<&str>) -> HashSet<String> {
.collect()
}

/// Knobs for the companion_insights_snapshot sweeper. Defaults: daily
/// 23:00 SGT, enabled. The cron string is stored raw and validated by
/// the sweeper at task start (so an invalid expression fails the sweeper
/// task only, not the whole server boot).
#[derive(Clone, Debug)]
pub struct SnapshotConfig {
/// When `true`, the sweeper task logs that it is disabled and returns
/// without scheduling anything.
pub disabled: bool,
/// Raw cron expression, kept unparsed here; the sweeper parses it with
/// `cron::Schedule::from_str` when the task starts.
pub cron: String,
/// Time zone the sweeper passes to the schedule when computing the next
/// fire time.
pub tz: chrono_tz::Tz,
}

/// Pure parser for the three snapshot env vars. Mirrors
/// `parse_usage_hidden_keys` in that tests can exercise edge cases without
/// touching process env.
///
/// Every raw value is trimmed of surrounding whitespace before it is
/// interpreted, so a stray space or trailing newline in the environment does
/// not silently change the outcome.
///
/// - `SNAPSHOT_DISABLED=1` → disabled; unset or any other value leaves the
///   sweeper enabled
/// - `SNAPSHOT_CRON` raw 6-field cron string (default `"0 0 23 * * *"`; a
///   blank value is treated as unset). The expression is not validated here.
/// - `SNAPSHOT_TZ` IANA zone (default `"Asia/Singapore"`; falls back on parse failure)
pub(crate) fn parse_snapshot_config(
    disabled_raw: Option<&str>,
    cron_raw: Option<&str>,
    tz_raw: Option<&str>,
) -> SnapshotConfig {
    // Only the exact token "1" (after trimming) disables the sweeper.
    let disabled = matches!(disabled_raw.map(str::trim), Some("1"));

    // A blank expression can never parse, so treat it the same as "unset"
    // and use the default instead of storing a value that would stop the
    // sweeper at startup.
    let cron = cron_raw
        .map(str::trim)
        .filter(|expr| !expr.is_empty())
        .unwrap_or("0 0 23 * * *")
        .to_owned();

    // Unknown or misspelled zone names fall back to the default zone.
    let tz = tz_raw
        .and_then(|name| name.trim().parse::<chrono_tz::Tz>().ok())
        .unwrap_or(chrono_tz::Asia::Singapore);

    SnapshotConfig { disabled, cron, tz }
}

/// Per-user in-flight SSE stream counter. Used by the
/// `send_message_stream` handler to enforce spec §1.9 (≤3 concurrent
/// active streams per user, returning HTTP 429 over the cap).
Expand Down Expand Up @@ -136,6 +168,9 @@ pub struct ServerConfig {
/// stays intact. Populated from `OPENROUTER_USAGE_HIDDEN_KEYS`
/// (comma-separated).
pub openrouter_usage_hidden_keys: HashSet<String>,
/// Cron-scheduled companion_insights_snapshot sweeper config. See
/// `pipeline::snapshot` for the sweep loop.
pub snapshot: SnapshotConfig,
}

impl ServerConfig {
Expand Down Expand Up @@ -169,6 +204,11 @@ impl ServerConfig {
.and_then(|v| v.parse().ok())
.unwrap_or(600),
);
let snapshot = parse_snapshot_config(
std::env::var("SNAPSHOT_DISABLED").ok().as_deref(),
std::env::var("SNAPSHOT_CRON").ok().as_deref(),
std::env::var("SNAPSHOT_TZ").ok().as_deref(),
);
Self {
expose_affinity_debug: std::env::var("EXPOSE_AFFINITY_DEBUG")
.map(|v| v == "true" || v == "1")
Expand All @@ -187,6 +227,7 @@ impl ServerConfig {
.ok()
.as_deref(),
),
snapshot,
}
}
}
Expand Down Expand Up @@ -247,4 +288,34 @@ mod tests {
let _g3 = slots.try_acquire(uid, 2).expect("acquire after drop ok");
drop(g2);
}

#[test]
fn snapshot_config_defaults_when_env_unset() {
    // Nothing supplied for any of the three inputs: every knob takes its
    // default value.
    let defaults = parse_snapshot_config(None, None, None);

    assert_eq!(defaults.tz, chrono_tz::Asia::Singapore);
    assert_eq!(defaults.cron, "0 0 23 * * *");
    assert!(!defaults.disabled);
}

#[test]
fn snapshot_config_disabled_when_env_says_one() {
    // "1" is the switch that turns the sweeper off...
    let switched_off = parse_snapshot_config(Some("1"), None, None);
    assert!(switched_off.disabled);

    // ...while a different value such as "0" does not.
    let left_on = parse_snapshot_config(Some("0"), None, None);
    assert!(
        !left_on.disabled,
        "any value other than 1 leaves it enabled"
    );
}

#[test]
fn snapshot_config_honours_env_overrides() {
    // Explicit cron and zone values are carried through to the config.
    let every_five_minutes = "0 */5 * * * *";
    let overridden = parse_snapshot_config(None, Some(every_five_minutes), Some("UTC"));

    assert_eq!(overridden.tz, chrono_tz::UTC);
    assert_eq!(overridden.cron, every_five_minutes);
}

#[test]
fn snapshot_config_falls_back_on_bad_tz() {
    // A zone name that does not parse must not fail config parsing; the
    // parser substitutes the default zone instead.
    let bogus_zone = Some("Not/A_Real_Zone");
    let parsed = parse_snapshot_config(None, None, bogus_zone);

    assert_eq!(parsed.tz, chrono_tz::Asia::Singapore);
}
}
2 changes: 1 addition & 1 deletion crates/eros-engine-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ keywords = ["companion", "memory", "pgvector", "postgres", "sqlx"]
categories = ["database"]

[dependencies]
eros-engine-core = { path = "../eros-engine-core", version = "0.4.3-dev" }
eros-engine-core = { path = "../eros-engine-core", version = "0.5.1-dev" }
sqlx = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-- SPDX-License-Identifier: AGPL-3.0-only
-- engine.companion_insights_snapshot — append-only history of
-- companion_insights, one row per user per sweeper fire.
--
-- captured_at carries no DEFAULT: the sweeper passes the fire-instant
-- timestamp explicitly so every row of a fire shares one value, letting
-- downstream group cleanly by "fire" without bucketing.
--
-- Spec: docs/superpowers/specs/2026-05-29-engine-cleanup-and-snapshot-design.md §3

-- Append-only history table: one row per user per sweeper fire.
CREATE TABLE engine.companion_insights_snapshot (
-- Surrogate key; filled in by gen_random_uuid() when the insert omits it.
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- User the snapshot belongs to. No foreign key is declared here.
-- NOTE(review): presumably deliberate so history outlives the source row; confirm against the spec.
user_id UUID NOT NULL,
-- The companion_insights JSONB as it stood when the snapshot was taken.
insights JSONB NOT NULL,
-- training_level captured at the same instant as the JSONB.
training_level DOUBLE PRECISION NOT NULL,
-- Deliberately has no DEFAULT: the sweeper supplies one shared timestamp
-- for every row written by a single fire.
captured_at TIMESTAMPTZ NOT NULL
);

-- Composite index keyed by user, then capture time newest-first.
-- NOTE(review): presumably serves per-user, most-recent-first history reads; confirm against the consumers.
CREATE INDEX idx_companion_insights_snapshot_user_time
ON engine.companion_insights_snapshot (user_id, captured_at DESC);

-- Supabase lockdown, mirroring migration 0013. REVOKEs are wrapped in
-- pg_roles existence checks so non-Supabase Postgres (where anon /
-- authenticated don't exist — including the sqlx test DB) skips them
-- silently. The RLS enable runs unconditionally; with no policy attached,
-- only owner (postgres) and service_role connections can touch the table.
-- Anonymous PL/pgSQL block: each REVOKE runs only when the named role is
-- present in pg_roles, so a missing role is skipped rather than raising.
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'anon') THEN
REVOKE ALL ON engine.companion_insights_snapshot FROM anon;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'authenticated') THEN
REVOKE ALL ON engine.companion_insights_snapshot FROM authenticated;
END IF;
END
$$;

-- Row level security is switched on without any role check, and this
-- migration creates no policy for the table.
ALTER TABLE engine.companion_insights_snapshot ENABLE ROW LEVEL SECURITY;
Loading
Loading