feat(iii-queue): add RabbitMQ priority queue support#1890
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
📝 WalkthroughWalkthroughAdds end-to-end RabbitMQ priority queue support. New ChangesRabbitMQ Priority Queue Feature
Sequence Diagram(s)sequenceDiagram
participant QueueWorker
participant RabbitMQAdapter
participant TopologyManager
participant Publisher
participant RabbitMQ
rect rgba(100, 149, 237, 0.5)
Note over QueueWorker,RabbitMQ: Function queue enqueue path
QueueWorker->>QueueWorker: read priority_field from data, clamp to max_priority → Option<u8>
QueueWorker->>RabbitMQAdapter: publish_to_function_queue(..., priority)
RabbitMQAdapter->>Publisher: publish_to_exchange(job with priority set)
Publisher->>Publisher: BasicProperties.with_priority(p) when Some
Publisher->>RabbitMQ: basic_publish with priority in AMQP properties
end
rect rgba(144, 238, 144, 0.5)
Note over RabbitMQAdapter,RabbitMQ: Topic subscriber enqueue path
RabbitMQAdapter->>RabbitMQAdapter: priority_from_data(payload, priority_field) → Option<u8>
RabbitMQAdapter->>RabbitMQAdapter: Job::new().with_priority(priority)
RabbitMQAdapter->>Publisher: publish_to_exchange(job)
Publisher->>RabbitMQ: basic_publish with priority in AMQP properties
end
rect rgba(255, 165, 0, 0.5)
Note over RabbitMQAdapter,RabbitMQ: Queue and subscriber setup
RabbitMQAdapter->>TopologyManager: setup_function_queue(name, backoff_ms, max_priority)
TopologyManager->>RabbitMQ: queue_declare main queue with x-max-priority
TopologyManager->>RabbitMQ: queue_declare retry queue with x-max-priority
TopologyManager->>RabbitMQ: queue_declare DLQ (no priority)
RabbitMQAdapter->>TopologyManager: setup_subscriber_queue(topic, fn_id, max_priority)
TopologyManager->>RabbitMQ: queue_declare subscriber queue with x-max-priority
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
engine/tests/rabbitmq_queue_integration.rs (1)
1544-1545: ⚡ Quick winReplace fixed sleeps before assertions with condition-based waits.
These
sleep(Duration::from_secs(5))calls can make CI flaky; wait untilorder.len()reaches the expected count with a timeout instead.♻️ Proposed change
- tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::timeout(Duration::from_secs(10), async { + loop { + if order.lock().await.len() == 6 { + break; + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + }) + .await + .expect("priority queue should drain all test messages");- tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::timeout(Duration::from_secs(10), async { + loop { + if order.lock().await.len() == 6 { + break; + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + }) + .await + .expect("subscriber priority queue should drain all test messages");Also applies to: 1704-1705
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@engine/tests/rabbitmq_queue_integration.rs` around lines 1544 - 1545, Replace the fixed tokio::time::sleep(Duration::from_secs(5)) calls throughout the test with condition-based polling logic that waits until order.len() reaches the expected count. Implement a loop with a timeout (using tokio::time::timeout or similar) that continuously checks if the order vector has reached the desired length, polling at small intervals rather than sleeping unconditionally. This applies to the sleep calls at lines 1544-1545 and 1704-1705 (and any similar instances), eliminating the flakiness caused by arbitrary fixed delays.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@engine/src/workers/queue/adapters/rabbitmq/adapter.rs`:
- Around line 885-892: The priority set in the initial message publish is not
being preserved during retry and DLQ-redrive republish operations. Locate all
code paths where BasicProperties is rebuilt for retry or DLQ-redrive scenarios
(these are separate from the main publish shown in the diff) and ensure that
when reconstructing BasicProperties in these paths, the priority value is
extracted from the original message headers or properties and then reapplied
using with_priority, just as it is in the initial publish block shown here with
the conditional check for Some(p).
---
Nitpick comments:
In `@engine/tests/rabbitmq_queue_integration.rs`:
- Around line 1544-1545: Replace the fixed
tokio::time::sleep(Duration::from_secs(5)) calls throughout the test with
condition-based polling logic that waits until order.len() reaches the expected
count. Implement a loop with a timeout (using tokio::time::timeout or similar)
that continuously checks if the order vector has reached the desired length,
polling at small intervals rather than sleeping unconditionally. This applies to
the sleep calls at lines 1544-1545 and 1704-1705 (and any similar instances),
eliminating the flakiness caused by arbitrary fixed delays.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: ddd63b32-e9f9-4826-bb3c-b3e311e2045a
📒 Files selected for processing (14)
engine/src/workers/queue/adapters/bridge.rsengine/src/workers/queue/adapters/builtin/adapter.rsengine/src/workers/queue/adapters/rabbitmq/adapter.rsengine/src/workers/queue/adapters/rabbitmq/publisher.rsengine/src/workers/queue/adapters/rabbitmq/topology.rsengine/src/workers/queue/adapters/rabbitmq/types.rsengine/src/workers/queue/adapters/rabbitmq/worker.rsengine/src/workers/queue/adapters/redis_adapter.rsengine/src/workers/queue/config.rsengine/src/workers/queue/mod.rsengine/src/workers/queue/queue.rsengine/src/workers/queue/subscriber_config.rsengine/tests/common/rabbitmq_helpers.rsengine/tests/rabbitmq_queue_integration.rs
Priority queues are now supported on both function (named) queues and subscriber (topic fanout) queues. The per-message priority is read from a payload field, mirroring how FIFO reads `message_group_field` — no change to the enqueue/publish API or the SDKs. Configuration: - Function queues (`queue_configs.<name>`): `max_priority` declares the queue with `x-max-priority`; `priority_field` names the integer field in the message data that sets each message's priority. - Subscriber queues: `maxPriority` in the subscriber's `queue_config` declares its per-function queue with `x-max-priority`. Because a fanout publish is copied to every bound queue, the priority value is resolved once at publish time from the adapter-level `priority_field`. Behavior: - Topology declares both the main and retry queues with `x-max-priority` so priority survives retries; the DLQ stays a plain queue. - The priority property is carried on the job and re-stamped on requeue and DLQ-redrive republishes. - `x-max-priority` is fixed at queue-creation time; changing it for an existing queue has no effect (the queue must be recreated), and priority ordering is only observable at low concurrency. The subscriber worker now applies per-consumer prefetch (`basic_qos`), matching the function-queue consumer. Without it the broker delivered the whole queue to the consumer at once, so priority ordering on subscriber queues had no effect; this also gives subscribers proper broker-side backpressure. Other adapters (builtin, redis, bridge) accept the new `publish_to_function_queue` priority argument and ignore it. Tests: priority unit tests for config validation/parsing and field extraction; integration tests (testcontainers) covering function-queue ordering, subscriber ordering, and `x-max-priority` topology assertions. Claude-Session: https://claude.ai/code/session_011TpFhqmhPLVZPsuXPdJ6Fz
0f40d57 to
1ef63ff
Compare
There was a problem hiding this comment.
🧹 Nitpick comments (1)
engine/tests/rabbitmq_queue_integration.rs (1)
1544-1551: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winPrefer polling the recorded order over a fixed drain sleep.
The deterministic gate makes ordering reliable, but the final
sleep(Duration::from_secs(5))is a fixed wait for the backlog to drain. On a slow CI runner this can be insufficient, sorecordedmay hold fewer than 6 entries and theassert_eq!fails with a misleading "wrong order" message rather than a timeout. Polling until 6 messages are recorded (with a timeout) makes the test both faster and more robust.♻️ Poll for completion instead of a fixed sleep
- tokio::time::sleep(Duration::from_secs(5)).await; - - let recorded = order.lock().await; + tokio::time::timeout(Duration::from_secs(10), async { + loop { + if order.lock().await.len() == 6 { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await + .expect("all 6 messages should drain"); + + let recorded = order.lock().await;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@engine/tests/rabbitmq_queue_integration.rs` around lines 1544 - 1551, Replace the fixed tokio::time::sleep(Duration::from_secs(5)) with a polling loop that repeatedly checks the recorded vector until it contains 6 entries, adding a timeout guard to prevent infinite waits. This makes the test deterministic by waiting for the actual completion condition (6 messages recorded) rather than a fixed duration, which handles slow CI runners gracefully and provides clearer failure messages if the condition is not met within the timeout.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@engine/tests/rabbitmq_queue_integration.rs`:
- Around line 1544-1551: Replace the fixed
tokio::time::sleep(Duration::from_secs(5)) with a polling loop that repeatedly
checks the recorded vector until it contains 6 entries, adding a timeout guard
to prevent infinite waits. This makes the test deterministic by waiting for the
actual completion condition (6 messages recorded) rather than a fixed duration,
which handles slow CI runners gracefully and provides clearer failure messages
if the condition is not met within the timeout.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 5639ca38-72ea-4c64-8ee1-65c5193bcff9
📒 Files selected for processing (14)
engine/src/workers/queue/adapters/bridge.rsengine/src/workers/queue/adapters/builtin/adapter.rsengine/src/workers/queue/adapters/rabbitmq/adapter.rsengine/src/workers/queue/adapters/rabbitmq/publisher.rsengine/src/workers/queue/adapters/rabbitmq/topology.rsengine/src/workers/queue/adapters/rabbitmq/types.rsengine/src/workers/queue/adapters/rabbitmq/worker.rsengine/src/workers/queue/adapters/redis_adapter.rsengine/src/workers/queue/config.rsengine/src/workers/queue/mod.rsengine/src/workers/queue/queue.rsengine/src/workers/queue/subscriber_config.rsengine/tests/common/rabbitmq_helpers.rsengine/tests/rabbitmq_queue_integration.rs
🚧 Files skipped from review as they are similar to previous changes (13)
- engine/src/workers/queue/adapters/rabbitmq/publisher.rs
- engine/src/workers/queue/adapters/redis_adapter.rs
- engine/src/workers/queue/subscriber_config.rs
- engine/tests/common/rabbitmq_helpers.rs
- engine/src/workers/queue/adapters/rabbitmq/worker.rs
- engine/src/workers/queue/adapters/rabbitmq/topology.rs
- engine/src/workers/queue/adapters/bridge.rs
- engine/src/workers/queue/adapters/rabbitmq/adapter.rs
- engine/src/workers/queue/queue.rs
- engine/src/workers/queue/adapters/builtin/adapter.rs
- engine/src/workers/queue/mod.rs
- engine/src/workers/queue/adapters/rabbitmq/types.rs
- engine/src/workers/queue/config.rs
What
Adds RabbitMQ priority queue support to the RabbitMQ queue provider. Higher-priority messages are delivered before lower-priority ones waiting in the same queue. Works on both function (named) queues and subscriber (topic fanout) queues.
The per-message priority is read from a payload field — mirroring how FIFO reads
message_group_field— so there is no change to the enqueue/publish API or the SDKs.How it works
RabbitMQ priority needs two things together: the queue declared with
x-max-priority, and each message published with the AMQPpriorityproperty (the broker delivers higher first and clamps to the queue's max).Configuration
queue_configs.<name>):max_priority— declares the queue withx-max-priority(1–255; RabbitMQ recommends ≤ 10).priority_field— the integer field in the messagedatathat sets each message's priority.maxPriorityin the subscriber'squeue_configdeclares its per-function queue withx-max-priority.priority_field.Behavior
x-max-priorityso priority survives retries; the DLQ stays a plain queue.publish_to_function_queuepriority argument and ignore it (RabbitMQ-only feature).Subscriber consumer QoS fix: the subscriber worker now applies per-consumer prefetch (
basic_qos), matching the function-queue consumer. Without it the broker delivered the entire queue to the consumer at once, so priority ordering on subscriber queues would have had no effect; this also gives subscribers proper broker-side backpressure.Notes / constraints
x-max-priorityis fixed at queue-creation time. Enabling priority on an existing queue (or changing the level) requires recreating the queue; a runtime config change does not take effect on an already-declared queue.concurrency(high prefetch lets the broker hand out many messages before they can be reordered).Test plan
cargo check -p iii --all-features --tests— 0 errorsmaxPriorityparsing, payload-field extraction)x-max-prioritypresent on main + retry queues, absent on DLQOrdering tests are deterministic: a gate handler holds the first message unacked under prefetch=1 while the backlog is enqueued, so the broker provably reorders by priority on delivery.
https://claude.ai/code/session_011TpFhqmhPLVZPsuXPdJ6Fz
Summary by CodeRabbit
max_priority: 0, and supports optionalpriority_field/max_prioritywhen unset defaults remain unchanged.x-max-priority) behavior.