Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 119 additions & 21 deletions crates/ark/src/lsp/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//

use std::collections::HashMap;
use std::collections::HashSet;
use std::future;
use std::path::Path;
use std::path::PathBuf;
Expand All @@ -15,12 +16,17 @@ use std::sync::atomic::Ordering;
use std::sync::LazyLock;
use std::sync::RwLock;

use aether_url::UrlId;
use anyhow::anyhow;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use oak_db::OakDatabase;
use oak_scan::DbScan;
use oak_scan::ScanCompleted;
use oak_scan::ScanRequest;
use oak_scan::ScanScheduler;
use oak_semantic::library::Library;
use stdext::result::ResultExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel;
use tokio::task;
Expand Down Expand Up @@ -88,6 +94,7 @@ type TaskList<T> = futures::stream::FuturesUnordered<Pin<Box<dyn AnyhowJoinHandl
/// Events delivered to the main loop over its unbounded events channel.
pub(crate) enum Event {
/// NOTE(review): presumably a message forwarded by the tower-lsp backend,
/// which shares the events channel — confirm against the sender.
Lsp(LspMessage),
/// NOTE(review): presumably a notification coming from the Jupyter kernel
/// side, which also shares the events channel — confirm against the sender.
Kernel(KernelNotification),
/// Result of a finished scan. Sent by `dispatch_scan_requests()` once
/// `ScanRequest::run()` returns; the main loop then hands it to the scan
/// scheduler's `apply_scan_completed()`.
OakScanCompleted(ScanCompleted),
}

#[derive(Debug)]
Expand Down Expand Up @@ -151,8 +158,9 @@ pub(crate) struct GlobalState {
events_rx: TokioUnboundedReceiver<Event>,
}

