Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ workers:
adapter:
name: fs
config:
directory: ./data/configuration
directory: ./iii-config
# 0 disables TTL cleanup. Set >0 (in seconds) to delete a configuration
# entry whose last subscriber trigger has been unregistered for that long.
ttl_seconds: 0
Expand Down
68 changes: 52 additions & 16 deletions engine/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,22 +537,21 @@ fn read_persisted_observability_value(cfg: &EngineConfig) -> Option<serde_json::
};
let value = entry.get("value").cloned().filter(|v| !v.is_null())?;

// `expand_value` panics on a `${VAR}` placeholder with no default and no
// env value. At runtime that fails one bus call; here it would brick
// every engine start until the data file is hand-edited — so contain it
// and fall back to the yaml block.
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
crate::workers::configuration::store::expand_value(&value)
})) {
Ok(expanded) => Some(expanded),
Err(_) => {
eprintln!(
"persisted configuration entry {} references an environment variable with no value and no default; using the config.yaml block",
path.display()
);
None
}
// Expand `${VAR:default}` (with scalar type-coercion) the same way
// `configuration::get` does. A `${VAR}` with no env value and no default
// can't be resolved at boot: log an ERROR and fall back to the config.yaml
// block rather than load a value still carrying literal `${VAR}` text.
// `expand_value` never panics, so no `catch_unwind` guard is needed.
let (expanded, missing) = crate::workers::configuration::store::expand_value(&value);
if !missing.is_empty() {
eprintln!(
"persisted configuration entry {} references environment variable(s) with no value and no default ({}); it will not be loaded — using the config.yaml block",
path.display(),
missing.join(", ")
);
return None;
}
Some(expanded)
}

