Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/ironclaw_host_runtime/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use ironclaw_turns::{
use ironclaw_wasm::{
DenyWasmHostHttp, EmptyWasmRuntimeCredentials, PreparedWitTool, WasmError,
WasmRuntimeCredentialProvider, WasmRuntimeHttpAdapter, WasmRuntimePolicyDiscarder,
WasmStagedRuntimeCredentials, WitToolHost, WitToolRequest, WitToolRuntime,
WasmStagedRuntimeCredentials, WitToolExecution, WitToolHost, WitToolRequest, WitToolRuntime,
WitToolRuntimeConfig,
};

Expand Down
1,208 changes: 1,084 additions & 124 deletions crates/ironclaw_host_runtime/src/services/runtime_adapters.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,111 @@ async fn first_party_adapter_releases_reservation_when_reconcile_fails_after_suc
);
}

/// First-party handler that parks a dispatch mid-flight.
///
/// `dispatch` sets the shared `entered` flag and then awaits a future that
/// never completes. A test can therefore confirm the handler was reached and
/// then drop (cancel) the surrounding dispatch future while it is suspended
/// inside the handler.
struct BlockingFirstPartyHandler {
    /// Flipped to `true` as soon as `dispatch` starts running.
    entered: Arc<std::sync::atomic::AtomicBool>,
}

#[async_trait]
impl crate::FirstPartyCapabilityHandler for BlockingFirstPartyHandler {
    /// Records entry, then suspends forever; this call never returns a value.
    async fn dispatch(
        &self,
        _request: crate::FirstPartyCapabilityRequest,
    ) -> Result<crate::FirstPartyCapabilityResult, crate::FirstPartyCapabilityError> {
        use std::sync::atomic::Ordering;

        self.entered.store(true, Ordering::SeqCst);

        // Nothing ever wakes this future; the caller is expected to drop the
        // dispatch future (e.g. via a timeout) while it is parked here.
        let never = std::future::pending::<()>();
        never.await;
        unreachable!("pending future never resolves")
    }
}

/// Regression test: dropping a first-party dispatch future mid-flight must
/// hand its resource reservation back to the governor.
///
/// Scenario being guarded (as described by the adapter's authors): the adapter
/// reserves *before* it awaits the handler, and callers such as the turn
/// scheduler may drop that future on user cancel, lease expiry, or a
/// heartbeat-store timeout. Without a drop guard (`ReservationGuard`) the
/// reservation would stay counted forever, because the governor has no TTL or
/// sweep to reclaim it.
///
/// How the cancellation is forced: `BlockingFirstPartyHandler` sets its
/// `entered` flag and then never completes, and the dispatch future is driven
/// under a 100 ms `tokio::time::timeout`. When the timeout elapses, the
/// dispatch future is dropped while suspended inside the handler.
#[tokio::test]
async fn first_party_adapter_releases_reservation_when_dispatch_future_is_cancelled() {
    use std::sync::atomic::{AtomicBool, Ordering};

    // Request inputs.
    let handler_entered = Arc::new(AtomicBool::new(false));
    let descriptor = test_descriptor(RuntimeKind::FirstParty, Vec::new());
    let package = test_package(WASM_MANIFEST, "test-wasm");
    let policy = policy_with(
        FilesystemBackendKind::HostWorkspace,
        ProcessBackendKind::LocalHost,
        NetworkMode::DirectLogged,
        SecretMode::ScrubbedEnv,
    );
    // A non-zero estimate makes a leaked reservation visible in the tally.
    let estimate = ResourceEstimate {
        output_bytes: Some(128),
        ..ResourceEstimate::default()
    };

    // Adapter wired to the never-completing handler.
    let registry = Arc::new(FirstPartyCapabilityRegistry::new().with_handler(
        descriptor.id.clone(),
        Arc::new(BlockingFirstPartyHandler {
            entered: Arc::clone(&handler_entered),
        }),
    ));
    let adapter = FirstPartyRuntimeAdapter::from_registry(
        registry,
        Arc::new(LocalInvocationServicesResolver::new(
            Arc::new(LocalFilesystem::new()),
            None,
            Arc::new(LocalHostProcessPort::new()),
            None,
        )),
    );

    // Host-side collaborators; the account is derived before `scope` is moved
    // into the request.
    let filesystem = LocalFilesystem::new();
    let governor = InMemoryResourceGovernor::new();
    let scope = sample_scope();
    let tenant_account = ResourceAccount::tenant(scope.tenant_id.clone());

    let request = RuntimeAdapterRequest {
        package: &package,
        descriptor: &descriptor,
        filesystem: &filesystem,
        governor: &governor,
        runtime_policy: &policy,
        capability_id: &descriptor.id,
        scope,
        estimate,
        mounts: None,
        resource_reservation: None,
        input: json!({}),
    };

    // The handler never finishes, so the timeout fires and the dispatch future
    // is dropped at its suspended await point.
    let outcome =
        tokio::time::timeout(Duration::from_millis(100), adapter.dispatch_json(request)).await;
    assert!(
        outcome.is_err(),
        "the blocking handler must not complete; the timeout must cancel the dispatch future"
    );
    assert!(
        handler_entered.load(Ordering::SeqCst),
        "the handler must have been entered, proving the reservation was taken before the await"
    );

    // With the future gone, nothing may remain reserved for the tenant.
    assert_eq!(
        governor.reserved_for(&tenant_account),
        ResourceTally::default(),
        "cancelling the dispatch future mid-await must release the reservation, not leak it"
    );
}

