-
Notifications
You must be signed in to change notification settings - Fork 1.5k
fix(reborn): release Slack admission permit once inbound is durably accepted #5225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -96,15 +96,24 @@ impl NativeProductAdapterRunner { | |
| } | ||
| } | ||
| tasks.spawn(async move { | ||
| let _permit = permit; | ||
| // The admission permit bounds the whole tracked post-ACK task, including | ||
| // observer follow-up. Otherwise quick workflow acks could release permits | ||
| // while long-running observers accumulate without backpressure. | ||
| // The admission permit gates only fast intake (auth/parse/stamp/ | ||
| // submit), bounded by `workflow_timeout`. It must NOT be held across | ||
| // the post-ACK observer delivery poll, which can wait for the run's | ||
| // final reply for far longer (Slack's observer polls up to ~120s). | ||
| // Holding admission across that unbounded wait would let | ||
| // `max_in_flight` slow turns exhaust every intake slot and silently | ||
| // reject new inbound webhooks under load. The permit is therefore | ||
| // released the moment the workflow durably accepts the inbound | ||
| // (`is_durable_outcome`), before observer follow-up runs. On error | ||
| // or timeout (no durable acceptance) the permit drops at scope end, | ||
| // which keeps correct backpressure over the bounded intake window. | ||
| // | ||
| // Timeout drops the in-flight workflow future. That is the intended | ||
| // cancellation boundary for this generic async trait call: the | ||
| // runner does not hold a separate task handle or protocol-specific | ||
| // resource owner to abort. Workflows that open DB/network resources | ||
| // must make their own futures cancellation-safe at await points. | ||
| let mut permit = Some(permit); | ||
| let workflow_result = | ||
| tokio::time::timeout(workflow_timeout, workflow.submit_inbound(workflow_envelope)) | ||
| .await; | ||
|
|
@@ -116,6 +125,13 @@ impl NativeProductAdapterRunner { | |
| "async webhook workflow dispatch requested retry after protocol ack; event was not retried by protocol transport" | ||
| ); | ||
| } | ||
| // Release admission before the (potentially long) delivery | ||
| // poll once the run is durably submitted. A non-durable ack | ||
| // (e.g. retryable rejection) keeps the permit until scope | ||
| // end so admission still backpressures un-accepted intake. | ||
| if ack.is_durable_outcome() { | ||
| drop(permit.take()); | ||
|
Comment on lines
+132
to
+133
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a durable ack comes back, this drops the only runner-level bound before calling the observer. I checked the Slack observer path: Useful? React with 👍 / 👎. |
||
| } | ||
| if let Some(observer) = observer { | ||
| observer.observe_workflow_ack(envelope, ack).await; | ||
| } | ||
|
|
@@ -162,7 +178,7 @@ impl NativeProductAdapterRunner { | |
| #[cfg(test)] | ||
| mod tests { | ||
| use std::sync::Arc; | ||
| use std::sync::atomic::{AtomicUsize, Ordering}; | ||
| use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; | ||
| use std::time::Duration; | ||
|
|
||
| use super::ImmediateAckWorkflowObserver; | ||
|
|
@@ -314,6 +330,38 @@ mod tests { | |
| } | ||
| } | ||
|
|
||
| /// Workflow that returns a *non-durable* retryable rejection via the | ||
| /// `Ok(ack)` arm (distinct from `RejectingWorkflow`, which returns `Err`). | ||
| /// `ProductInboundAck::Rejected(Retryable)` reports `is_durable_outcome() == | ||
| /// false`, so the admission permit must be retained across the observer. | ||
| struct NonDurableRejectWorkflow; | ||
|
|
||
| #[async_trait] | ||
| impl ironclaw_product_adapters::ProductWorkflow for NonDurableRejectWorkflow { | ||
| async fn submit_inbound( | ||
| &self, | ||
| _envelope: ProductInboundEnvelope, | ||
| ) -> Result<ProductInboundAck, ProductAdapterError> { | ||
| Ok(ProductInboundAck::Rejected( | ||
| ironclaw_product_adapters::ProductRejection::retryable( | ||
| ironclaw_product_adapters::ProductRejectionKind::PolicyDenied, | ||
| "policy temporarily unavailable", | ||
| ), | ||
| )) | ||
| } | ||
|
|
||
| async fn resolve_projection_subscription( | ||
| &self, | ||
| _envelope: ProductInboundEnvelope, | ||
| ) -> Result<ProjectionSubscriptionRequest, ProductAdapterError> { | ||
| Err(ProductAdapterError::Internal { | ||
| detail: ironclaw_product_adapters::redaction::RedactedString::new( | ||
| "test stub: resolve_projection_subscription not supported", | ||
| ), | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| struct RecordingObserver { | ||
| ack_count: Arc<AtomicUsize>, | ||
| error_count: Arc<AtomicUsize>, | ||
|
|
@@ -338,6 +386,34 @@ mod tests { | |
| } | ||
| } | ||
|
|
||
| /// Observer that blocks inside `observe_workflow_ack` until released. Models | ||
| /// the real `SlackFinalReplyDeliveryObserver`, which polls the run for its | ||
| /// final reply for up to `max_wait` (default 120s) inside this callback. | ||
| /// | ||
| /// Release uses a latched `AtomicBool` polled with `yield_now`, not a | ||
| /// `Notify`: a single shared observer is invoked by two tasks, and the | ||
| /// second invocation can register *after* the release fires, which | ||
| /// `Notify::notify_waiters` would not wake. The latch wakes current and | ||
| /// future waiters deterministically. | ||
| struct BlockingObserver { | ||
| entered: Arc<AtomicUsize>, | ||
| released: Arc<AtomicBool>, | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl ImmediateAckWorkflowObserver for BlockingObserver { | ||
| async fn observe_workflow_ack( | ||
| &self, | ||
| _envelope: ProductInboundEnvelope, | ||
| _ack: ProductInboundAck, | ||
| ) { | ||
| self.entered.fetch_add(1, Ordering::SeqCst); | ||
| while !self.released.load(Ordering::SeqCst) { | ||
| tokio::task::yield_now().await; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| struct BlockingWorkflow { | ||
| entered: Arc<AtomicUsize>, | ||
| release: Arc<Notify>, | ||
|
|
@@ -398,6 +474,20 @@ mod tests { | |
| .expect("webhook auth should verify") | ||
| } | ||
|
|
||
| async fn wait_for_observer_entries( | ||
| entered: &AtomicUsize, | ||
| expected: usize, | ||
| failure_context: &str, | ||
| ) { | ||
| tokio::time::timeout(Duration::from_secs(1), async { | ||
| while entered.load(Ordering::SeqCst) < expected { | ||
| tokio::task::yield_now().await; | ||
| } | ||
| }) | ||
| .await | ||
| .unwrap_or_else(|_| panic!("timed out waiting for {failure_context}")); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn process_verified_webhook_immediate_ack_dispatches_workflow() { | ||
| let parse_count = Arc::new(AtomicUsize::new(0)); | ||
|
|
@@ -510,6 +600,118 @@ mod tests { | |
| assert_eq!(parse_count.load(Ordering::SeqCst), 2); | ||
| } | ||
|
|
||
| /// Regression: the admission permit must be released once the workflow has | ||
| /// durably accepted the inbound (its `submit_inbound` returned), NOT held | ||
| /// across the post-ACK observer delivery poll. The real Slack observer polls | ||
| /// the submitted run for its final reply for up to 120s inside | ||
| /// `observe_workflow_ack`; if admission stayed pinned for that whole window, | ||
| /// only `SLACK_MAX_IN_FLIGHT_WEBHOOKS` slow turns would exhaust all admission | ||
| /// slots and new inbound webhooks would be rejected `TooManyInFlight` for up | ||
| /// to two minutes — silent backpressure-induced message loss under load. | ||
| /// | ||
| /// With `max_in_flight = 1`: webhook #1 durably accepts, then its observer | ||
| /// blocks (modelling the long delivery poll). Webhook #2 must still be | ||
| /// admitted because admission is freed after durable acceptance. | ||
| #[tokio::test] | ||
| async fn admission_released_after_durable_accept_not_held_across_delivery() { | ||
| let parse_count = Arc::new(AtomicUsize::new(0)); | ||
| let observer_entered = Arc::new(AtomicUsize::new(0)); | ||
| let observer_released = Arc::new(AtomicBool::new(false)); | ||
| // AckWorkflow returns a durable NoOp ack immediately; the observer then | ||
| // blocks, standing in for the up-to-120s final-reply delivery poll. | ||
| let runner = runner_with_workflow(Arc::clone(&parse_count), Arc::new(AckWorkflow), 1); | ||
| let observer = Arc::new(BlockingObserver { | ||
| entered: Arc::clone(&observer_entered), | ||
| released: Arc::clone(&observer_released), | ||
| }); | ||
| let evidence = verified_evidence(&runner); | ||
|
|
||
| let first = runner | ||
| .process_verified_webhook_immediate_ack_with_observer( | ||
| b"{}", | ||
| &evidence, | ||
| Some(Arc::clone(&observer) as Arc<dyn ImmediateAckWorkflowObserver>), | ||
| ) | ||
| .await | ||
| .expect("first webhook should be accepted for async dispatch"); | ||
| assert_eq!(first, WebhookProcessOutcome::AcceptedForAsyncDispatch); | ||
|
|
||
| // Wait until the observer is actively blocking — i.e. the workflow has | ||
| // durably accepted and we are now in the delivery phase. | ||
| wait_for_observer_entries(&observer_entered, 1, "durable observer to enter").await; | ||
|
|
||
| // The second webhook must be admitted: admission only gates intake, not | ||
| // the unbounded delivery wait the observer is currently sitting in. | ||
| let second = runner | ||
| .process_verified_webhook_immediate_ack_with_observer( | ||
| b"{}", | ||
| &evidence, | ||
| Some(Arc::clone(&observer) as Arc<dyn ImmediateAckWorkflowObserver>), | ||
| ) | ||
| .await | ||
| .expect("second webhook must be admitted while first is still in delivery"); | ||
| assert_eq!(second, WebhookProcessOutcome::AcceptedForAsyncDispatch); | ||
|
|
||
| observer_released.store(true, Ordering::SeqCst); | ||
| runner.drain_immediate_ack_tasks().await; | ||
| assert_eq!(parse_count.load(Ordering::SeqCst), 2); | ||
| } | ||
|
|
||
| /// Counterpart to the durable-accept test: a *non-durable* ack | ||
| /// (`Rejected(Retryable)`) must KEEP the admission permit across the observer | ||
| /// so admission still backpressures un-accepted intake. This pins the | ||
| /// `is_durable_outcome()` guard so a future edit that drops the permit | ||
| /// unconditionally would be caught. | ||
| #[tokio::test] | ||
| async fn admission_retained_across_observer_for_non_durable_ack() { | ||
| let parse_count = Arc::new(AtomicUsize::new(0)); | ||
| let observer_entered = Arc::new(AtomicUsize::new(0)); | ||
| let observer_released = Arc::new(AtomicBool::new(false)); | ||
| let runner = runner_with_workflow( | ||
| Arc::clone(&parse_count), | ||
| Arc::new(NonDurableRejectWorkflow), | ||
| 1, | ||
| ); | ||
| let observer = Arc::new(BlockingObserver { | ||
| entered: Arc::clone(&observer_entered), | ||
| released: Arc::clone(&observer_released), | ||
| }); | ||
| let evidence = verified_evidence(&runner); | ||
|
|
||
| let first = runner | ||
| .process_verified_webhook_immediate_ack_with_observer( | ||
| b"{}", | ||
| &evidence, | ||
| Some(Arc::clone(&observer) as Arc<dyn ImmediateAckWorkflowObserver>), | ||
| ) | ||
| .await | ||
| .expect("first webhook should be accepted for async dispatch"); | ||
| assert_eq!(first, WebhookProcessOutcome::AcceptedForAsyncDispatch); | ||
|
|
||
| // Wait until the observer is blocking — the workflow returned a | ||
| // non-durable ack, so the permit must still be held at this point. | ||
| wait_for_observer_entries(&observer_entered, 1, "non-durable observer to enter").await; | ||
|
|
||
| // The permit is retained across the observer for a non-durable ack, so a | ||
| // second intake must be rejected while the first is still in delivery. | ||
| let err = runner | ||
| .process_verified_webhook_immediate_ack_with_observer( | ||
| b"{}", | ||
| &evidence, | ||
| Some(Arc::clone(&observer) as Arc<dyn ImmediateAckWorkflowObserver>), | ||
| ) | ||
| .await | ||
| .expect_err("second webhook should be rejected while non-durable permit is held"); | ||
| assert!(matches!( | ||
| err, | ||
| RunnerError::TooManyInFlight { max_in_flight: 1 } | ||
| )); | ||
|
|
||
| observer_released.store(true, Ordering::SeqCst); | ||
| runner.drain_immediate_ack_tasks().await; | ||
| assert_eq!(parse_count.load(Ordering::SeqCst), 1); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn process_verified_webhook_immediate_ack_rejects_mismatched_evidence_type() { | ||
| let parse_count = Arc::new(AtomicUsize::new(0)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| # Slack admission permit held across delivery poll — fix plan | ||
|
|
||
| Date: 2026-06-25 | ||
| Branch: `fix/reborn-slack-admission-permit` | ||
| Owner crate/file: `crates/ironclaw_wasm_product_adapters/src/runner_immediate_ack.rs` | ||
|
|
||
| ## Verified bug (file:line evidence) | ||
|
|
||
| The immediate-ACK webhook path acquires an **admission** permit and holds it | ||
| across the entire post-ACK task — including the unbounded post-ACK delivery | ||
| observer — instead of releasing it once the inbound is durably accepted. | ||
|
|
||
| - Admission permit acquired in `prepare_inbound_envelope` | ||
| (`runner.rs:316`, `try_acquire_owned` on `admission: Arc<Semaphore>`, | ||
| capacity = `max_in_flight`). | ||
| - In `process_verified_webhook_immediate_ack_with_observer` | ||
| (`runner_immediate_ack.rs`), the permit is moved into the spawned task | ||
| (`let _permit = permit;`) and dropped only when the task ends. | ||
| - The task awaits `tokio::time::timeout(workflow_timeout, submit_inbound)` | ||
| (~2s for Slack via `SLACK_WEBHOOK_WORKFLOW_TIMEOUT`, `slack_host_beta.rs:83`) | ||
| and then, on `Ok(Ok(ack))`, awaits `observer.observe_workflow_ack(...)` | ||
| **outside** the timeout. | ||
| - For Slack the observer is `SlackFinalReplyDeliveryObserver` | ||
| (`slack_delivery.rs:1230`) whose `observe_workflow_ack` calls | ||
| `deliver_final_reply` (`slack_delivery.rs:1392`), which polls the submitted | ||
| run for its final reply for up to `max_wait` — default **120s** | ||
| (`slack_delivery.rs:109`). | ||
| - Net effect: the admission permit is pinned for up to ~120s per turn even | ||
| though `submit_inbound` already returned a durable `Accepted`/`NoOp` ack in | ||
| ~2s. With `SLACK_MAX_IN_FLIGHT_WEBHOOKS = 64` (`slack_host_beta.rs:84`), | ||
| 64 slow turns exhaust all admission slots; further inbound webhooks are | ||
| rejected `TooManyInFlight` (`runner.rs:316-320`) → HTTP 429 | ||
| (`slack_serve.rs:337-340`). Slack retries a bounded number of times; under | ||
| sustained load the 429s persist for the whole delivery window and Slack | ||
| eventually stops retrying → user messages are silently lost. | ||
|
|
||
| The anti-pattern: an **admission/intake** slot conflated with **work duration**, | ||
| held across an unbounded downstream wait (the delivery poll). | ||
|
|
||
| `ProductInboundAck::is_durable_outcome()` (`inbound.rs:852`) already exists and | ||
| is the right signal for "the run is durably accepted; admission can be freed." | ||
|
|
||
| ## Red proof | ||
|
|
||
| New test `admission_released_after_durable_accept_not_held_across_delivery` | ||
| (`runner_immediate_ack.rs`): `max_in_flight = 1`, workflow returns a durable ack | ||
| immediately, observer blocks (models the 120s poll). On the unpatched base the | ||
| second webhook is rejected `TooManyInFlight { max_in_flight: 1 }` → test fails | ||
| RED, exactly reproducing the bug. After the fix the second webhook is admitted → | ||
| GREEN. | ||
|
|
||
| ## Fix (minimal, smallest correct change) | ||
|
|
||
| Decouple admission from delivery **inside the spawned task** in | ||
| `runner_immediate_ack.rs`: | ||
|
|
||
| 1. Keep the admission permit owned by the task. | ||
| 2. Run `submit_inbound` under `workflow_timeout` as today. | ||
| 3. On a **durable** outcome (`ack.is_durable_outcome()`), explicitly | ||
| `drop(permit)` to release the admission slot *before* invoking | ||
| `observer.observe_workflow_ack(...)`. The run is already durably submitted, so | ||
| the long delivery poll no longer consumes an intake slot. | ||
| 4. On workflow error or timeout (no durable acceptance), the permit drops at the | ||
| end of the task as today — error/timeout paths are short and bounded by | ||
| `workflow_timeout`, so holding admission across them is correct backpressure. | ||
| The observer's `observe_workflow_error` is best-effort and short. | ||
|
|
||
| Admission now gates only fast intake (auth/parse/stamp/submit, bounded by | ||
| `workflow_timeout`). The unbounded delivery wait is bounded by its own | ||
| mechanism — the delivery-side machinery in `slack_delivery.rs` (the shared | ||
| delivery semaphore, single-flight per-run guard, and `max_wait`) — which is | ||
| owned by another agent and is **not** modified here. | ||
|
|
||
| ### Why this is sufficient (not over-engineered) | ||
|
|
||
| - The run is durable once `submit_inbound` returns `Accepted`/`NoOp` etc.; the | ||
| reply is produced by the turn runtime independently of admission. Losing the | ||
| delivery poll does not lose the user's message — only the inline push of the | ||
| reply, which the delivery side already bounds and guards. | ||
| - No new durable-retry subsystem is needed: the existing delivery bounds plus | ||
| early admission release remove the silent-drop-under-load failure mode. | ||
| - Change is confined to one function in the file lane we own. | ||
|
|
||
| ### Cancellation / RAII safety | ||
|
|
||
| `OwnedSemaphorePermit` releases on drop on every path (durable → explicit early | ||
| drop; error/timeout/panic → scope-end drop). No `unwrap`/`expect` added. | ||
|
|
||
| ## Guardrail | ||
|
|
||
| Add a note to the crate guidance (module doc / nearest `CLAUDE.md`/`AGENTS.md`): | ||
| an admission/intake permit must NOT be held across an unbounded downstream wait | ||
| (delivery / LLM poll); release it once work is durably accepted and bound the | ||
| work with its own mechanism. | ||
|
|
||
| ## Test / quality gate | ||
|
|
||
| - `cargo fmt --all` | ||
| - `cargo clippy -p ironclaw_wasm_product_adapters --all-targets` (zero warnings) | ||
| - `cargo test -p ironclaw_wasm_product_adapters` (red→green + existing suite) | ||
| - Build the dependent composition crate to ensure no contract drift. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an important rule to add. For better readability and to make it easier to digest as a guideline, consider restructuring this paragraph into a main bullet point with sub-bullets. This will help future developers quickly grasp the core principles.