pub fn init_log_from_engine_config(cfg: &EngineConfig) {
Expand Down Expand Up @@ -797,7 +796,7 @@ mod tests {
fn boot_read_path_constants_align_with_fs_adapter() {
use crate::workers::configuration::adapters::fs;
assert_eq!(fs::ADAPTER_NAME, "fs");
assert_eq!(fs::DEFAULT_DIRECTORY, "./data/configuration");
assert_eq!(fs::DEFAULT_DIRECTORY, "./iii-config");
assert_eq!(fs::FILE_EXTENSION, "yaml");
assert_eq!(
format!(
Expand Down Expand Up @@ -1364,6 +1363,43 @@ mod tests {
assert_eq!(resolved.service_name.as_deref(), Some("fallback-name"));
}

#[test]
fn boot_merge_coerces_typed_placeholder() {
    // Regression for #1916: a numeric field written as a `${VAR:default}`
    // template has to come out of expansion as an integer. If it stayed a
    // string the typed config would fail to deserialize and the persisted
    // value would be dropped in favour of the yaml block.
    //
    // NOTE(review): `remove_var` mutates process-wide state; presumably no
    // other test touches this variable concurrently — confirm.
    unsafe {
        std::env::remove_var("III_BOOT_LOGS_MAX");
    }

    let store_dir = tempfile::tempdir().unwrap();
    let persisted = serde_json::json!({ "logs_max_count": "${III_BOOT_LOGS_MAX:4096}" });
    write_persisted_entry(store_dir.path(), persisted);

    let yaml_block = serde_json::json!({ "logs_max_count": 1 });
    let engine_cfg = boot_cfg_with_dir(store_dir.path(), yaml_block);

    // The placeholder default (4096) must win over the yaml block's 1.
    let merged = resolve_boot_observability_config(&engine_cfg).expect("entry present");
    assert_eq!(merged.logs_max_count, Some(4096));
}

#[test]
fn boot_merge_falls_back_when_required_var_missing_no_panic() {
    // A bare `${VAR}` (no default) whose variable is unset cannot be resolved
    // at boot. The expected outcome is a quiet fall-back to the yaml block,
    // with no panic now that the `catch_unwind` guard has been removed.
    //
    // NOTE(review): `remove_var` mutates process-wide state; presumably no
    // other test touches this variable concurrently — confirm.
    unsafe {
        std::env::remove_var("III_BOOT_LOGS_MAX_MISSING");
    }

    let store_dir = tempfile::tempdir().unwrap();
    let persisted = serde_json::json!({ "logs_max_count": "${III_BOOT_LOGS_MAX_MISSING}" });
    write_persisted_entry(store_dir.path(), persisted);

    let yaml_block = serde_json::json!({ "logs_max_count": 123 });
    let engine_cfg = boot_cfg_with_dir(store_dir.path(), yaml_block);

    // The yaml block's value (123) is what must be observed.
    let merged = resolve_boot_observability_config(&engine_cfg).expect("entry present");
    assert_eq!(merged.logs_max_count, Some(123));
}

#[test]
fn boot_merge_falls_back_on_malformed_file() {
let dir = tempfile::tempdir().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion engine/src/workers/config_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! and insert one comment, so everything else in the file is byte-preserved.

/// Comment left in place of a stripped `config:` block. `location` names where
/// the value now lives (e.g. `./data/configuration/<id>.yaml` for the fs
/// the value now lives (e.g. `./iii-config/<id>.yaml` for the fs
/// adapter).
fn seed_comment(config_id: &str, location: &str) -> String {
format!(
Expand Down
10 changes: 6 additions & 4 deletions engine/src/workers/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ npx skills add iii-hq/iii --full-depth --skill configuration
adapter:
name: fs
config:
directory: ./data/configuration
directory: ./iii-config
ttl_seconds: 0
```

Expand All @@ -48,14 +48,16 @@ File-system adapter that stores one YAML file per configuration id and watches t
```yaml
name: fs
config:
directory: ./data/configuration
directory: ./iii-config
```

| Field | Type | Description |
|---|---|---|
| `directory` | string | Directory holding `<id>.yaml` files. Created on boot. Defaults to `./data/configuration`. |
| `directory` | string | Directory holding `<id>.yaml` files. Created on boot. Defaults to `./iii-config`. |

External writes / edits / removals to the watched directory are debounced (500 ms) and replayed as `configuration:registered`, `configuration:updated`, or `configuration:deleted` events through the same trigger fan-out used by SDK calls.
The default location was previously `./data/configuration`. When running on the default `./iii-config`, the adapter performs a one-time soft migration on boot: every `<id>.yaml` still in `./data/configuration` is moved across (logged at INFO). If a file with the same name already exists in `./iii-config`, that file is **skipped with a WARNING** and the legacy copy is left untouched for you to reconcile. An explicit `directory:` override disables the migration.

External writes / edits / removals to the watched directory are debounced (500 ms) and replayed as `configuration:registered`, `configuration:updated`, or `configuration:deleted` events through the same trigger fan-out used by SDK calls. A hand-edit that fails schema validation (after `${VAR}` expansion) is **rejected with a WARNING**: the invalid edit is not loaded, and the previous good value stays in effect.

### bridge

Expand Down
211 changes: 208 additions & 3 deletions engine/src/workers/configuration/adapters/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ pub(crate) const ADAPTER_NAME: &str = "fs";
// `DEFAULT_DIRECTORY` / `FILE_EXTENSION` are `pub(crate)` so boot-time
// persisted-config readers (e.g. the `iii-state` boot-read) resolve the same
// on-disk location the configuration worker persists entries under.
pub(crate) const DEFAULT_DIRECTORY: &str = "./data/configuration";
pub(crate) const DEFAULT_DIRECTORY: &str = "./iii-config";
pub(crate) const FILE_EXTENSION: &str = "yaml";
/// The previous default store location. When the resolved directory is the
/// current default and this legacy folder still holds entries, `new` migrates
/// them across once (a soft transition) so upgrading doesn't silently start the
/// store from empty. An explicit `directory:` override is never touched.
const LEGACY_DEFAULT_DIRECTORY: &str = "./data/configuration";

pub struct FsAdapter {
directory: PathBuf,
Expand Down Expand Up @@ -73,6 +78,13 @@ impl FsAdapter {
)
})?;

// Soft transition: when running on the current default location, move any
// entries left in the legacy default folder across once. Guarded on the
// default dir, so an explicit `directory:` override is never disturbed.
if directory.as_path() == Path::new(DEFAULT_DIRECTORY) {
Self::migrate_legacy_default(&directory).await;
}

let cache = Self::load_directory(&directory).await?;
tracing::info!(
directory = %directory.display(),
Expand All @@ -91,6 +103,70 @@ impl FsAdapter {
self.directory.join(format!("{}.{}", id, FILE_EXTENSION))
}

/// Soft, one-time migration out of the legacy default store location.
///
/// Delegates to [`Self::migrate_dir`] with [`LEGACY_DEFAULT_DIRECTORY`] as
/// the source folder and `target` as the destination.
async fn migrate_legacy_default(target: &Path) {
    let legacy = Path::new(LEGACY_DEFAULT_DIRECTORY);
    Self::migrate_dir(legacy, target).await;
}

/// Move every `*.yaml` entry from `legacy` into `target`.
///
/// Best-effort; this never fails the adapter:
/// - a missing `legacy` folder (a fresh install) is a silent no-op;
/// - any other error opening or iterating `legacy` is logged at WARNING and
///   the migration stops;
/// - a file already present in `target` is left in place (WARNING) and the
///   legacy copy is kept so the operator can reconcile;
/// - if the existence of the destination cannot be determined, the file is
///   skipped (WARNING) rather than risk overwriting it with `rename`;
/// - a failed `rename` is logged and the file is left where it was.
async fn migrate_dir(legacy: &Path, target: &Path) {
    if legacy == target {
        return;
    }
    let mut read_dir = match tokio::fs::read_dir(legacy).await {
        Ok(rd) => rd,
        // Legacy folder absent (a fresh install) — nothing to migrate.
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return,
        // Anything else (permissions, not a directory, ...) is worth
        // surfacing: entries may be stranded in the legacy folder.
        Err(err) => {
            tracing::warn!(
                directory = %legacy.display(),
                error = %err,
                "Failed to read the legacy configuration folder; skipping migration"
            );
            return;
        }
    };

    let mut moved = 0usize;
    loop {
        let entry = match read_dir.next_entry().await {
            Ok(Some(entry)) => entry,
            Ok(None) => break,
            Err(err) => {
                // Stop scanning, but say so: remaining files stay behind.
                tracing::warn!(
                    directory = %legacy.display(),
                    error = %err,
                    "Failed while listing the legacy configuration folder; remaining files were not migrated"
                );
                break;
            }
        };

        let path = entry.path();
        // Async equivalent of `Path::is_file`: follows symlinks and treats a
        // metadata error as "not a file", without blocking the runtime.
        let is_file = tokio::fs::metadata(&path)
            .await
            .map(|meta| meta.is_file())
            .unwrap_or(false);
        if !is_file || path.extension().and_then(|e| e.to_str()) != Some(FILE_EXTENSION) {
            continue;
        }
        let Some(name) = path.file_name() else {
            continue;
        };
        let dest = target.join(name);

        match tokio::fs::try_exists(&dest).await {
            Ok(false) => {}
            Ok(true) => {
                tracing::warn!(
                    file = %name.to_string_lossy(),
                    location = %target.display(),
                    "Skipped migrating configuration file from the legacy '{}' folder: a file with that name already exists in the new location; the legacy copy was left untouched",
                    LEGACY_DEFAULT_DIRECTORY,
                );
                continue;
            }
            Err(err) => {
                // Unknown state: `rename` could replace an existing file, so
                // err on the side of leaving both locations untouched.
                tracing::warn!(
                    file = %name.to_string_lossy(),
                    location = %target.display(),
                    error = %err,
                    "Could not check whether the configuration file already exists in the new location; leaving the legacy copy in place"
                );
                continue;
            }
        }

        match tokio::fs::rename(&path, &dest).await {
            Ok(()) => moved += 1,
            Err(err) => tracing::warn!(
                file = %name.to_string_lossy(),
                error = %err,
                "Failed to migrate configuration file from the legacy '{}' folder; leaving it in place",
                LEGACY_DEFAULT_DIRECTORY,
            ),
        }
    }

    if moved > 0 {
        tracing::info!(
            from = %legacy.display(),
            to = %target.display(),
            count = moved,
            "The configuration store now defaults to '{}'; migrated existing entries out of the legacy '{}' folder",
            DEFAULT_DIRECTORY,
            LEGACY_DEFAULT_DIRECTORY,
        );
    }
}

async fn load_directory(dir: &Path) -> anyhow::Result<HashMap<String, ConfigurationEntry>> {
let mut entries = HashMap::new();
let mut read_dir = tokio::fs::read_dir(dir).await?;
Expand Down Expand Up @@ -240,16 +316,25 @@ impl ConfigurationAdapter for FsAdapter {
}
})
.map_err(|e| anyhow::anyhow!("failed to create configuration watcher: {}", e))?;
// Watch an absolute path: the macOS FSEvents backend can silently fail
// to deliver events for a relative path like `./iii-config`, which is
// exactly the default. Canonicalize (the dir always exists by now —
// `new` created it) and fall back to the raw path only if that fails.
let watch_path = std::fs::canonicalize(&directory).unwrap_or_else(|_| directory.clone());
watcher
.watch(&directory, RecursiveMode::NonRecursive)
.watch(&watch_path, RecursiveMode::NonRecursive)
.map_err(|e| {
anyhow::anyhow!(
"failed to watch configuration directory '{}': {}",
directory.display(),
watch_path.display(),
e
)
})?;
*self.watcher.lock().await = Some(watcher);
tracing::info!(
directory = %watch_path.display(),
"Watching configuration directory for external edits (hot-reload enabled)"
);

// Debounce loop: drains every queued raw event in a 500ms window
// and then diffs the directory snapshot against the cache, emitting
Expand Down Expand Up @@ -569,6 +654,126 @@ mod tests {
}
}

#[tokio::test]
async fn internal_set_does_not_echo_through_watcher() {
    // Guards against a save → reload → save loop: an internal `set` writes
    // the file and refreshes the adapter cache together, so when the watcher
    // diffs disk against cache it finds nothing to report. External edits
    // (cache stale relative to disk) are covered by other tests.
    let store = temp_dir();
    let store_path = store.path().to_str().unwrap();
    let options = json!({ "directory": store_path });
    let fs_adapter = FsAdapter::new(Some(options)).await.unwrap();
    fs_adapter.register(sample_entry("looptest")).await.unwrap();

    let (events_tx, mut events_rx) = tokio::sync::mpsc::unbounded_channel();
    fs_adapter.watch(events_tx).await.unwrap();

    let new_value = json!({ "port": 4242 });
    fs_adapter.set("looptest", new_value).await.unwrap();

    // Wait comfortably longer than the 500ms debounce window; a timeout here
    // (i.e. `Err`) means no event was delivered, which is the desired result.
    let wait = Duration::from_millis(1500);
    let outcome = tokio::time::timeout(wait, events_rx.recv()).await;
    assert!(
        outcome.is_err(),
        "an internal set must not echo back through the watcher (would loop)"
    );
}

#[tokio::test]
async fn migrate_dir_moves_yaml_entries_and_ignores_others() {
    let legacy = temp_dir();
    let target = temp_dir();
    let stream = "id: iii-stream\nvalue:\n port: 1\n";

    // Seed the legacy folder: two yaml entries plus one unrelated file that
    // the migration is expected to leave behind.
    let seeds = [
        ("iii-stream.yaml", stream),
        ("iii-http.yaml", "id: iii-http\n"),
        ("notes.txt", "ignore me"),
    ];
    for (file_name, body) in seeds {
        tokio::fs::write(legacy.path().join(file_name), body)
            .await
            .unwrap();
    }

    FsAdapter::migrate_dir(legacy.path(), target.path()).await;

    let in_target = |file_name: &str| target.path().join(file_name);
    let in_legacy = |file_name: &str| legacy.path().join(file_name);

    // Both yaml entries arrived in the new location...
    assert!(in_target("iii-stream.yaml").exists());
    assert!(in_target("iii-http.yaml").exists());
    let migrated = tokio::fs::read_to_string(in_target("iii-stream.yaml"))
        .await
        .unwrap();
    assert_eq!(migrated, stream, "content must be preserved across the move");
    // ...and were moved rather than copied.
    assert!(!in_legacy("iii-stream.yaml").exists());
    assert!(!in_legacy("iii-http.yaml").exists());
    // The non-yaml file stays put in the legacy folder.
    assert!(!in_target("notes.txt").exists());
    assert!(in_legacy("notes.txt").exists());
}

#[tokio::test]
async fn migrate_dir_skips_conflicts_and_keeps_legacy_copy() {
    let legacy = temp_dir();
    let target = temp_dir();

    let legacy_stream = legacy.path().join("iii-stream.yaml");
    let legacy_http = legacy.path().join("iii-http.yaml");
    let target_stream = target.path().join("iii-stream.yaml");
    let target_http = target.path().join("iii-http.yaml");

    tokio::fs::write(&legacy_stream, "id: iii-stream\nvalue:\n port: 1\n")
        .await
        .unwrap();
    tokio::fs::write(&legacy_http, "id: iii-http\n")
        .await
        .unwrap();
    // Pre-populate the new location with a same-named file holding different
    // content; migrating that one file must be skipped (WARNING).
    let existing = "id: iii-stream\nvalue:\n port: 999\n";
    tokio::fs::write(&target_stream, existing).await.unwrap();

    FsAdapter::migrate_dir(legacy.path(), target.path()).await;

    // On conflict the target's content wins and the legacy file survives.
    let kept = tokio::fs::read_to_string(&target_stream).await.unwrap();
    assert_eq!(kept, existing, "a conflicting file must not be overwritten");
    assert!(
        legacy_stream.exists(),
        "the conflicting legacy copy is kept for the operator to reconcile"
    );
    // Files without a conflict are still migrated.
    assert!(target_http.exists());
    assert!(!legacy_http.exists());
}

#[tokio::test]
async fn migrate_dir_is_a_noop_when_legacy_absent() {
    let target = temp_dir();
    let missing_legacy = target.path().join("nonexistent-legacy");

    // Neither a panic nor any new file is acceptable here.
    FsAdapter::migrate_dir(&missing_legacy, target.path()).await;

    let mut listing = tokio::fs::read_dir(target.path()).await.unwrap();
    let first_entry = listing.next_entry().await.unwrap();
    assert!(first_entry.is_none());
}

#[test]
fn id_from_path_extracts_stem() {
assert_eq!(
Expand Down
Loading
Loading