struct SucceedingFirstPartyHandler;

#[async_trait]
Expand All @@ -503,3 +608,103 @@ impl crate::FirstPartyCapabilityHandler for SucceedingFirstPartyHandler {
})
}
}

/// Handler whose dispatch always fails while reporting consumed resources.
///
/// The error is built with `RuntimeDispatchErrorKind::OperationFailed` and
/// carries a `ResourceUsage` of 64 output bytes (every other field at its
/// default). It simulates a handler that used some resources before failing,
/// so tests can drive the adapter's failure-accounting path with usage that
/// is non-zero.
struct DispatchFailingWithUsageHandler;

#[async_trait]
impl crate::FirstPartyCapabilityHandler for DispatchFailingWithUsageHandler {
    /// Always returns `Err`, with the usage described on the type attached.
    async fn dispatch(
        &self,
        _request: crate::FirstPartyCapabilityRequest,
    ) -> Result<crate::FirstPartyCapabilityResult, crate::FirstPartyCapabilityError> {
        let failure =
            crate::FirstPartyCapabilityError::new(RuntimeDispatchErrorKind::OperationFailed);
        Err(failure.with_usage(ironclaw_host_api::ResourceUsage {
            output_bytes: 64,
            ..ironclaw_host_api::ResourceUsage::default()
        }))
    }
}

