Skip to content

Commit 015c992

Browse files
committed
feat(cancel): add cancellation support via AtomicBool token
Adds cooperative cancellation to the interpreter. A shared Arc<AtomicBool> token can be set from any thread to abort execution at the next command boundary with Error::Cancelled. Rust API: Bash::cancellation_token() returns the shared token. JS API: Bash.cancel() / BashTool.cancel() + AbortSignal support. Python API: Bash.cancel() / BashTool.cancel(). Closes #541
1 parent 5c2729c commit 015c992

8 files changed

Lines changed: 213 additions & 9 deletions

File tree

crates/bashkit-js/src/lib.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use bashkit::tool::VERSION;
1010
use bashkit::{Bash as RustBash, BashTool as RustBashTool, ExecutionLimits, Tool};
1111
use napi_derive::napi;
1212
use std::sync::Arc;
13+
use std::sync::atomic::{AtomicBool, Ordering};
1314
use tokio::sync::Mutex;
1415

1516
// ============================================================================
@@ -51,6 +52,7 @@ pub struct BashOptions {
5152
pub struct Bash {
5253
inner: Arc<Mutex<RustBash>>,
5354
rt: tokio::runtime::Runtime,
55+
cancelled: Arc<AtomicBool>,
5456
username: Option<String>,
5557
hostname: Option<String>,
5658
max_commands: Option<u32>,
@@ -74,6 +76,7 @@ impl Bash {
7476
opts.max_commands,
7577
opts.max_loop_iterations,
7678
);
79+
let cancelled = bash.cancellation_token();
7780

7881
let rt = tokio::runtime::Builder::new_current_thread()
7982
.enable_all()
@@ -83,6 +86,7 @@ impl Bash {
8386
Ok(Self {
8487
inner: Arc::new(Mutex::new(bash)),
8588
rt,
89+
cancelled,
8690
username: opts.username,
8791
hostname: opts.hostname,
8892
max_commands: opts.max_commands,
@@ -93,6 +97,7 @@ impl Bash {
9397
/// Execute bash commands synchronously.
9498
#[napi]
9599
pub fn execute_sync(&self, commands: String) -> napi::Result<ExecResult> {
100+
self.cancelled.store(false, Ordering::Relaxed);
96101
let inner = self.inner.clone();
97102
self.rt.block_on(async move {
98103
let mut bash = inner.lock().await;
@@ -113,6 +118,15 @@ impl Bash {
113118
})
114119
}
115120

121+
/// Cancel the currently running execution.
122+
///
123+
/// Safe to call from any thread. Execution will abort at the next
124+
/// command boundary.
125+
#[napi]
126+
pub fn cancel(&self) {
127+
self.cancelled.store(true, Ordering::Relaxed);
128+
}
129+
116130
/// Reset interpreter to fresh state, preserving configuration.
117131
#[napi]
118132
pub fn reset(&self) -> napi::Result<()> {
@@ -124,12 +138,13 @@ impl Bash {
124138

125139
self.rt.block_on(async move {
126140
let mut bash = inner.lock().await;
127-
*bash = build_bash(
141+
let new_bash = build_bash(
128142
username.as_deref(),
129143
hostname.as_deref(),
130144
max_commands,
131145
max_loop_iterations,
132146
);
147+
*bash = new_bash;
133148
Ok(())
134149
})
135150
}
@@ -147,6 +162,7 @@ impl Bash {
147162
pub struct BashTool {
148163
inner: Arc<Mutex<RustBash>>,
149164
rt: tokio::runtime::Runtime,
165+
cancelled: Arc<AtomicBool>,
150166
username: Option<String>,
151167
hostname: Option<String>,
152168
max_commands: Option<u32>,
@@ -193,6 +209,7 @@ impl BashTool {
193209
opts.max_commands,
194210
opts.max_loop_iterations,
195211
);
212+
let cancelled = bash.cancellation_token();
196213

197214
let rt = tokio::runtime::Builder::new_current_thread()
198215
.enable_all()
@@ -202,6 +219,7 @@ impl BashTool {
202219
Ok(Self {
203220
inner: Arc::new(Mutex::new(bash)),
204221
rt,
222+
cancelled,
205223
username: opts.username,
206224
hostname: opts.hostname,
207225
max_commands: opts.max_commands,
@@ -212,6 +230,7 @@ impl BashTool {
212230
/// Execute bash commands synchronously.
213231
#[napi]
214232
pub fn execute_sync(&self, commands: String) -> napi::Result<ExecResult> {
233+
self.cancelled.store(false, Ordering::Relaxed);
215234
let inner = self.inner.clone();
216235
self.rt.block_on(async move {
217236
let mut bash = inner.lock().await;
@@ -232,6 +251,12 @@ impl BashTool {
232251
})
233252
}
234253

254+
/// Cancel the currently running execution.
255+
#[napi]
256+
pub fn cancel(&self) {
257+
self.cancelled.store(true, Ordering::Relaxed);
258+
}
259+
235260
/// Reset interpreter to fresh state, preserving configuration.
236261
#[napi]
237262
pub fn reset(&self) -> napi::Result<()> {
@@ -243,12 +268,13 @@ impl BashTool {
243268

244269
self.rt.block_on(async move {
245270
let mut bash = inner.lock().await;
246-
*bash = build_bash(
271+
let new_bash = build_bash(
247272
username.as_deref(),
248273
hostname.as_deref(),
249274
max_commands,
250275
max_loop_iterations,
251276
);
277+
*bash = new_bash;
252278
Ok(())
253279
})
254280
}

crates/bashkit-js/wrapper.ts

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,45 @@ export class Bash {
5858

5959
/**
6060
* Execute bash commands synchronously and return the result.
61+
*
62+
* If `signal` is provided, the execution will be cancelled when the signal
63+
* is aborted. The result will have `error: "execution cancelled"`.
6164
*/
62-
executeSync(commands: string): ExecResult {
65+
executeSync(commands: string, options?: { signal?: AbortSignal }): ExecResult {
66+
if (options?.signal) {
67+
const signal = options.signal;
68+
if (signal.aborted) {
69+
return { stdout: "", stderr: "", exitCode: 1, error: "execution cancelled" };
70+
}
71+
const onAbort = () => this.native.cancel();
72+
signal.addEventListener("abort", onAbort, { once: true });
73+
try {
74+
return this.native.executeSync(commands);
75+
} finally {
76+
signal.removeEventListener("abort", onAbort);
77+
}
78+
}
6379
return this.native.executeSync(commands);
6480
}
6581

6682
/**
6783
* Execute bash commands synchronously. Throws `BashError` on non-zero exit.
6884
*/
69-
executeSyncOrThrow(commands: string): ExecResult {
70-
const result = this.native.executeSync(commands);
85+
executeSyncOrThrow(commands: string, options?: { signal?: AbortSignal }): ExecResult {
86+
const result = this.executeSync(commands, options);
7187
if (result.exitCode !== 0) {
7288
throw new BashError(result);
7389
}
7490
return result;
7591
}
7692

93+
/**
94+
* Cancel the currently running execution.
95+
*/
96+
cancel(): void {
97+
this.native.cancel();
98+
}
99+
77100
/**
78101
* Reset interpreter to fresh state, preserving configuration.
79102
*/
@@ -110,21 +133,41 @@ export class BashTool {
110133
/**
111134
* Execute bash commands synchronously and return the result.
112135
*/
113-
executeSync(commands: string): ExecResult {
136+
executeSync(commands: string, options?: { signal?: AbortSignal }): ExecResult {
137+
if (options?.signal) {
138+
const signal = options.signal;
139+
if (signal.aborted) {
140+
return { stdout: "", stderr: "", exitCode: 1, error: "execution cancelled" };
141+
}
142+
const onAbort = () => this.native.cancel();
143+
signal.addEventListener("abort", onAbort, { once: true });
144+
try {
145+
return this.native.executeSync(commands);
146+
} finally {
147+
signal.removeEventListener("abort", onAbort);
148+
}
149+
}
114150
return this.native.executeSync(commands);
115151
}
116152

117153
/**
118154
* Execute bash commands synchronously. Throws `BashError` on non-zero exit.
119155
*/
120-
executeSyncOrThrow(commands: string): ExecResult {
121-
const result = this.native.executeSync(commands);
156+
executeSyncOrThrow(commands: string, options?: { signal?: AbortSignal }): ExecResult {
157+
const result = this.executeSync(commands, options);
122158
if (result.exitCode !== 0) {
123159
throw new BashError(result);
124160
}
125161
return result;
126162
}
127163

164+
/**
165+
* Cancel the currently running execution.
166+
*/
167+
cancel(): void {
168+
this.native.cancel();
169+
}
170+
128171
/**
129172
* Reset interpreter to fresh state, preserving configuration.
130173
*/

crates/bashkit-python/src/lib.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use pyo3::prelude::*;
1515
use pyo3::types::{PyDict, PyList};
1616
use pyo3_async_runtimes::tokio::future_into_py;
1717
use std::sync::Arc;
18+
use std::sync::atomic::{AtomicBool, Ordering};
1819
use tokio::sync::Mutex;
1920

2021
// ============================================================================
@@ -193,6 +194,7 @@ pub struct PyBash {
193194
/// Shared tokio runtime — reused across all sync calls to avoid
194195
/// per-call OS thread/fd exhaustion (issue #414).
195196
rt: tokio::runtime::Runtime,
197+
cancelled: Arc<AtomicBool>,
196198
username: Option<String>,
197199
hostname: Option<String>,
198200
max_commands: Option<u64>,
@@ -228,6 +230,7 @@ impl PyBash {
228230
builder = builder.limits(limits);
229231

230232
let bash = builder.build();
233+
let cancelled = bash.cancellation_token();
231234

232235
let rt = tokio::runtime::Builder::new_current_thread()
233236
.enable_all()
@@ -237,13 +240,22 @@ impl PyBash {
237240
Ok(Self {
238241
inner: Arc::new(Mutex::new(bash)),
239242
rt,
243+
cancelled,
240244
username,
241245
hostname,
242246
max_commands,
243247
max_loop_iterations,
244248
})
245249
}
246250

251+
/// Cancel the currently running execution.
252+
///
253+
/// Safe to call from any thread. Execution will abort at the next
254+
/// command boundary.
255+
fn cancel(&self) {
256+
self.cancelled.store(true, Ordering::Relaxed);
257+
}
258+
247259
/// Execute commands asynchronously.
248260
fn execute<'py>(&self, py: Python<'py>, commands: String) -> PyResult<Bound<'py, PyAny>> {
249261
let inner = self.inner.clone();
@@ -371,6 +383,7 @@ pub struct BashTool {
371383
/// Shared tokio runtime — reused across all sync calls to avoid
372384
/// per-call OS thread/fd exhaustion (issue #414).
373385
rt: tokio::runtime::Runtime,
386+
cancelled: Arc<AtomicBool>,
374387
username: Option<String>,
375388
hostname: Option<String>,
376389
max_commands: Option<u64>,
@@ -429,6 +442,7 @@ impl BashTool {
429442
builder = builder.limits(limits);
430443

431444
let bash = builder.build();
445+
let cancelled = bash.cancellation_token();
432446

433447
let rt = tokio::runtime::Builder::new_current_thread()
434448
.enable_all()
@@ -438,13 +452,19 @@ impl BashTool {
438452
Ok(Self {
439453
inner: Arc::new(Mutex::new(bash)),
440454
rt,
455+
cancelled,
441456
username,
442457
hostname,
443458
max_commands,
444459
max_loop_iterations,
445460
})
446461
}
447462

463+
/// Cancel the currently running execution.
464+
fn cancel(&self) {
465+
self.cancelled.store(true, Ordering::Relaxed);
466+
}
467+
448468
fn execute<'py>(&self, py: Python<'py>, commands: String) -> PyResult<Bound<'py, PyAny>> {
449469
let inner = self.inner.clone();
450470
future_into_py(py, async move {

crates/bashkit/src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ pub enum Error {
5353
#[error("regex error: {0}")]
5454
Regex(#[from] regex::Error),
5555

56+
/// Execution was cancelled via the cancellation token.
57+
#[error("execution cancelled")]
58+
Cancelled,
59+
5660
/// Internal error for unexpected failures.
5761
///
5862
/// THREAT[TM-INT-002]: Unexpected internal failures should not crash the interpreter.

crates/bashkit/src/interpreter/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::collections::{HashMap, HashSet};
2222
use std::panic::AssertUnwindSafe;
2323
use std::path::{Path, PathBuf};
2424
use std::sync::Arc;
25-
use std::sync::atomic::{AtomicU64, Ordering};
25+
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2626

2727
/// Monotonic counter for unique process substitution file paths
2828
static PROC_SUB_COUNTER: AtomicU64 = AtomicU64::new(0);
@@ -304,6 +304,9 @@ pub struct Interpreter {
304304
/// Aliases currently being expanded (prevents infinite recursion).
305305
/// When alias `foo` expands to `foo bar`, the inner `foo` is not re-expanded.
306306
expanding_aliases: HashSet<String>,
307+
/// Cancellation token: when set to `true`, execution aborts at the next
308+
/// command boundary with `Error::Cancelled`.
309+
cancelled: Arc<AtomicBool>,
307310
}
308311

309312
impl Interpreter {
@@ -560,6 +563,22 @@ impl Interpreter {
560563
pipestatus: Vec::new(),
561564
aliases: HashMap::new(),
562565
expanding_aliases: HashSet::new(),
566+
cancelled: Arc::new(AtomicBool::new(false)),
567+
}
568+
}
569+
570+
/// Return a shared cancellation token. Set it to `true` from any thread
571+
/// to abort execution at the next command boundary.
572+
pub fn cancellation_token(&self) -> Arc<AtomicBool> {
573+
Arc::clone(&self.cancelled)
574+
}
575+
576+
/// Check if cancellation has been requested.
577+
fn check_cancelled(&self) -> Result<()> {
578+
if self.cancelled.load(Ordering::Relaxed) {
579+
Err(crate::error::Error::Cancelled)
580+
} else {
581+
Ok(())
563582
}
564583
}
565584

@@ -716,6 +735,7 @@ impl Interpreter {
716735
let mut exit_code = 0;
717736

718737
for command in &script.commands {
738+
self.check_cancelled()?;
719739
let emit_before = self.output_emit_count;
720740
let result = self.execute_command(command).await?;
721741
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
@@ -805,6 +825,7 @@ impl Interpreter {
805825
command: &'a Command,
806826
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<ExecResult>> + Send + 'a>> {
807827
Box::pin(async move {
828+
self.check_cancelled()?;
808829
// Update current line for $LINENO
809830
self.current_line = Self::command_line(command);
810831

crates/bashkit/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,17 @@ impl Bash {
679679
result
680680
}
681681

682+
/// Return a shared cancellation token.
683+
///
684+
/// Set the token to `true` from any thread to abort execution at the next
685+
/// command boundary with [`Error::Cancelled`].
686+
///
687+
/// The caller is responsible for resetting the flag to `false` before
688+
/// calling `exec()` again.
689+
pub fn cancellation_token(&self) -> Arc<std::sync::atomic::AtomicBool> {
690+
self.interpreter.cancellation_token()
691+
}
692+
682693
/// Get a clone of the underlying filesystem.
683694
///
684695
/// Provides direct access to the virtual filesystem for:

0 commit comments

Comments
 (0)