Add ZeroMQ-based message broker plugin #7284
agoscinski wants to merge 59 commits into aiidateam:main from
Conversation
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

@@ Coverage Diff @@
##             main    #7284      +/-   ##
==========================================
+ Coverage   79.86%   79.88%   +0.02%
==========================================
  Files         566      575       +9
  Lines       43925    45100    +1175
==========================================
+ Hits        35077    36022     +945
- Misses       8848     9078     +230
src/aiida/engine/daemon/worker.py (Outdated)

    # the broker server before disconnecting, and database connections are
    # released. Without this, the server keeps routing tasks/RPCs to our
    # (now-dead) identity.
    # get_manager().reset_profile()

should be resolved after merging the leakage PRs

Notes for me:
src/aiida/brokers/zmq/broker.py (Outdated)

    # Server state
    self._running = False
    self._lock = threading.Lock()

Why are things not thread safe here? How does RMQ handle this?

Yes, true, this is basically simulating an asyncio event loop, so we can switch to that one.

I merged client.py into broker.py for zmq, so ZmqBroker now contains the management functionality. I think doing this for rmq might be a bit more messy because we create multiple connections, but it may also be possible. Maybe a distinction between
    # Timeout (in seconds) for waiting on RPC Future results in the poll thread.
    # None means no timeout, matching kiwipy RMQ behavior where _on_rpc awaits
    # without a deadline. The runner event loop will eventually produce a result.
    RPC_TIMEOUT: float | None = None

Double check that rmq.task_timeout will be used here when starting the server.
    :return: The reconstructed UUID object
    """
    mapping = loader.construct_mapping(node)
    return uuid.UUID(int=mapping['int'])

So in RMQ we just secretly convert this to int somewhere in the code. I wanted to make serialization more explicit. It's a bit overkill for one conversion; maybe there is a better solution.
Implement a ZMQ broker as an alternative to RabbitMQ following
AiiDA's daemon pattern and RabbitMQ naming conventions:
Components:
- ZmqBrokerServer (server.py): Core message broker server with polling,
routing, task queue, and persistence. Standalone, no AiiDA dependencies.
- PersistentQueue: Folder-based task storage with crash recovery
- ZmqBrokerService (service.py): Thin process wrapper handling PID file
management, SIGINT signal handling, and status file updates.
- ZmqBrokerController (controller.py): Client-side class to start/stop
and query broker service status via PID/status files.
- ZmqBroker (broker.py): AiiDA Broker interface (like RabbitmqBroker),
uses controller and communicator.
- ZmqCommunicator (communicator.py): kiwipy Communicator interface.
The broker service is completely independent of AiiDA and can be run
as a standalone process. Tasks are persisted to disk immediately for
durability.
Key changes:
- Use SIGINT for cross-platform shutdown (SIGTERM unavailable on Windows)
- Controller reads PID/status files, no direct IPC with service
- Broker has separate storage_path and sockets_path properties
- Use AIIDA_LOGGER for broker logging consistency
- Add psutil-based process validation with OS fallback
- Periodic status file updates during service operation
Storage structure:
{base_path}/
├── storage/tasks/ # Pending/processing task queue
├── sockets/ # IPC socket files (router.sock, pub.sock)
├── logs/ # Broker log files
├── broker.pid # PID file
└── broker.status # Status JSON file
We use a temporary directory for socket files to avoid the ~107 byte
path length limit on Unix domain sockets. The temp directory path is
stored in a `broker.sockets` file within the AiiDA config directory.
- Switch from JSON to YAML encoding to match kiwipy's serialization format
- Add shared AiidaYamlLoader in brokers/utils.py that handles UUID deserialization
- The custom loader extends FullLoader to safely decode !!python/object:uuid.UUID tags
- This fixes compatibility with plumpy processes that use UUID senders in broadcasts
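The loader extension described above can be sketched as follows. This is an illustrative example assuming PyYAML; the class and function names (`UuidAwareLoader`, `_construct_uuid`) are placeholders, not the actual aiida-core identifiers:

```python
import uuid

import yaml


class UuidAwareLoader(yaml.FullLoader):
    """FullLoader extended with an explicit constructor for uuid.UUID.

    FullLoader refuses arbitrary python/object tags; registering a single
    exact-tag constructor allows only UUIDs through.
    """


def _construct_uuid(loader, node):
    # PyYAML serializes a UUID's pickle state as a mapping like
    # {'int': <128-bit integer>}, so rebuild the UUID from that integer.
    mapping = loader.construct_mapping(node)
    return uuid.UUID(int=mapping['int'])


UuidAwareLoader.add_constructor('tag:yaml.org,2002:python/object:uuid.UUID', _construct_uuid)

# Round trip: dumping with the default Dumper emits the
# !!python/object:uuid.UUID tag, which the custom loader can now decode.
original = uuid.uuid4()
dumped = yaml.dump(original)
restored = yaml.load(dumped, Loader=UuidAwareLoader)
assert restored == original
```

Registering the constructor on a subclass keeps the global `FullLoader` untouched, so other YAML consumers in the same process are unaffected.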
- Add `core.zmq` entry point for ZMQ broker plugin
- Add `verdi broker` command group (start/stop/restart/status)
- Add backwards compatibility mapping for 'zmq' -> 'core.zmq'
- Export ZMQ defaults from brokers.zmq module
- Update `verdi profile setup` with `--broker` option (rabbitmq/zmq/none)
- Update `verdi presto` to fall back to ZMQ when RabbitMQ is not detected
- Add deprecated --use-rabbitmq/--no-use-rabbitmq for backward compatibility

The ZMQ broker provides a zero-dependency alternative to RabbitMQ, making it easier to get started with AiiDA without external services.
- Add TestBrokerBackend enum (rmq, zmq, none)
- Add --broker-backend CLI option to pytest
- Add requires_broker marker for tests needing any broker
- Update aiida_profile fixture to start/stop ZMQ broker service
- Auto-skip requires_rmq tests when using ZMQ backend
- Auto-skip requires_broker tests when using no broker

Usage:
    pytest --broker-backend rmq   # Use RabbitMQ (default)
    pytest --broker-backend zmq   # Use ZeroMQ
    pytest --broker-backend none  # No broker
Tests that require a message broker but work with either RabbitMQ or ZMQ now use the `requires_broker` marker. Only tests in test_rabbitmq.py and test_rmq.py retain `requires_rmq` for RabbitMQ-specific functionality. This allows running broker-dependent tests with either backend:

- `--broker-backend rmq` runs all broker tests (default)
- `--broker-backend zmq` runs requires_broker tests, skips requires_rmq
- `--broker-backend none` skips all broker tests
Rename test_presto_with_rmq to test_presto and accept either RabbitMQ or ZMQ as valid broker backends. This reflects the actual behavior of verdi presto which auto-detects RabbitMQ and falls back to ZMQ. The test_presto_without_rmq test already covers the specific ZMQ fallback scenario by patching RabbitMQ detection.
Comprehensive test suite for ZMQ broker components:

- TestZmqDefaults: Test get_zmq_config function
- TestZmqBrokerController: Test controller initialization, start/stop/restart
- TestZmqBrokerServer: Test server start/stop and status
- TestZmqBrokerService: Test service lifecycle and files
- TestPersistentQueue: Test enqueue/dequeue/complete/fail operations
- TestZmqCommunicator: Test communicator connection and context manager
- TestZmqBrokerIntegration: Integration tests with broker lifecycle

Mirror tests from tests/engine/test_rmq.py but with the requires_broker marker so they run with the ZMQ broker backend. Tests cover:

- Simple process submission
- Process launch with inputs
- Bad input handling
- Exception process handling
- Process pause/play/kill control
When RabbitMQ is not available, verdi presto now falls back to ZMQ broker instead of having no broker configured.
- ci-code workflow for jobs tests-minimum-requirements and tests-presto
- release workflow for job tests

This removes the RabbitMQ dependency and replaces the broker backend with zmq.
Processes register as RPC subscribers with string identifiers (e.g. "70") via `str(self.pid)`, but the controller sends RPC messages with integer PKs. The server lookup `_rpc_subscribers.get(70)` failed to find key "70". Convert recipient to string before lookup for consistent matching.
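The key-type mismatch described above can be reduced to a small sketch. The names here are illustrative stand-ins for the server's registry, not the actual aiida-core internals:

```python
# Sketch of the subscriber-key mismatch: processes register with string
# identifiers via str(self.pid), while controllers address RPCs by integer PK.
_rpc_subscribers = {}


def add_rpc_subscriber(identifier, callback):
    # Processes call this with e.g. str(self.pid) -> "70"
    _rpc_subscribers[identifier] = callback


def handle_rpc(recipient, msg):
    # Normalize the recipient to str so an integer PK (70) still finds the
    # subscriber registered under the string key ("70").
    subscriber = _rpc_subscribers.get(str(recipient))
    if subscriber is None:
        raise KeyError(f'no RPC subscriber for {recipient!r}')
    return subscriber(msg)


add_rpc_subscriber('70', lambda msg: f'pid 70 got {msg}')
result = handle_rpc(70, 'kill')  # integer PK matches the "70" subscriber
```

Without the `str()` normalization, `_rpc_subscribers.get(70)` returns `None` and the RPC is silently dropped.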
Code consuming `iterate_tasks` (e.g. `get_process_tasks`, `process_repair`) expects RMQ-style task objects with `.body` attribute and `.processing()` context manager. The ZMQ broker was yielding plain dicts, causing `AttributeError: 'dict' object has no attribute 'body'`. Add `ZmqIncomingTask` wrapper class and `PersistentQueue.remove_pending()` to provide a compatible interface.
Since the ZMQ broker needs no external services, tests marked requires_broker can run in the presto suite. Only requires_rmq tests (needing RabbitMQ specifically) are now excluded from presto.

- Update presto marker logic to only exclude requires_rmq, not requires_broker
- Configure presto profile to use the core.zmq broker instead of no broker
- Update marker tests to reflect the new behavior
Plumpy process subscribers (via `_schedule_rpc` / `convert_to_comm`) return a `kiwipy.Future` (`concurrent.futures.Future`) from RPC handlers. This Future contains an `_thread.RLock` which cannot be YAML-serialized, causing "cannot pickle '_thread.RLock' object" errors when the ZMQ communicator tries to encode the RPC response. The fix calls `Future.result()` to block until the result is available, then serializes the resolved value instead of the Future object.

Why blocking `.result()` instead of an async approach like RMQ: the kiwipy `RmqCommunicator._on_rpc` is async and resolves futures with `await` before sending AMQP responses. This works because `RmqThreadCommunicator` runs its own asyncio event loop on a dedicated thread (via pytray `LoopScheduler`), so awaiting in `_on_rpc` does not block the caller; it only yields within the communicator's own loop. The ZMQ communicator uses a synchronous poll thread instead of an async event loop, so `await` is not available. An async outbox + done-callback pattern was considered, but ultimately `.result()` is sufficient because:

1. The poll thread calling `.result()` is a different thread from the runner's event loop where the RPC actually executes (via `asyncio.run_coroutine_threadsafe`). Blocking the poll thread therefore does not block the runner; processes continue to execute.
2. Even in RMQ, all RPC operations serialize through the runner's single event loop. `RmqThreadCommunicator.add_rpc_subscriber` itself blocks the calling thread (the runner) via `_loop_scheduler.await_()`, which calls `.result()`. The effective throughput is bounded by the runner event loop regardless of whether the communicator waits synchronously or asynchronously.
3. The poll thread cannot process other broker messages while waiting, but this is acceptable: incoming RPCs target a specific process, and that process can only handle one RPC at a time on the runner loop anyway. Concurrent RPCs to different processes would benefit from async handling, but the runner serializes their execution regardless.

Note: this analysis suggests `RmqThreadCommunicator` is overengineered. It creates a dedicated thread with its own asyncio event loop, wraps every call through `LoopScheduler.await_()` (which just calls `.result()` on the calling side), and uses `await` in `_on_rpc` to resolve futures. But since all actual work runs on the runner's single event loop, the async machinery in the communicator does not provide real concurrency. The ZMQ communicator achieves the same effective behavior with a simpler synchronous poll thread and a direct `.result()` call, skipping the extra event loop entirely.
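Point 1 above can be demonstrated in isolation. This is an illustrative sketch, not the actual aiida-core code: a handler coroutine runs on a "runner" event loop living on its own thread, while a "poll thread" schedules it with `asyncio.run_coroutine_threadsafe` and blocks on `.result()` without stalling the loop:

```python
import asyncio
import threading


async def rpc_handler(msg: str) -> str:
    # Stand-in for real work executing on the runner's event loop.
    await asyncio.sleep(0.01)
    return f'handled {msg}'


# "Runner" event loop on a dedicated thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# "Poll thread" side: schedule the coroutine onto the runner loop and wait
# synchronously. Blocking here does not block the loop, because the loop
# runs on a different thread.
future = asyncio.run_coroutine_threadsafe(rpc_handler('kill'), loop)
result = future.result(timeout=5)

loop.call_soon_threadsafe(loop.stop)
print(result)
```

The `.result()` call returns the concrete value, which can then be serialized for the RPC response instead of the unpicklable Future.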
plum_to_kiwi_future can chain Futures: if the plumpy (asyncio) Future resolves to another asyncio.Future, it wraps it in a new kiwipy.Future and sets that as the result. A single .result() call then returns another Future instead of a concrete value. This mirrors kiwipy's _send_future_response, which loops:

    while asyncio.isfuture(future):
        future = await future

Change `if isinstance` to `while isinstance` to unwind the full chain.
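The `if` vs `while` distinction is easy to see with stacked futures. A minimal sketch (the function name is illustrative):

```python
import concurrent.futures


def resolve_future_chain(result):
    # `while`, not `if`: a resolved Future may itself contain another Future
    # (plum_to_kiwi_future can stack layers), so loop until a concrete value
    # appears.
    while isinstance(result, concurrent.futures.Future):
        result = result.result()
    return result


inner = concurrent.futures.Future()
inner.set_result(42)
outer = concurrent.futures.Future()
outer.set_result(inner)  # a Future whose result is another Future

assert resolve_future_chain(outer) == 42
```

With a plain `if`, `outer.result()` would be called once and the serializer would receive `inner`, an unserializable Future, instead of `42`.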
The new aiida_profile_factory in tools/pytest_fixtures creates profiles via create_profile() which does not set warnings.development_version to False. The old factory in manage/tests/pytest_fixtures did. This causes check_version() to print development version warnings to stdout when loading the profile in subprocess-based CLI tests (e.g. test_autogroup with use_subprocess=True), making pk = int(output) fail with ValueError since warnings contaminate the output.
Restore `while isinstance(result, Future)` to resolve all nested Futures before serializing the RPC response. RMQ uses a multi-message pending/result protocol that creates two Future layers on the client (rpc_send Future → inner Future → value). ZMQ sends a single response with the fully resolved value. This is correct because callers in control.py already use unwrap_kiwi_future() which handles both nested and flat responses. Update test_zmq.py to match ZMQ's single-response model: await the rpc_send Future once to get the result directly, instead of the two-layer pattern from test_rmq.py.
Aligns naming with RabbitmqManagementClient and avoids confusion with plumpy's RemoteProcessThreadController, which is used for actual process control (pause/play/kill).

- Rename file: controller.py -> client.py (matches rabbitmq/client.py)
- Rename class: ZmqBrokerController -> ZmqBrokerManagementClient
- Rename property: controller -> management_client
- Update all imports and references
Add start/stop/is_running to the Broker base class (no-ops for RabbitMQ) and override in ZmqBroker to delegate to the management client. DaemonClient.start_daemon now starts the broker before launching workers, and stop_daemon stops it after workers shut down. Also fixes verdi broker CLI commands that referenced a non-existent broker.controller attribute.
The broker lifecycle is managed separately via `verdi broker start/stop` or by test fixtures. Having it in the daemon CLI caused issues when tests restart the daemon between test cases.
The broker lifecycle is managed by test fixtures and will be integrated into the daemon startup. The separate `verdi broker` commands are no longer needed.
Ensures the ZMQ broker is running before launching daemon workers. The broker is only started (not stopped) here — stopping is left to the profile fixture or explicit user action, since daemon restarts between tests should not kill the broker.
Display broker PID and task counts for managed brokers (ZMQ) in the `verdi status` output. Uses broker.is_running() instead of trying to open a communicator connection.
The marker was a bare expression (no-op) instead of a module-level `pytestmark` assignment, so the marker was never applied.
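The bug in a nutshell, as a hedged sketch (marker name taken from this PR, the rest illustrative):

```python
import pytest

# Wrong: a bare expression builds the MarkDecorator and immediately discards
# it, so no test in the module is actually marked.
pytest.mark.requires_broker

# Right: assigning to the module-level name `pytestmark` is the convention
# pytest recognizes for applying a marker to every test in the module.
pytestmark = pytest.mark.requires_broker
```

Because the bare expression is syntactically valid Python, nothing flags it; the tests simply run without the marker.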
list.pop(0) is O(n). Since _pending is a FIFO queue, deque is the correct data structure.
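A micro-example of the difference (illustrative values):

```python
from collections import deque

# list.pop(0) shifts every remaining element down by one slot: O(n) per
# dequeue. deque.popleft() is O(1), the right structure for a FIFO queue.
pending = deque(['task-1', 'task-2', 'task-3'])

first = pending.popleft()   # O(1), unlike list.pop(0)
pending.append('task-4')    # enqueue at the back; FIFO order preserved

assert first == 'task-1'
assert list(pending) == ['task-2', 'task-3', 'task-4']
```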
Consecutive corrupted or missing files would cause recursive calls, risking stack overflow. Use a while-loop instead.
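The iterative shape of the fix can be sketched as follows. This is a hedged illustration, not the actual PersistentQueue API; JSON stands in for the on-disk task format:

```python
import json
from collections import deque


def dequeue_next_valid(pending: deque):
    # A recursive retry would add one stack frame per corrupted entry, so a
    # long run of bad files could overflow the stack. A while-loop skips any
    # number of corrupted entries in constant stack space.
    while pending:
        raw = pending.popleft()
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            continue  # corrupted entry: drop it and try the next one
    return None  # queue exhausted


tasks = deque(['{not json', 'also broken', '{"pid": 7}'])
assert dequeue_next_valid(tasks) == {'pid': 7}
```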
If the broker crashes without running _remove_sockets_directory(), the temp socket directory and sockets file would persist. Now _cleanup_stale_files removes them too.
Without psutil, os.kill(pid, 0) only checks if a PID exists, which could be a different process that reused the PID. On Linux, read /proc/<pid>/cmdline to verify it's actually the broker process.
- Add `verdi daemon broker` hidden command as circus watcher entry point
- Add broker watcher to circus arbiter config for `core.zmq` profiles
- Remove lifecycle methods (start/stop/is_running) from Broker base class and ZmqBroker; the broker plugin is now a pure client
- Simplify ZmqBrokerManagementClient to primarily status queries; keep start/stop for test fixtures (production uses circus)
- Test fixture uses management_client directly for broker lifecycle
psutil is a hard dependency of aiida-core, so the try/except import guard and os.kill fallback are unnecessary.
Remove logging setup, log_file parameter, and verbose helpers since circus handles log capture via stdout/stderr streams. Add SIGTERM handler (what circus sends) and remove dead KeyboardInterrupt catch. Inline small helper methods. 306 -> 131 lines.
Move status query methods (is_running, get_status, get_pid, endpoint discovery, cleanup) from the separate ZmqBrokerManagementClient class directly into ZmqBroker. Delete client.py. Add ZmqBroker.from_base_path classmethod for profile-free testing.
Rename get_pid -> get_service_pid, get_status -> get_service_status, _validate_pid -> _validate_service_pid, _cleanup_stale_files -> _cleanup_stale_service_files, and prefix internal file attributes with _service_ to avoid confusion with the broker client's own state.
Explain why a custom protocol is needed (ZMQ is transport, not a broker), document which socket pairs carry which traffic (ROUTER/DEALER for tasks and RPC, PUB/SUB for broadcasts), and map each message type to its AMQP equivalent.
…ng locks

Replace the poll thread + threading.Lock approach with an asyncio event loop running on a background thread. All ZMQ socket I/O and shared state access now happen exclusively on that loop, eliminating the need for locks and fixing a thread-safety issue where the DEALER socket was accessed from both the main thread and the poll thread.

Key changes:
- Use zmq.asyncio sockets with async recv coroutines
- Add _run_on_loop() helper for thread-safe main->loop scheduling
- Replace periodic polling of in-progress tasks/RPCs with done callbacks
- Remove _futures_lock, _stop_event, and zmq.Poller
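The loop-confinement pattern behind `_run_on_loop()` can be sketched with a toy registry. This is an illustrative example, not the actual server code: shared state lives on a background event-loop thread, and other threads mutate it only by scheduling coroutines onto that loop, so no locks are needed:

```python
import asyncio
import threading


class LoopConfined:
    """Toy sketch: all access to _subscribers happens on the loop thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._subscribers = {}  # touched only from coroutines on the loop
        threading.Thread(target=self._loop.run_forever, daemon=True).start()

    def _run_on_loop(self, coro):
        # Thread-safe main->loop scheduling; block until the coroutine is done.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result(timeout=5)

    async def _add(self, key, value):
        self._subscribers[key] = value

    async def _snapshot(self):
        return dict(self._subscribers)

    def add_subscriber(self, key, value):
        self._run_on_loop(self._add(key, value))

    def snapshot(self):
        return self._run_on_loop(self._snapshot())


registry = LoopConfined()
registry.add_subscriber('proc-7', 'rpc-callback')
assert registry.snapshot() == {'proc-7': 'rpc-callback'}
```

Because every mutation and every read is serialized through the single loop thread, there is no window for a concurrent-modification race and no lock to forget.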
The test fixture starts a standalone ZMQ broker, but the daemon's circus configuration unconditionally started a second broker that overwrote the sockets file, causing test communicators and daemon workers to connect to different brokers. Three fixes:

- Skip adding the broker circus watcher if a broker is already running
- Add retry logic in get_communicator() for the broker startup race condition
- Fix _validate_service_pid to match the circus-started broker cmdline
Simplify the socket architecture from two pairs (ROUTER/DEALER + PUB/SUB) to a single ROUTER/DEALER pair for all communication. Broadcasts are now sent by the server to all connected clients (derived from task and RPC subscriber registries) via the ROUTER socket, eliminating the need for a separate PUB socket, SUB socket, and polling coroutine on each client.
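The single-pair design rests on a ROUTER property: it prepends each client's identity frame to incoming messages, so the server can later address any known client, including for broadcasts. A hedged pyzmq sketch over the inproc transport (frame contents are illustrative, not the broker's wire protocol):

```python
import zmq

ctx = zmq.Context.instance()
router = ctx.socket(zmq.ROUTER)
router.bind('inproc://broker')

dealer = ctx.socket(zmq.DEALER)
dealer.setsockopt(zmq.IDENTITY, b'client-1')
dealer.connect('inproc://broker')

# Client -> server: DEALER sends; ROUTER receives with the identity prepended.
dealer.send_multipart([b'task', b'payload'])
identity, kind, body = router.recv_multipart()

# Server -> client: prefix the stored identity frame. Iterating all identities
# from the subscriber registries gives a broadcast without any PUB/SUB pair.
router.send_multipart([identity, b'ack'])
reply = dealer.recv_multipart()

dealer.close(linger=0)
router.close(linger=0)
```

The server only needs to remember the identity frames it has seen (here, from task and RPC subscriber registrations) to reach every connected client.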
add/remove_broadcast_subscriber modified the dict directly from the calling thread while _handle_broadcast iterates it on the loop thread, which can raise RuntimeError on concurrent dict mutation. Route through _run_on_loop for consistency with task/RPC subscriber methods.
- Remove AiiDA import from server.py, defaulting to a json encoder/decoder; service.py passes the YAML encoder/decoder explicitly
- Remove unused threading lock from server
- Remove unused dataclasses from protocol.py (all construction uses make_* factory functions)
- Move handler dispatch dict to an instance attribute (avoid per-message allocation)
- Replace hasattr broker checks with isinstance(broker, ZmqBroker)
Write 'aiida-zmq-broker <pid>' to the PID file so is_running() can validate ownership without fragile command-line string matching. Also: fix sender type in make_broadcast_message (str, not uuid.UUID), drop underscore prefix from start/stop_zmq_broker test helpers.
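The PID-file scheme can be sketched as follows. The marker string comes from this commit; the helper names are illustrative:

```python
import os
import tempfile
from pathlib import Path

MARKER = 'aiida-zmq-broker'


def write_pid_file(path: Path) -> None:
    # Write '<marker> <pid>' so ownership is recorded next to the PID itself.
    path.write_text(f'{MARKER} {os.getpid()}')


def read_owned_pid(path: Path):
    """Return the recorded PID, or None if the file is missing or foreign.

    Reading one file replaces fragile command-line string matching: a PID
    file without our marker was written by something else.
    """
    try:
        marker, pid = path.read_text().split()
    except (FileNotFoundError, ValueError):
        return None
    return int(pid) if marker == MARKER else None


with tempfile.TemporaryDirectory() as tmp:
    pid_file = Path(tmp) / 'broker.pid'
    write_pid_file(pid_file)
    assert read_owned_pid(pid_file) == os.getpid()
```

A stale file from a crashed broker still names a PID that `is_running()` can then validate for liveness; the marker only settles the ownership question.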
verdi presto now uses ZMQ as the default broker (no external services required). Pass --use-rabbitmq to use RabbitMQ instead; if RabbitMQ is not reachable the command exits with an error (no fallback).
|
@agoscinski I'll report feedback here, let me know if you would prefer a different location (e.g. a HackMD doc). I wanted to run

    results = engine.run(builder)

but ran into a traceback:

    ---------------------------------------------------------------------------
    ConnectionError                           Traceback (most recent call last)
    Cell In[8], line 1
    ----> 1 results = engine.run(builder)

    File ~/project/qe/git/aiida-core/src/aiida/engine/launch.py:46, in run(process, inputs, **kwargs)
         44     runner = process.runner
         45 else:
    ---> 46     runner = manager.get_manager().get_runner()
         48 return runner.run(process, inputs, **kwargs)

    File ~/project/qe/git/aiida-core/src/aiida/manage/manager.py:437, in Manager.get_runner(self, **kwargs)
        431 """Return a runner that is based on the current profile settings and can be used globally by the code.
        432
        433 :return: the global runner
        434
        435 """
        436 if self._runner is None:
    --> 437     self._runner = self.create_runner(**kwargs)
        439 return self._runner

    File ~/project/qe/git/aiida-core/src/aiida/manage/manager.py:476, in Manager.create_runner(self, with_persistence, **kwargs)
        473 if 'communicator' not in settings:
        474     # Only call get_communicator if we have to as it will lazily create
        475     try:
    --> 476         settings['communicator'] = self.get_communicator()
        477     except ConfigurationError:
        478         # The currently loaded profile does not define a broker and so there is no communicator
        479         pass

    File ~/project/qe/git/aiida-core/src/aiida/manage/manager.py:394, in Manager.get_communicator(self)
        389     assert self._profile is not None
        390     raise ConfigurationError(
        391         f'profile `{self._profile.name}` does not provide a communicator because it does not define a broker'
        392     )
    --> 394 return broker.get_communicator()

    File ~/project/qe/git/aiida-core/src/aiida/brokers/zmq/broker.py:159, in ZmqBroker.get_communicator(self, wait_for_broker)
        157     break
        158 else:
    --> 159     raise ConnectionError(f'Broker did not become ready within {wait_for_broker}s: {self}')
        161 self._communicator = ZmqCommunicator(
        162     router_endpoint=router_endpoint,
        163 )
        164 self._communicator.start()

    ConnectionError: Broker did not become ready within 30.0s: ZMQ Broker @ /Users/mbercx/project/qe/.aiida/broker/8c6a9c1f5a414410b8961f8f315ff1ba <not running>

I checked and saw that the

A few notes here:
This comment is not finished yet but already contains quite a lot of information. I will keep editing it until I have answered all points. Thanks for the feedback!
For new profiles created from this branch the broker should start with the daemon. If you don't restart the daemon after changing your aiida-core version, the broker is never started. I think we can assume that people restart the daemon when installing a new aiida-core version. There are a lot of things that break if you change aiida-core and don't restart the daemon, since the workers still have the old code. We never claimed to support hot reloading for workers, so I think the problem you describe is acceptable. Maybe we can improve the communication of this problem in a different PR: we store the aiida-core version in some worker PID or config file. Then when people to
Yes, we could choose a different design where a broker is not needed: the client directly sends messages to the workers. Since the number of workers and clients is highly limited and does not need to scale to large numbers, that might be the best design for people using AiiDA on their local machine and connecting to different HPCs. There is only one client, and the number of workers is typically the number of processes, which even on a big workstation is limited to 256 workers.

However, we already have this broker architecture in place due to RMQ, and changing it is quite a dramatic change that affects more places in the code base. Because I mimicked the RMQ broker pattern, I could implement the new broker by just mimicking the broker communication pattern, so it was easy to implement. If you remove the broker completely, you need to consider things like the fact that messages are no longer persisted by the broker, so where do you move that responsibility? There are multiple options here, and there is definitely a solution that works for our use case, but they all require changes on the clients, which means we probably need to touch plumpy and kiwipy and discuss this with the team.

So let's say we have done this; then we still need to keep the old logic, since we need to support the RMQ broker for the many-clients-to-many-workers use case (only used by a fraction of AiiDA users, but they exist). Then we have two different logics, one per broker. It is possible, but it definitely increases maintenance costs. Maybe this is the way we want to go in the future, but maybe this much simpler approach is good enough. People need to start the daemon anyway, and if we hide the broker inside the daemon setup, the complexity for the user is roughly the same as not having a broker.
TODOs: