Skip to content

Commit 1abf718

Browse files
committed
Add support for asynchronous scanning
1 parent 48eb900 commit 1abf718

14 files changed

Lines changed: 1164 additions & 482 deletions

File tree

crates/ark/src/lsp/main_loop.rs

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//
77

88
use std::collections::HashMap;
9+
use std::collections::HashSet;
910
use std::future;
1011
use std::path::Path;
1112
use std::path::PathBuf;
@@ -15,12 +16,17 @@ use std::sync::atomic::Ordering;
1516
use std::sync::LazyLock;
1617
use std::sync::RwLock;
1718

19+
use aether_url::UrlId;
1820
use anyhow::anyhow;
1921
use futures::stream::FuturesUnordered;
2022
use futures::StreamExt;
2123
use oak_db::OakDatabase;
2224
use oak_scan::DbExt;
25+
use oak_scan::ScanCompleted;
26+
use oak_scan::ScanRequest;
27+
use oak_scan::ScanScheduler;
2328
use oak_semantic::library::Library;
29+
use stdext::result::ResultExt;
2430
use tokio::sync::mpsc;
2531
use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel;
2632
use tokio::task;
@@ -88,6 +94,7 @@ type TaskList<T> = futures::stream::FuturesUnordered<Pin<Box<dyn AnyhowJoinHandl
8894
pub(crate) enum Event {
8995
Lsp(LspMessage),
9096
Kernel(KernelNotification),
97+
OakScanCompleted(ScanCompleted),
9198
}
9299

93100
#[derive(Debug)]
@@ -151,8 +158,9 @@ pub(crate) struct GlobalState {
151158
events_rx: TokioUnboundedReceiver<Event>,
152159
}
153160

