-
Notifications
You must be signed in to change notification settings - Fork 0
libexpr: round 5 — thread-level parallelism infrastructure for eval #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: devin/1774562922-eval-perf-round4
Are you sure you want to change the base?
Changes from 3 commits
7776b2a
0ea9d59
e63713d
cf74763
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2253,50 +2253,316 @@ void EvalState::tryFixupBlackHolePos(Value & v, PosIdx pos) | |
| } | ||
| } | ||
|
|
||
| /* ── Outlined slow path for forceValue ───────────────────────────── | ||
| * | ||
| * Called in two situations: | ||
| * 1. Single-threaded blackhole: thunk env is nullptr → infinite | ||
| * recursion. We throw via the original code path so | ||
| * handleEvalExceptionForThunk → tryFixupBlackHolePos can attach | ||
| * caller position info. | ||
| * 2. Parallel eval active (parallelEvalActive > 0): uses a striped | ||
| * spinlock to atomically claim thunks; tl_currentlyForcing for | ||
| * blackhole detection; spin-wait when another thread owns a thunk. | ||
| * | ||
| * The caller (forceValue, always_inline) handles the common single- | ||
| * threaded non-blackhole case entirely inline — zero overhead. | ||
| */ | ||
| [[gnu::noinline]] | ||
| void EvalState::forceValueSlowPath(Value & v, const PosIdx pos) | ||
| { | ||
| const bool parallel = parallelEvalActive.load(std::memory_order_acquire) > 0; | ||
|
|
||
| /* ── Handle thunks ─────────────────────────────────────────── */ | ||
| if (v.isThunk()) { | ||
| if (!parallel) { | ||
| /* Single-threaded path — either a blackhole (infinite | ||
| recursion) or a race where parallelEvalActive dropped | ||
| to 0 between the caller's relaxed check and our acquire | ||
| re-read. Handle both by just doing the inline force. */ | ||
| Env * env = v.thunk().env; | ||
| Expr * expr = v.thunk().expr; | ||
| try { | ||
| v.mkBlackhole(); | ||
| if (env) [[likely]] | ||
| expr->eval(*this, *env, v); | ||
| else | ||
| ExprBlackHole::throwInfiniteRecursionError(*this, v); | ||
| } catch (...) { | ||
| handleEvalExceptionForThunk(env, expr, v, pos); | ||
| throw; | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| /* ── Multi-threaded path (with locking) ──────────────────── */ | ||
| auto & lock = thunkLockFor(&v); | ||
| lock.lock(); | ||
|
|
||
| /* Double-check under lock: another thread may have finished | ||
| forcing between our initial check and acquiring the lock. */ | ||
| if (!v.isThunk()) { | ||
| lock.unlock(); | ||
| if (v.isApp()) | ||
| goto handle_app; | ||
| if (v.isFailed()) | ||
| handleEvalFailed(v, pos); | ||
| return; | ||
| } | ||
|
|
||
| Env * env = v.thunk().env; | ||
| Expr * expr = v.thunk().expr; | ||
|
|
||
| if (!env) { | ||
| /* Blackhole: either infinite recursion (this thread) or | ||
| another thread is currently forcing this thunk. */ | ||
| lock.unlock(); | ||
| if (isBeingForcedByCurrentThread(&v)) { | ||
| /* Same thread — genuine infinite recursion. | ||
| Use throwInfiniteRecursionError (not expr->eval | ||
| which would dereference the null env). */ | ||
| try { | ||
| ExprBlackHole::throwInfiniteRecursionError(*this, v); | ||
| } catch (...) { | ||
| handleEvalExceptionForThunk(env, expr, v, pos); | ||
| throw; | ||
| } | ||
| /* unreachable — throwInfiniteRecursionError always throws */ | ||
| } | ||
| /* Another thread owns it — spin-wait until done. */ | ||
| while (v.isThunk()) { | ||
| #if defined(__x86_64__) | ||
| __builtin_ia32_pause(); | ||
| #else | ||
| std::this_thread::yield(); | ||
| #endif | ||
| } | ||
| if (v.isFailed()) | ||
| handleEvalFailed(v, pos); | ||
| return; | ||
| } | ||
|
|
||
| /* We own this thunk. Mark as blackhole and release lock so | ||
| other threads can claim different thunks in the same stripe. */ | ||
| v.mkBlackhole(); | ||
| lock.unlock(); | ||
|
|
||
| tl_currentlyForcing.push_back(&v); | ||
| try { | ||
| expr->eval(*this, *env, v); | ||
| } catch (...) { | ||
| tl_currentlyForcing.pop_back(); | ||
| handleEvalExceptionForThunk(env, expr, v, pos); | ||
| throw; | ||
| } | ||
| tl_currentlyForcing.pop_back(); | ||
| return; | ||
| } | ||
|
|
||
| handle_app: | ||
| /* ── Handle apps (multi-threaded path) ─────────────────────── */ | ||
| if (v.isApp()) { | ||
| auto & lock = thunkLockFor(&v); | ||
| lock.lock(); | ||
|
|
||
| if (!v.isApp()) { | ||
| lock.unlock(); | ||
| if (v.isThunk()) { | ||
| /* Another thread is forcing this value — spin-wait. */ | ||
| while (v.isThunk()) { | ||
| #if defined(__x86_64__) | ||
| __builtin_ia32_pause(); | ||
| #else | ||
| std::this_thread::yield(); | ||
| #endif | ||
| } | ||
| if (v.isFailed()) | ||
| handleEvalFailed(v, pos); | ||
| return; | ||
| } | ||
| if (v.isFailed()) | ||
| handleEvalFailed(v, pos); | ||
| return; | ||
| } | ||
|
|
||
| Value * left = v.app().left; | ||
| Value * right = v.app().right; | ||
| Value savedApp = v; | ||
|
|
||
| /* Mark as blackhole while we evaluate, so other threads wait. */ | ||
| v.mkBlackhole(); | ||
| lock.unlock(); | ||
|
|
||
| tl_currentlyForcing.push_back(&v); | ||
| try { | ||
| callFunction(*left, *right, v, pos); | ||
| } catch (...) { | ||
| tl_currentlyForcing.pop_back(); | ||
| handleEvalExceptionForApp(v, savedApp); | ||
| throw; | ||
| } | ||
| tl_currentlyForcing.pop_back(); | ||
| return; | ||
| } | ||
|
|
||
| handle_thunk_recheck: | ||
| /* ── Handle failed values ──────────────────────────────────── */ | ||
| if (v.isFailed()) { | ||
| handleEvalFailed(v, pos); | ||
| return; | ||
| } | ||
|
|
||
| /* If we reach here, the value was already forced by another | ||
| thread between our initial check and this point. */ | ||
| } | ||
|
|
||
| /* ── Lazy thread pool accessor ─────────────────────────────────── */ | ||
| EvalThreadPool & EvalState::getThreadPool() | ||
| { | ||
| if (!evalThreadPool) | ||
| evalThreadPool = std::make_unique<EvalThreadPool>(); | ||
| return *evalThreadPool; | ||
| } | ||
|
|
||
| /* ── Parallel forceValueDeep ───────────────────────────────────── */ | ||
| void EvalState::forceValueDeep(Value & v) | ||
| { | ||
| /* Use a mutex-protected seen set. The contention is low because | ||
| each thread works on a disjoint subtree most of the time, and | ||
| the lock is only held for the O(1) insert/lookup. */ | ||
| std::mutex seenMu; | ||
| std::set<const Value *> seen; | ||
|
|
||
| [&, &state(*this)](this const auto & recurse, Value & v) { | ||
| auto _level = state.addCallDepth(v.determinePos(noPos)); | ||
| auto tryInsertSeen = [&](const Value * p) -> bool { | ||
| std::lock_guard<std::mutex> lk(seenMu); | ||
| return seen.insert(p).second; | ||
| }; | ||
|
|
||
| if (!seen.insert(&v).second) | ||
| /* The recursive lambda for deep-forcing a subtree. This is the | ||
| function that both the main thread and workers execute. */ | ||
| std::function<void(Value &)> recurse = [&](Value & v) { | ||
| auto _level = this->addCallDepth(v.determinePos(noPos)); | ||
|
|
||
| if (!tryInsertSeen(&v)) | ||
| return; | ||
|
|
||
| state.forceValue(v, v.determinePos(noPos)); | ||
| this->forceValue(v, v.determinePos(noPos)); | ||
|
|
||
| if (v.type() == nAttrs) { | ||
| for (auto & i : *v.attrs()) | ||
| try { | ||
| // If the value is a thunk, we're evaling. Otherwise no trace necessary. | ||
| auto dts = state.debugRepl && i.value->isThunk() ? makeDebugTraceStacker( | ||
| state, | ||
| *i.value->thunk().expr, | ||
| *i.value->thunk().env, | ||
| i.pos, | ||
| "while evaluating the attribute '%1%'", | ||
| state.symbols[i.name]) | ||
| : nullptr; | ||
|
|
||
| recurse(*i.value); | ||
| } catch (Error & e) { | ||
| state.addErrorTrace(e, i.pos, "while evaluating the attribute '%1%'", state.symbols[i.name]); | ||
| throw; | ||
| auto & attrs = *v.attrs(); | ||
| size_t size = attrs.size(); | ||
|
|
||
| if (size >= kMinParallelAttrs && !debugRepl && tl_parallelForceDepth == 0) { | ||
| /* ── Parallel path: distribute attrs across threads ── | ||
| * Only parallelize at the top level (tl_parallelForceDepth==0) | ||
| * to avoid deadlock from recursive task submission into the | ||
| * fixed-size thread pool. Workers force subtrees inline. */ | ||
| ParallelEvalGuard pGuard; | ||
| ParallelForceDepthGuard depthGuard; | ||
| auto & pool = getThreadPool(); | ||
|
|
||
| std::vector<std::future<void>> futures; | ||
| futures.reserve(size); | ||
|
|
||
| /* First exception from any worker — we rethrow it after | ||
| joining all futures. */ | ||
| std::mutex errMu; | ||
| std::exception_ptr firstErr; | ||
|
|
||
| for (auto & i : attrs) { | ||
| futures.push_back(pool.submit([&, val = i.value]() { | ||
| /* Each worker must mark itself as nested so that | ||
| recurse() on this thread won't re-enter the | ||
| parallel path (tl_parallelForceDepth is TLS, | ||
| so workers start at 0 without this). */ | ||
| ParallelForceDepthGuard workerDepthGuard; | ||
| try { | ||
| recurse(*val); | ||
| } catch (...) { | ||
| std::lock_guard<std::mutex> lk(errMu); | ||
| if (!firstErr) | ||
| firstErr = std::current_exception(); | ||
| } | ||
| })); | ||
|
devin-ai-integration[bot] marked this conversation as resolved.
|
||
| } | ||
|
|
||
| for (auto & f : futures) | ||
| f.wait(); | ||
|
devin-ai-integration[bot] marked this conversation as resolved.
|
||
|
|
||
| if (firstErr) | ||
| std::rethrow_exception(firstErr); | ||
|
Comment on lines
+2472
to
+2492
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Parallel The sequential path in Prompt for agentsWas this helpful? React with 👍 or 👎 to provide feedback. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Acknowledged — this is already documented in the PR description under " |
||
| } else { | ||
| /* ── Sequential path (small attrset, debug mode, or nested) ── */ | ||
| for (auto & i : attrs) | ||
| try { | ||
| auto dts = this->debugRepl && i.value->isThunk() | ||
| ? makeDebugTraceStacker( | ||
| *this, | ||
| *i.value->thunk().expr, | ||
| *i.value->thunk().env, | ||
| i.pos, | ||
| "while evaluating the attribute '%1%'", | ||
| this->symbols[i.name]) | ||
| : nullptr; | ||
|
|
||
| recurse(*i.value); | ||
| } catch (Error & e) { | ||
| this->addErrorTrace( | ||
| e, i.pos, "while evaluating the attribute '%1%'", this->symbols[i.name]); | ||
| throw; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| else if (v.isList()) { | ||
| size_t index = 0; | ||
| for (auto v2 : v.listView()) | ||
| try { | ||
| recurse(*v2); | ||
| index++; | ||
| } catch (Error & e) { | ||
| state.addErrorTrace(e, "while evaluating list element at index %1%", index); | ||
| throw; | ||
| auto view = v.listView(); | ||
| size_t size = view.size(); | ||
|
|
||
| if (size >= kMinParallelListElems && !debugRepl && tl_parallelForceDepth == 0) { | ||
| /* ── Parallel path for large lists (top-level only) ─── */ | ||
| ParallelEvalGuard pGuard; | ||
| ParallelForceDepthGuard depthGuard; | ||
| auto & pool = getThreadPool(); | ||
|
|
||
| std::vector<std::future<void>> futures; | ||
| futures.reserve(size); | ||
|
|
||
| std::mutex errMu; | ||
| std::exception_ptr firstErr; | ||
|
|
||
| for (auto v2 : view) { | ||
| futures.push_back(pool.submit([&, v2]() { | ||
| ParallelForceDepthGuard workerDepthGuard; | ||
| try { | ||
| recurse(*v2); | ||
| } catch (...) { | ||
| std::lock_guard<std::mutex> lk(errMu); | ||
| if (!firstErr) | ||
| firstErr = std::current_exception(); | ||
| } | ||
| })); | ||
| } | ||
|
|
||
| for (auto & f : futures) | ||
| f.wait(); | ||
|
|
||
| if (firstErr) | ||
| std::rethrow_exception(firstErr); | ||
| } else { | ||
| /* ── Sequential path for small lists or nested ──────── */ | ||
| size_t index = 0; | ||
| for (auto v2 : view) | ||
| try { | ||
| recurse(*v2); | ||
| index++; | ||
| } catch (Error & e) { | ||
| this->addErrorTrace(e, "while evaluating list element at index %1%", index); | ||
| throw; | ||
| } | ||
| } | ||
| } | ||
| }(v); | ||
| }; | ||
|
|
||
| recurse(v); | ||
| } | ||
|
|
||
| NixInt EvalState::forceInt(Value & v, const PosIdx pos, std::string_view errorCtx) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 Cross-thread circular thunk dependency causes infinite spin-wait (deadlock) instead of infinite recursion error
When
When `forceValueDeep` distributes thunks from a large attrset (≥64 attrs) or list (≥128 elems) across worker threads, a cross-thread circular dependency causes both workers to spin forever. `isBeingForcedByCurrentThread` at `src/libexpr/include/nix/expr/parallel-eval.hh:101` only checks the current thread's `tl_currentlyForcing` stack, so it cannot detect that thread A is waiting on a thunk owned by thread B while thread B is waiting on a thunk owned by thread A. Both threads enter the unbounded `while (v.isThunk())` spin-wait and never terminate. In single-threaded mode, the same circular dependency is correctly caught: forcing thunk X → evaluates → needs Y → forcing Y → evaluates → needs X → X is a blackhole on the same thread →
`isBeingForcedByCurrentThread` returns true → clean "infinite recursion encountered" error. The parallel path converts this recoverable error into a process hang with no output. Deadlock scenario trace:
Prompt for agents
Was this helpful? React with 👍 or 👎 to provide feedback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Valid finding — cross-thread circular dependencies would indeed cause unbounded spin. In practice this can't happen today because `forceValueDeep` only parallelizes at the top level (workers stay sequential via `ParallelForceDepthGuard`), so two workers never force thunks that depend on each other — they each recursively force their own subtree sequentially. However, if the parallelism were ever deepened (e.g. removing the depth-1 restriction), this would become a real deadlock risk. A timeout + cycle detection in the spin-wait would be the right fix at that point. Adding to the reviewer checklist in the PR description.
forceValueDeeponly parallelizes at the top level (workers stay sequential viaParallelForceDepthGuard), so two workers never force thunks that depend on each other — they each recursively force their own subtree sequentially.However, if the parallelism were ever deepened (e.g. removing the depth-1 restriction), this would become a real deadlock risk. A timeout + cycle detection in the spin-wait would be the right fix at that point. Adding to the reviewer checklist in the PR description.