/// Unlike `WorldState`, `ParserState` cannot be cloned and is only accessed by
/// exclusive handlers.
/// Non-cloneable, per-session state mutated only by exclusive handlers.
/// Sits alongside [`WorldState`] (which is cloneable for snapshot
/// handlers); state that can't be cloned lives here instead.
pub(crate) struct LspState {
/// The set of tree-sitter document parsers managed by the `GlobalState`.
pub(crate) parsers: HashMap<Url, tree_sitter::Parser>,
Expand All @@ -162,6 +170,11 @@ pub(crate) struct LspState {

/// Channel for sending notifications to Console (e.g., document changes for DAP)
pub(crate) console_notification_tx: TokioUnboundedSender<ConsoleNotification>,

/// Coordinator for asynchronous workspace scans. Mutated only from
/// main-loop handlers. Must live outside [`WorldState`] because the
/// scheduler is not cloneable.
pub(crate) oak_scheduler: ScanScheduler,
}

/// State for the auxiliary loop
Expand Down Expand Up @@ -191,16 +204,6 @@ impl GlobalState {
_r_home: PathBuf,
console_notification_tx: TokioUnboundedSender<ConsoleNotification>,
) -> Self {
// Transmission channel for the main loop events. Shared with the
// tower-lsp backend and the Jupyter kernel.
let (events_tx, events_rx) = tokio_unbounded_channel::<Event>();

let lsp_state = LspState {
parsers: HashMap::new(),
capabilities: Capabilities::default(),
console_notification_tx,
};

// FIXME: We shouldn't call R code in the kernel to figure this out
let library_paths = crate::r_task(|| -> anyhow::Result<Vec<String>> {
Ok(harp::RFunction::new("base", ".libPaths")
Expand All @@ -223,8 +226,34 @@ impl GlobalState {

let library = Library::new(library_paths);

Self::from_parts(
client,
console_notification_tx,
WorldState::new(db, library),
)
}

/// Assemble the state around an already-built `WorldState`. Splitting this
/// out from [`GlobalState::new`] lets tests construct a state without the
/// R `.libPaths()` lookup that `new` does.
fn from_parts(
client: Client,
console_notification_tx: TokioUnboundedSender<ConsoleNotification>,
world: WorldState,
) -> Self {
// Transmission channel for the main loop events. Shared with the
// tower-lsp backend and the Jupyter kernel.
let (events_tx, events_rx) = tokio_unbounded_channel::<Event>();

let lsp_state = LspState {
parsers: HashMap::new(),
capabilities: Capabilities::default(),
console_notification_tx,
oak_scheduler: ScanScheduler::new(),
};

Self {
world: WorldState::new(db, library),
world,
lsp_state,
client,
events_tx,
Expand Down Expand Up @@ -299,13 +328,13 @@ impl GlobalState {
handlers::handle_initialized(&self.client, &self.lsp_state).await?;
},
LspNotification::DidChangeWorkspaceFolders(params) => {
state_handlers::did_change_workspace_folders(params, &mut self.world)?;
state_handlers::did_change_workspace_folders(params, &mut self.world, &mut self.lsp_state, &self.events_tx)?;
},
LspNotification::DidChangeConfiguration(params) => {
state_handlers::did_change_configuration(params, &self.client, &mut self.world).await?;
},
LspNotification::DidChangeWatchedFiles(params) => {
state_handlers::did_change_watched_files(params, &mut self.world)?;
state_handlers::did_change_watched_files(params, &mut self.world, &mut self.lsp_state, &self.events_tx)?;
},
LspNotification::DidOpenTextDocument(params) => {
state_handlers::did_open(params, &mut self.lsp_state, &mut self.world)?;
Expand Down Expand Up @@ -336,7 +365,7 @@ impl GlobalState {

match request {
LspRequest::Initialize(params) => {
respond(tx, || state_handlers::initialize(params, &mut self.lsp_state, &mut self.world), LspResponse::Initialize)?;
respond(tx, || state_handlers::initialize(params, &mut self.lsp_state, &mut self.world, &self.events_tx), LspResponse::Initialize)?;
},
LspRequest::WorkspaceSymbol(params) => {
respond(tx, || handlers::handle_symbol(params, &self.world), LspResponse::WorkspaceSymbol)?;
Expand Down Expand Up @@ -420,6 +449,25 @@ impl GlobalState {
}
}
},

Event::OakScanCompleted(scan) => {
// Recompute editor-owned files at apply time, not at spawn
// time: a buffer may have opened or closed since the scan
// kicked off. The buffer-drain inside `apply_scan_completed` uses
// this set as its watcher-event `skip` argument.
let editor_owned: HashSet<UrlId> = self.world
.documents
.keys()
.map(|url| UrlId::from_url(url.clone()))
.collect();

let followups = self.lsp_state.oak_scheduler.apply_scan_completed(
&mut self.world.db,
scan,
&editor_owned,
);
dispatch_scan_requests(&self.events_tx, followups);
},
}

// TODO Make this threshold configurable by the client
Expand Down Expand Up @@ -452,6 +500,54 @@ impl GlobalState {
}
}

/// Test-only helpers that drive the main loop with neither R nor a live LSP
/// connection. They live beside the loop so that tests go through the real
/// `handle_event()` and the private channels instead of a re-implementation.
#[cfg(test)]
impl GlobalState {
    /// Create a state backed by an empty oak database and an empty list of R
    /// library paths. A `client` is required only because the struct stores
    /// one; the event paths exercised by tests never use it.
    pub(crate) fn new_test(client: Client) -> Self {
        // The receiving half is not bound, so it is discarded right away.
        let (console_notification_tx, _) = tokio_unbounded_channel::<ConsoleNotification>();

        let db = OakDatabase::new();
        let library = Library::new(vec![]);

        Self::from_parts(client, console_notification_tx, WorldState::new(db, library))
    }

    /// Feed `event` to the real `handle_event()`, then keep pulling and
    /// handling follow-up events for as long as the scheduler reports pending
    /// scans. Mirrors `main_loop()`, minus its surrounding `loop`.
    ///
    /// Panics if any handled event returns an error.
    pub(crate) async fn handle_event_to_quiescence(&mut self, event: Event) {
        let mut current = event;
        loop {
            self.handle_event(current).await.unwrap();

            // Quiescent once no scan is in flight any more.
            if !self.lsp_state.oak_scheduler.has_pending_scans() {
                break;
            }

            current = self.next_event().await;
        }
    }

    /// Read-only access to the world state, for test assertions.
    pub(crate) fn world(&self) -> &WorldState {
        &self.world
    }
}

/// Hand every [`ScanRequest`] to its own blocking task.
///
/// A task executes the pure-I/O [`ScanRequest::run`] and posts the resulting
/// [`ScanCompleted`] to the main loop wrapped in [`Event::OakScanCompleted`];
/// the scheduler applies it there. A failed send is only logged.
pub(super) fn dispatch_scan_requests(
    events_tx: &TokioUnboundedSender<Event>,
    requests: Vec<ScanRequest>,
) {
    for request in requests {
        // Each task owns its own sender handle.
        let sender = events_tx.clone();

        spawn_blocking(move || {
            let completed = Event::OakScanCompleted(request.run());
            sender.send(completed).log_err();
            Ok(None)
        });
    }
}
Comment on lines +537 to +549

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than tweaking the value of each LSP Request to possibly return ScanRequests, I think I would much rather allow state_handlers::initialize() and state_handlers::did_change_workspace_folders() to kick off the scan requests themselves.

i.e. if you just pass events_tx to each of those, they should have everything they need to kick off a spawn_blocking() call.

That way you can, for example, keep the dispatch close to the lsp_state.oak_scheduler.set_workspace_paths() call that requires it, which seems nice to me.

I also really like that this would mean we no longer need the respond_with() changes (I'm not super comfortable with how the ::default() behavior there works) or the tweaks to the return values just for this one feature.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This one feels important enough to me to talk about)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this idea has merit, with dispatch_scan_requests() calls moving right after oak_scheduler usage like this

Screenshot 2026-06-08 at 2.59.55 PM.png

and then this nicely simplifies back to where we started

Screenshot 2026-06-08 at 3.00.24 PM.png

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea


/// Respond to a request from the LSP
///
/// We receive requests from the LSP client with a response channel. Once we
Expand All @@ -476,10 +572,8 @@ fn respond<T>(
into_lsp_response: impl FnOnce(T) -> LspResponse,
) -> anyhow::Result<()> {
let response = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(response)) {
Ok(response) => {
let response = response.map(into_lsp_response);
RequestResponse::Result(response)
},
Ok(Ok(t)) => RequestResponse::Result(Ok(into_lsp_response(t))),
Ok(Err(e)) => RequestResponse::Result(Err(e)),
Err(err) => {
// Set global crash flag to disable the LSP
LSP_HAS_CRASHED.store(true, Ordering::Release);
Expand Down Expand Up @@ -1047,10 +1141,14 @@ pub(crate) fn diagnostics_refresh_all(state: WorldState) {
continue;
}

// The task sits in the indexer queue off the main loop.
// `legacy_snapshot()` hands it a detached oak db so it can't pin the
// live one against the main loop's next `set_*` (diagnostics read only
// non-oak state).
INDEXER_QUEUE
.send(IndexerQueueTask::Diagnostics(RefreshDiagnosticsTask {
uri: uri.clone(),
state: state.clone(),
state: state.legacy_snapshot(),
}))
.unwrap_or_else(|err| lsp::log_error!("Failed to queue diagnostics refresh: {err}"));
}
Expand Down
80 changes: 80 additions & 0 deletions crates/ark/src/lsp/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,28 @@ impl WorldState {
Err(anyhow!("Can't find document for URI {uri}"))
}
}

/// Snapshot the world state for a background handler that never queries
/// oak.
///
/// The returned copy carries a brand-new, empty `OakDatabase` rather than
/// a handle to the live one. Holding a salsa db handle off the main loop
/// stalls the owner's next `set_*`: the setter waits for `clones == 1`,
/// and an idle handle (sitting in the indexer queue, or owned by a handler
/// blocked in `r_task`) is never dropped on its own.
///
/// Use this only from the non-salsa handlers (diagnostics, indexing),
/// which read just the plain `WorldState` fields. A salsa-based handler
/// that queries oak off the main loop must use a different snapshot that
/// keeps the live db handle and runs its queries under cancellation (catch
/// `Cancelled`, don't span `r_task`) so that it observes real oak data.
/// The `legacy_` prefix is that warning: don't call this from
/// oak-querying code.
pub(crate) fn legacy_snapshot(&self) -> WorldState {
    let detached_db = OakDatabase::new();

    // Overwriting `db` drops the handle that `clone()` just produced, so
    // the copy keeps no link to the live database.
    let mut snapshot = self.clone();
    snapshot.db = detached_db;
    snapshot
}
}

pub(crate) fn with_document<T, F>(
Expand Down Expand Up @@ -132,3 +154,61 @@ pub(crate) fn workspace_uris(state: &WorldState) -> Vec<Url> {
let uris: Vec<Url> = state.documents.iter().map(|elt| elt.0.clone()).collect();
uris
}

#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::sync::Barrier;
    use std::time::Duration;

    use oak_db::OakDatabase;
    use oak_scan::DbScan;
    use oak_semantic::library::Library;

    use super::WorldState;

    /// Holding a legacy background snapshot must not block a main-loop
    /// mutation of the oak db.
    ///
    /// To give a setter `&mut` access, salsa raises its cancellation flag and
    /// then blocks until `clones == 1`. The flag only releases a clone whose
    /// thread is inside a running query and observes it. An idle snapshot
    /// (parked in the indexer queue, or held by a `spawn_blocking` handler
    /// blocked in `r_task`) never observes it, so the owner's next setter
    /// would block until that snapshot is dropped. Here a snapshot is parked
    /// with no query running, and the owner's setter must still finish.
    #[test]
    fn legacy_snapshot_does_not_pin_oak_against_mutation() {
        let mut owner = WorldState::new(OakDatabase::new(), Library::new(vec![]));
        let parked = owner.legacy_snapshot();

        // Two parties: the parking thread and this test thread. The parking
        // thread keeps the snapshot alive, with no salsa query running, until
        // this thread has finished timing the mutation.
        let gate = Arc::new(Barrier::new(2));
        let parking_gate = Arc::clone(&gate);
        let held = std::thread::spawn(move || {
            let _parked = parked;
            parking_gate.wait();
        });

        // The mutator signals over the channel once the setter has returned.
        let (done_tx, done_rx) = mpsc::channel();
        let mutator = std::thread::spawn(move || {
            owner.db.set_library_paths(&[]);
            let _ = done_tx.send(());
        });

        let completed = done_rx.recv_timeout(Duration::from_secs(2)).is_ok();

        // Whatever the outcome, let go of the parked snapshot so that a
        // blocked mutator can finish and both threads can be joined.
        gate.wait();
        held.join().unwrap();
        mutator.join().unwrap();

        assert!(completed);
    }
}
Loading
Loading