154-
/// Unlike `WorldState`, `ParserState` cannot be cloned and is only accessed by
155-
/// exclusive handlers.
161+
/// Non-cloneable, per-session state mutated only by exclusive handlers.
162+
/// Sits alongside [`WorldState`] (which is cloneable for snapshot
163+
/// handlers); state that can't be cloned lives here instead.
156164
pub(crate) struct LspState {
157165
/// The set of tree-sitter document parsers managed by the `GlobalState`.
158166
pub(crate) parsers: HashMap<Url, tree_sitter::Parser>,
@@ -162,6 +170,11 @@ pub(crate) struct LspState {
162170

163171
/// Channel for sending notifications to Console (e.g., document changes for DAP)
164172
pub(crate) console_notification_tx: TokioUnboundedSender<ConsoleNotification>,
173+
174+
/// Coordinator for asynchronous workspace scans. Mutated only from
175+
/// main-loop handlers. Must be out of [`WorldState`] because the scheduler
176+
/// is not clonable.
177+
pub(crate) oak_scheduler: ScanScheduler,
165178
}
166179

167180
/// State for the auxiliary loop
@@ -199,6 +212,7 @@ impl GlobalState {
199212
parsers: HashMap::new(),
200213
capabilities: Capabilities::default(),
201214
console_notification_tx,
215+
oak_scheduler: ScanScheduler::new(),
202216
};
203217

204218
// FIXME: We shouldn't call R code in the kernel to figure this out
@@ -299,13 +313,15 @@ impl GlobalState {
299313
handlers::handle_initialized(&self.client, &self.lsp_state).await?;
300314
},
301315
LspNotification::DidChangeWorkspaceFolders(params) => {
302-
state_handlers::did_change_workspace_folders(params, &mut self.world)?;
316+
let pending = state_handlers::did_change_workspace_folders(params, &mut self.world, &mut self.lsp_state)?;
317+
dispatch_scan_requests(&self.events_tx, pending);
303318
},
304319
LspNotification::DidChangeConfiguration(params) => {
305320
state_handlers::did_change_configuration(params, &self.client, &mut self.world).await?;
306321
},
307322
LspNotification::DidChangeWatchedFiles(params) => {
308-
state_handlers::did_change_watched_files(params, &mut self.world)?;
323+
let pending = state_handlers::did_change_watched_files(params, &mut self.world, &mut self.lsp_state)?;
324+
dispatch_scan_requests(&self.events_tx, pending);
309325
},
310326
LspNotification::DidOpenTextDocument(params) => {
311327
state_handlers::did_open(params, &mut self.lsp_state, &mut self.world)?;
@@ -336,7 +352,8 @@ impl GlobalState {
336352

337353
match request {
338354
LspRequest::Initialize(params) => {
339-
respond(tx, || state_handlers::initialize(params, &mut self.lsp_state, &mut self.world), LspResponse::Initialize)?;
355+
let pending = respond_with(tx, || state_handlers::initialize(params, &mut self.lsp_state, &mut self.world), LspResponse::Initialize)?;
356+
dispatch_scan_requests(&self.events_tx, pending);
340357
},
341358
LspRequest::WorkspaceSymbol(params) => {
342359
respond(tx, || handlers::handle_symbol(params, &self.world), LspResponse::WorkspaceSymbol)?;
@@ -420,6 +437,25 @@ impl GlobalState {
420437
}
421438
}
422439
},
440+
441+
Event::OakScanCompleted(scan) => {
442+
// Recompute editor-owned files at apply time, not at spawn
443+
// time: a buffer may have opened or closed since the scan
444+
// kicked off. The buffer-drain inside `apply_scan_completed` uses
445+
// this set as its watcher-event `skip` argument.
446+
let editor_owned: HashSet<UrlId> = self.world
447+
.documents
448+
.keys()
449+
.map(|url| UrlId::from_url(url.clone()))
450+
.collect();
451+
452+
let followups = self.lsp_state.oak_scheduler.apply_scan_completed(
453+
&mut self.world.oak,
454+
scan,
455+
&editor_owned,
456+
);
457+
dispatch_scan_requests(&self.events_tx, followups);
458+
},
423459
}
424460

425461
// TODO Make this threshold configurable by the client
@@ -452,6 +488,24 @@ impl GlobalState {
452488
}
453489
}
454490

491+
/// Spawn each [`ScanRequest`] on a blocking task. Each task runs the
492+
/// pure-I/O [`ScanRequest::run`] and ships the [`ScanCompleted`] back
493+
/// to the main loop as [`Event::OakScanCompleted`], where the scheduler
494+
/// then applies it.
495+
pub(super) fn dispatch_scan_requests(
496+
events_tx: &TokioUnboundedSender<Event>,
497+
requests: Vec<ScanRequest>,
498+
) {
499+
for req in requests {
500+
let tx = events_tx.clone();
501+
spawn_blocking(move || {
502+
let scan = req.run();
503+
tx.send(Event::OakScanCompleted(scan)).log_err();
504+
Ok(None)
505+
});
506+
}
507+
}
508+
455509
/// Respond to a request from the LSP
456510
///
457511
/// We receive requests from the LSP client with a response channel. Once we
@@ -475,11 +529,28 @@ fn respond<T>(
475529
response: impl FnOnce() -> LspResult<T>,
476530
into_lsp_response: impl FnOnce(T) -> LspResponse,
477531
) -> anyhow::Result<()> {
478-
let response = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(response)) {
479-
Ok(response) => {
480-
let response = response.map(into_lsp_response);
481-
RequestResponse::Result(response)
482-
},
532+
respond_with(
533+
response_tx,
534+
|| response().map(|t| (t, ())),
535+
into_lsp_response,
536+
)
537+
}
538+
539+
/// Variant of [`respond`] for handlers that produce a side output alongside
540+
/// their LSP response. The `S` value is returned to the caller on success;
541+
/// on handler error or panic the side output is `S::default()` (the caller
542+
/// gets the "do nothing" value, while the error response still flows to
543+
/// the client through `response_tx`). Used by `Initialize` to ship the
544+
/// scheduler's pending `ScanRequest`s back to the main loop without an
545+
/// out-parameter.
546+
fn respond_with<T, S: Default>(
547+
response_tx: TokioUnboundedSender<RequestResponse>,
548+
response: impl FnOnce() -> LspResult<(T, S)>,
549+
into_lsp_response: impl FnOnce(T) -> LspResponse,
550+
) -> anyhow::Result<S> {
551+
let (response, side) = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(response)) {
552+
Ok(Ok((t, s))) => (RequestResponse::Result(Ok(into_lsp_response(t))), s),
553+
Ok(Err(e)) => (RequestResponse::Result(Err(e)), S::default()),
483554
Err(err) => {
484555
// Set global crash flag to disable the LSP
485556
LSP_HAS_CRASHED.store(true, Ordering::Release);
@@ -495,19 +566,22 @@ fn respond<T>(
495566
// This creates an uninformative backtrace that is reported in the
496567
// LSP logs. Note that the relevant backtrace is the one created by
497568
// our panic hook and reported via the _kernel_ logs.
498-
RequestResponse::Crashed(anyhow!("Panic occurred while handling request: {msg}"))
569+
(
570+
RequestResponse::Crashed(anyhow!("Panic occurred while handling request: {msg}")),
571+
S::default(),
572+
)
499573
},
500574
};
501575

502576
let out = match response {
503-
RequestResponse::Result(Ok(_)) => Ok(()),
577+
RequestResponse::Result(Ok(_)) => Ok(side),
504578
RequestResponse::Result(Err(ref error)) => {
505579
// The error has already been sent to the client on `response_tx`
506580
// as a jsonrpc error, so the user sees the popup. Log here at
507581
// info level (with `{:?}` for the full debug format including a
508582
// backtrace) so server logs keep diagnostic context.
509583
lsp::log_info!("Error while handling request:\n{error:?}");
510-
Ok(())
584+
Ok(side)
511585
},
512586
RequestResponse::Crashed(ref error) => {
513587
Err(anyhow!("Crashed while handling request:\n{error:?}"))

crates/ark/src/lsp/state_handlers.rs

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use anyhow::anyhow;
1313
use oak_scan::DbExt;
1414
use oak_scan::FileEvent;
1515
use oak_scan::FileEventKind;
16+
use oak_scan::ScanRequest;
1617
use oak_semantic::package::Package;
1718
use stdext::result::ResultExt;
1819
use tower_lsp::lsp_types;
@@ -94,7 +95,7 @@ pub(crate) fn initialize(
9495
params: InitializeParams,
9596
lsp_state: &mut LspState,
9697
state: &mut WorldState,
97-
) -> LspResult<InitializeResult> {
98+
) -> LspResult<(InitializeResult, Vec<ScanRequest>)> {
9899
let workspace_uris = effective_workspace_uris(&params);
99100
lsp_state.capabilities = Capabilities::new(params.capabilities);
100101

@@ -147,12 +148,14 @@ pub(crate) fn initialize(
147148
.keys()
148149
.map(|url| UrlId::from_url(url.clone()))
149150
.collect();
150-
state
151-
.oak
152-
.set_workspace_paths(&workspace_paths, &editor_owned);
151+
let pending = lsp_state.oak_scheduler.set_workspace_paths(
152+
&mut state.oak,
153+
&workspace_paths,
154+
&editor_owned,
155+
);
153156
lsp::main_loop::index_start(folders, state.clone());
154157

155-
Ok(InitializeResult {
158+
let result = InitializeResult {
156159
server_info: Some(ServerInfo {
157160
name: "Ark R Kernel".to_string(),
158161
version: Some(crate::BUILD_VERSION.to_string()),
@@ -234,7 +237,9 @@ pub(crate) fn initialize(
234237
}),
235238
..ServerCapabilities::default()
236239
},
237-
})
240+
};
241+
242+
Ok((result, pending))
238243
}
239244

240245
/// Resolve the effective workspace folders from `InitializeParams`.
@@ -400,7 +405,8 @@ pub(crate) fn did_rename_files(
400405
pub(crate) fn did_change_watched_files(
401406
params: DidChangeWatchedFilesParams,
402407
state: &mut WorldState,
403-
) -> anyhow::Result<()> {
408+
lsp_state: &mut LspState,
409+
) -> anyhow::Result<Vec<ScanRequest>> {
404410
// Editor owns the contents of files it has open: Oak should ignore
405411
// disk-side events for those URLs.
406412
let editor_owned: HashSet<UrlId> = state
@@ -426,15 +432,18 @@ pub(crate) fn did_change_watched_files(
426432
})
427433
.collect();
428434

429-
state.oak.apply_watcher_events(events, &editor_owned);
430-
Ok(())
435+
let pending = lsp_state
436+
.oak_scheduler
437+
.apply_watcher_events(&mut state.oak, events, &editor_owned);
438+
Ok(pending)
431439
}
432440

433441
#[tracing::instrument(level = "info", skip_all)]
434442
pub(crate) fn did_change_workspace_folders(
435443
params: DidChangeWorkspaceFoldersParams,
436444
state: &mut WorldState,
437-
) -> anyhow::Result<()> {
445+
lsp_state: &mut LspState,
446+
) -> anyhow::Result<Vec<ScanRequest>> {
438447
let removed: HashSet<Url> = params.event.removed.iter().map(|f| f.uri.clone()).collect();
439448
state.workspace.folders.retain(|uri| !removed.contains(uri));
440449

@@ -460,10 +469,12 @@ pub(crate) fn did_change_workspace_folders(
460469
.map(|url| UrlId::from_url(url.clone()))
461470
.collect();
462471

463-
state
464-
.oak
465-
.set_workspace_paths(&workspace_paths, &editor_owned);
466-
Ok(())
472+
let pending = lsp_state.oak_scheduler.set_workspace_paths(
473+
&mut state.oak,
474+
&workspace_paths,
475+
&editor_owned,
476+
);
477+
Ok(pending)
467478
}
468479

469480
fn parse_uri_or_none(uri: &str) -> Option<url::Url> {

crates/ark/src/lsp/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod find_references;
22
mod goto_definition;
3+
mod main_loop;
34
mod rename;
45
mod state_handlers;
56
mod utils;
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
//! End-to-end smoke test of the async workspace scan path.
2+
//!
3+
//! Exercises the real tokio plumbing the LSP main loop relies on:
4+
//! [`dispatch_scan_requests`] spawning [`ScanRequest::run`] on a
5+
//! blocking task, the mpsc round-trip back as [`Event::OakScanCompleted`],
6+
//! and the main-loop apply step. The rest of the scheduler is unit
7+
//! tested without tokio in `oak_scan`; this test pins the wiring.
8+
9+
use std::collections::HashSet;
10+
use std::fs;
11+
use std::time::Duration;
12+
13+
use oak_db::DbInputs;
14+
use oak_db::OakDatabase;
15+
use oak_scan::ScanScheduler;
16+
use tokio::runtime::Runtime;
17+
use tokio::time::timeout;
18+
19+
use crate::lsp::main_loop::dispatch_scan_requests;
20+
use crate::lsp::main_loop::init_aux_for_test;
21+
use crate::lsp::main_loop::Event;
22+
23+
#[test]
24+
fn test_workspace_scan_round_trip_through_tokio() {
25+
// `dispatch_scan_requests` routes through `lsp::spawn_blocking`, which
26+
// hands the `JoinHandle` to the aux loop. Without an aux tx the spawn
27+
// helper drops the panic-logging path; init a no-op aux for the test.
28+
init_aux_for_test();
29+
30+
let tmp = tempfile::tempdir().unwrap();
31+
fs::create_dir_all(tmp.path().join("pkg/R")).unwrap();
32+
fs::write(
33+
tmp.path().join("pkg/DESCRIPTION"),
34+
"Package: pkg\nVersion: 0.0.0\n",
35+
)
36+
.unwrap();
37+
fs::write(tmp.path().join("pkg/R/a.R"), "x <- 1\n").unwrap();
38+
39+
let rt = Runtime::new().unwrap();
40+
rt.block_on(async {
41+
let mut db = OakDatabase::new();
42+
let mut scheduler = ScanScheduler::new();
43+
let (events_tx, mut events_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
44+
45+
// Kick off the scan as `did_change_workspace_folders` would.
46+
let initial =
47+
scheduler.set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new());
48+
assert_eq!(initial.len(), 1);
49+
dispatch_scan_requests(&events_tx, initial);
50+
51+
// Pump events until the scheduler stops issuing followups. Each
52+
// iteration exercises the real spawn_blocking + mpsc + apply
53+
// round-trip the main loop performs in production.
54+
loop {
55+
let event = timeout(Duration::from_secs(5), events_rx.recv())
56+
.await
57+
.expect("scan timed out")
58+
.expect("event channel closed");
59+
let Event::OakScanCompleted(scan) = event else {
60+
panic!("unexpected event variant");
61+
};
62+
let followups = scheduler.apply_scan_completed(&mut db, scan, &HashSet::new());
63+
if followups.is_empty() {
64+
break;
65+
}
66+
dispatch_scan_requests(&events_tx, followups);
67+
}
68+
69+
// Workspace state reflects the on-disk package.
70+
let roots = db.workspace_roots().roots(&db).clone();
71+
assert_eq!(roots.len(), 1);
72+
let packages = roots[0].packages(&db).clone();
73+
assert_eq!(packages.len(), 1);
74+
assert_eq!(packages[0].name(&db), "pkg");
75+
assert_eq!(packages[0].files(&db).len(), 1);
76+
});
77+
}

0 commit comments

Comments
 (0)