/// Regression test: a failing handler that reports usage, combined with a
/// governor whose reconcile step fails (`ReconcileFailingGovernor`).
///
/// Two things are checked on the adapter's result:
///   1. The caller still sees the handler's own failure, i.e.
///      `DispatchError::FirstParty` with kind `OperationFailed`, rather than
///      an accounting (`Resource`) error replacing it.
///   2. Nothing stays reserved for the tenant afterwards; per the adapter's
///      contract, `account_failed` releases the reservation when reconcile
///      fails.
#[tokio::test]
async fn first_party_adapter_preserves_handler_error_when_account_failed_reconcile_fails() {
    // Request inputs.
    let descriptor = test_descriptor(RuntimeKind::FirstParty, Vec::new());
    let package = test_package(WASM_MANIFEST, "test-wasm");
    let policy = policy_with(
        FilesystemBackendKind::HostWorkspace,
        ProcessBackendKind::LocalHost,
        NetworkMode::DirectLogged,
        SecretMode::ScrubbedEnv,
    );

    // Adapter wired to the handler that fails with accountable usage.
    let registry = Arc::new(FirstPartyCapabilityRegistry::new().with_handler(
        descriptor.id.clone(),
        Arc::new(DispatchFailingWithUsageHandler),
    ));
    let adapter = FirstPartyRuntimeAdapter::from_registry(
        registry,
        Arc::new(LocalInvocationServicesResolver::new(
            Arc::new(LocalFilesystem::new()),
            None,
            Arc::new(LocalHostProcessPort::new()),
            None,
        )),
    );

    // Host-side collaborators; the account is derived before `scope` is moved
    // into the request.
    let filesystem = LocalFilesystem::new();
    let governor = ReconcileFailingGovernor::new();
    let scope = sample_scope();
    let tenant_account = ResourceAccount::tenant(scope.tenant_id.clone());

    let request = RuntimeAdapterRequest {
        package: &package,
        descriptor: &descriptor,
        filesystem: &filesystem,
        governor: &governor,
        runtime_policy: &policy,
        capability_id: &descriptor.id,
        scope,
        estimate: ResourceEstimate::default(),
        mounts: None,
        resource_reservation: None,
        input: json!({}),
    };
    let result = adapter.dispatch_json(request).await;

    // (1) The handler's own error kind must come through untouched.
    let handler_error_preserved = matches!(
        result,
        Err(DispatchError::FirstParty {
            kind: RuntimeDispatchErrorKind::OperationFailed,
            ..
        })
    );
    assert!(
        handler_error_preserved,
        "adapter must preserve the original handler DispatchError kind when account_failed \
         reconcile fails; got {result:?}"
    );

    // (2) The wrapped governor must show an empty reserved tally for the tenant.
    assert_eq!(
        governor.inner.reserved_for(&tenant_account),
        ResourceTally::default(),
        "reservation must be released when account_failed reconcile fails"
    );
}
37 changes: 32 additions & 5 deletions crates/ironclaw_host_runtime/src/wasm_credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ironclaw_host_api::{
use ironclaw_network::{network_target_for_url, target_matches_pattern};
use ironclaw_secrets::SecretStore;
use ironclaw_wasm::{WasmHostError, WasmRuntimeCredentialProvider, WasmRuntimeCredentialRequest};
use tokio::runtime::{Handle, RuntimeFlavor};
use tokio::runtime::Handle;

use crate::{
RuntimeCredentialAccountRequest, RuntimeCredentialAccountResolver,
Expand Down Expand Up @@ -287,10 +287,14 @@ where
F: Future<Output = Result<T, CredentialStageError>> + Send + 'static,
T: Send + 'static,
{
// Credential restaging is driven from inside the synchronous WASM guest
// call, which the host runtime now runs on the blocking thread pool via
// `spawn_blocking`. `block_in_place` is the wrong tool from there (it
// targets tokio worker threads and, on a worker, would park it for the
// whole restage). Always route the on-runtime case through the dedicated
// single-thread restage runtime so the synchronous thread only blocks on a
// channel recv, never a turn worker.
match Handle::try_current() {
Ok(handle) if handle.runtime_flavor() == RuntimeFlavor::MultiThread => {
tokio::task::block_in_place(|| handle.block_on(future))
}
Ok(_) => run_credential_restage_on_worker(future),
Err(_) => run_credential_restage_future(future),
}
Expand All @@ -305,8 +309,15 @@ where
.as_ref()
.map_err(|error| *error)?;
let (sender, receiver) = mpsc::sync_channel(1);
// Spawn on the worker runtime and recover the result via the join handle on
// a watchdog task, so a panicking restage future maps to
// `CredentialStageError::Backend` rather than dropping the sender and
// surfacing a misleading "recv on closed channel" error — consistent with
// the watchdog/panic-mapping pattern used by `run_runtime_http_egress_on_worker`.
let restage = runtime.spawn(future);
runtime.spawn(async move {
let _ = sender.send(future.await);
let result = restage.await.unwrap_or(Err(CredentialStageError::Backend));
Comment thread
henrypark133 marked this conversation as resolved.
let _ = sender.send(result);
});
receiver.recv().map_err(|_| CredentialStageError::Backend)?
}
Expand Down Expand Up @@ -616,4 +627,20 @@ runtime_credentials = [
invocation_id: InvocationId::new(),
}
}

/// A restage future that panics must surface as `CredentialStageError::Backend`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn block_on_credential_restage_maps_panicking_future_to_backend_on_worker_runtime() {
    // This test runs inside a tokio runtime, so `Handle::try_current()`
    // succeeds and `block_on_credential_restage` takes the
    // `run_credential_restage_on_worker` route. That route spawns the future
    // on the dedicated restage runtime and awaits its join handle; a failed
    // join (such as a panic) is replaced with `Err(CredentialStageError::Backend)`
    // instead of propagating the panic to the caller.
    let panicking_restage = async {
        panic!("simulated restage panic");
        #[allow(unreachable_code)]
        Ok::<(), CredentialStageError>(())
    };

    let outcome = block_on_credential_restage(panicking_restage);

    assert_eq!(outcome, Err(CredentialStageError::Backend));
}
}
7 changes: 6 additions & 1 deletion crates/ironclaw_wasm/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ pub enum WasmError {
}

