✨ engine.run/submit of WorkGraphs#7261
Draft
GeigerJ2 wants to merge 16 commits intoaiidateam:mainfrom
Draft
Conversation
cc8ac8e to
ee07a92
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #7261 +/- ##
==========================================
+ Coverage 79.85% 79.86% +0.01%
==========================================
Files 566 567 +1
Lines 43962 44005 +43
==========================================
+ Hits 35102 35138 +36
- Misses 8860 8867 +7 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
e1f020f to
e7502f1
Compare
b155e34 to
b0cee9b
Compare
Replace `engine_run_workgraph`/`engine_submit_workgraph` helper functions that duplicated the launch logic with calls to `WorkGraph.prepare_for_launch()`, which converts a WorkGraph into a `(ProcessClass, inputs)` pair. The standard `runner.run`/`submit` path then handles the rest, with a post-launch `update_after_launch(node)` call for bookkeeping.
a3cce2a to
7ee18aa
Compare
Remove `_resolve_workgraph_for_run` and `prepare_workgraph_inputs` helper functions from aiida-core. All WorkGraph-specific business logic (input merging, metadata separation, collision checks) now lives in `WorkGraph.prepare_for_launch(inputs, **kwargs)`. The launch functions (`run`, `run_get_node`, `run_get_pk`, `submit`) now call `workgraph.prepare_for_launch(inputs=inputs, **kwargs)` directly, keeping the aiida-core adapter layer minimal: `common/workgraph.py` contains only `is_workgraph_instance`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use positive guard clauses (`if WORKGRAPH_INSTALLED and ...`) instead of if/elif/else chains. Separate the availability question (`WORKGRAPH_INSTALLED`) from the type question (`is_workgraph_instance`) to keep their semantics honest — `is_workgraph_instance` now raises `ImportError` if called when workgraph is not installed, rather than silently returning `False`. Rename `WORKGRAPH_AVAILABLE` to `WORKGRAPH_INSTALLED` and add a docstring explaining why the helper function exists (to confine the optional-dependency import boilerplate to `common/workgraph.py`).
When `wait=True`, `update_after_launch` was called before the wait loop, so `workgraph.update()` ran on a still-running process and outputs were `None`. Now it is called after the process terminates, ensuring outputs are populated. For `wait=False`, it is still called immediately so the process node reference is set.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Requested quite a few people for review, as I think this is quite important. Not really needed that everybody goes through everything. But, in any case, good to get a few opinions on this. If you don't have the capacity to review, please feel free to remove the request.
I went through a few iterations before settling on the current approach. Documenting them here for context.
What we landed on
common/workgraph.pycentralises the optional-dependency handling. It exports aWORKGRAPH_INSTALLEDsentinel and anis_workgraph_instance()helper, solaunch.pyitself never needsfrom aiida_workgraphimports and the accompanyingtry/exceptboilerplate.launch.pyuses a positive guard clause (if WORKGRAPH_INSTALLED and is_workgraph_instance(process)) that branches into the WorkGraph path and returns early. The standard launch path is the fallthrough. Handling the WorkGraph path first will also be consistent with WorkGraph becoming the recommended way to write workflows in the future.workgraph: t.Any = processis needed to keep mypy happy since WorkGraph can't appear in the type union as an optional dependency.WorkGraph.prepare_for_launch()andWorkGraph.update_after_launch()on the aiida-workgraph side (PR Drop WG.submit/run for core engine.submit/run aiida-workgraph#768). aiida-core just calls these methods without knowing the internals.engine.submit()gainstimeout— a new optional parameter that raisesTimeoutErrorif the process doesn't terminate within the given seconds (only effectivewhen
wait=True). This ports functionality that previously lived inWorkGraph.submit()into the general-purpose launcher, where it benefits all process types (could be moved to separate PR).Alternatives I considered and rejected
1. Direct imports in
launch.pywithtry/exceptPut
from aiida_workgraph import WorkGraphdirectly inlaunch.pybehindtry/except ImportError.Rejected: pollutes
launch.pywith optional-dependency import boilerplate, and every new call site would need its own guard.2. Helper functions in
launch.py(_resolve_workgraph_for_run,prepare_workgraph_inputs)Kept the WorkGraph-specific business logic (input merging, metadata separation, collision checks) in aiida-core helper functions.
Rejected: violates Information Expert — WorkGraph should own its own preparation logic. Replaced by
WorkGraph.prepare_for_launch()on the workgraph side (now PR aiidateam/aiida-workgraph#768 onaiida-workgraph).3. Single
is_workgraph_instancethat catchesImportErrorinternallyCombined the availability check and the
isinstancecheck into one function that returnsFalsewhen workgraph isn't installed.Rejected: semantically dishonest — answering "is this a WorkGraph?" with "no, because workgraph isn't importable" conflates two different questions. Split into
WORKGRAPH_INSTALLEDsentinel +is_workgraph_instance()that assumes availability.4.
as_workgraph()with walrus operatorA function returning
Optional[WorkGraph]used asif wg := as_workgraph(process):.Rejected: still conflates the two questions (availability vs type), and the naming felt off.
5. Inverted guard clause:
if not (WORKGRAPH_INSTALLED and is_workgraph_instance(process)): return runner.run(...)Early-return for the non-workgraph path, leaving the workgraph path as the fallthrough.
Rejected: reads backwards — the negated condition just duplicates the fallthrough path and makes the control flow harder to follow.
Tests
The WorkGraph tests in
test_launch.pyare mock-based and serve as regression guards for the dispatch logic. They verify that when a WorkGraph is passed torun/run_get_node/run_get_pk, the launch functions correctly enter the WorkGraph code path (rather than the standard one), that the WorkGraph is asked to convert itself into a process class and inputs viaprepare_for_launch, that the resulting process is actually launched through the runner, and that the process node is handed back to the WorkGraph viaupdate_after_launch. They also check that both input styles (positionalinputsdict and**kwargs) are forwarded correctly toprepare_for_launch.Since
aiida-workgraphis not installed in aiida-core's CI, these tests mock the detection logic and runner. They can't catch integration issues (e.g., whetherprepare_for_launchproduces inputs the engine actually accepts). The real end-to-end coverage comes from the companion PR on aiida-workgraph (aiidateam/aiida-workgraph#768), where the full test suite passes against this branch.Companion PR on aiida-workgraph
The workgraph code changes live in aiidateam/aiida-workgraph#768, with the bulk of that PR being mechanical: replacing
wg.run()/wg.submit()withengine.run(wg)/engine.submit(wg)across docs and tests. The files worth reviewing:src/aiida_workgraph/workgraph.py— the only real source change:prepare_for_launch(inputs, **kwargs): validates inputs, separatesmetadatafrom task inputs, checks for naming collisions, and returns(WorkGraphEngine, engine_inputs).update_after_launch(node): stores the process node on the WorkGraph and callsself.update()so the in-memory state reflects the launched process.WorkGraph.run()andWorkGraph.submit()methods.timeout(parameter of the oldwg.submit()) is now handled natively byengine.submit(wg, timeout=...).interval(parameter of the oldwg.submit()) raises aValueErrorpointing toengine.submit(wg, wait=True, wait_interval=...).pyproject.toml/uv.lock— temporarily pointsaiida-coreto the feature branch on my fork. Will revert to a release pin before merging..github/workflows/ci.yaml— temporarily comments out the aiida-core version override. Same reason.Full test suite passes. Only RTD fails because it can't resolve the git dependency.