impl WasmError {
pub(crate) fn execution_failed(message: String) -> Self {
/// Construct a [`WasmError::ExecutionFailed`] with default usage/logs.
Comment thread
henrypark133 marked this conversation as resolved.
///
/// Used by the runtime itself and by callers that offload execution (e.g.
/// the host runtime's `spawn_blocking` wrapper) and need to map a task
/// join failure into an execution error.
pub fn execution_failed(message: String) -> Self {
Self::ExecutionFailed {
message,
usage: ResourceUsage::default(),
Expand Down
27 changes: 19 additions & 8 deletions crates/ironclaw_wasm/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ironclaw_host_api::{
is_sensitive_runtime_response_header,
};
use serde_json::{Map, Value};
use tokio::runtime::{Handle, RuntimeFlavor};
use tokio::runtime::Handle;

use crate::WasmHostError;

Expand Down Expand Up @@ -386,13 +386,15 @@ fn block_on_runtime_http_egress<F>(future: F) -> Result<RuntimeHttpEgressRespons
where
F: Future<Output = Result<RuntimeHttpEgressResponse, WasmHostError>> + Send + 'static,
{
// WASM guest execution runs on the blocking thread pool (the host runtime
// dispatches `WitToolRuntime::execute` via `spawn_blocking`). Driving egress
// with `block_in_place` is the wrong tool here: `block_in_place` is for
// tokio *worker* threads, and using it would either no-op or, on a worker,
// park that worker for the whole HTTP round-trip — the worker-pool wedge.
// Route every on-runtime case through the dedicated single-thread egress
// runtime instead, so the synchronous guest thread blocks only on a channel
// recv while the actual I/O runs on its own runtime, never on a turn worker.
match Handle::try_current() {
Ok(handle) if handle.runtime_flavor() == RuntimeFlavor::MultiThread => {
catch_unwind(AssertUnwindSafe(|| {
tokio::task::block_in_place(|| handle.block_on(future))
}))
.map_err(|_| runtime_http_egress_panicked())?
}
Ok(_) => run_runtime_http_egress_on_worker(future),
Err(_) => run_runtime_http_egress_future(future),
}
Expand All @@ -408,8 +410,17 @@ where
.as_ref()
.map_err(|error| error.clone())?;
let (sender, receiver) = mpsc::sync_channel(1);
// Spawn on the worker runtime and recover the result via the join handle on
// a watchdog task, so a panicking egress future maps to the same
// `runtime_http_egress_panicked` error the previous `block_in_place` path
// produced via `catch_unwind`, rather than dropping the sender and
// surfacing a misleading "worker stopped" error.
let egress = runtime.spawn(future);
runtime.spawn(async move {
let _ = sender.send(future.await);
let result = egress
.await
.unwrap_or_else(|_| Err(runtime_http_egress_panicked()));
let _ = sender.send(result);
});
receiver
.recv()
Expand Down
7 changes: 7 additions & 0 deletions crates/ironclaw_wasm/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ use crate::store::StoreData;
use crate::types::{PreparedWitTool, WitToolExecution, WitToolRequest};

/// Reborn WIT-compatible WASM tool runtime.
///
/// Cloning is cheap: [`Engine`] is internally reference-counted and
/// [`WitToolRuntimeConfig`] is a small `Clone` value. A clone shares the same
/// underlying wasmtime engine, so a clone can be moved into a blocking task
/// (`tokio::task::spawn_blocking`) to run the synchronous guest call off the
/// async worker pool without re-creating the engine.
#[derive(Clone)]
pub struct WitToolRuntime {
engine: Engine,
config: WitToolRuntimeConfig,
Expand Down
Loading
Loading