Skip to content

feat(iii-queue): add RabbitMQ priority queue support#1890

Open
ytallo wants to merge 1 commit into
mainfrom
feat/rabbitmq-priority-queues
Open

feat(iii-queue): add RabbitMQ priority queue support#1890
ytallo wants to merge 1 commit into
mainfrom
feat/rabbitmq-priority-queues

Conversation

@ytallo

@ytallo ytallo commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

What

Adds RabbitMQ priority queue support to the RabbitMQ queue provider. Higher-priority messages are delivered before lower-priority ones waiting in the same queue. Works on both function (named) queues and subscriber (topic fanout) queues.

The per-message priority is read from a payload field — mirroring how FIFO reads message_group_field — so there is no change to the enqueue/publish API or the SDKs.

How it works

RabbitMQ priority needs two things together: the queue declared with x-max-priority, and each message published with the AMQP priority property (the broker delivers higher first and clamps to the queue's max).

Configuration

  • Function queues (queue_configs.<name>):
    • max_priority — declares the queue with x-max-priority (1–255; RabbitMQ recommends ≤ 10).
    • priority_field — the integer field in the message data that sets each message's priority.
  • Subscriber queues:
    • maxPriority in the subscriber's queue_config declares its per-function queue with x-max-priority.
    • Because one fanout publish is copied to every bound queue, the priority value is resolved once at publish time from the adapter-level priority_field.
queue_configs:
  jobs:
    type: standard
    concurrency: 1          # priority ordering is only observable at low concurrency
    max_priority: 10        # declares x-max-priority
    priority_field: priority # read from data.priority

Behavior

  • Topology declares both the main and retry queues with x-max-priority so priority survives retries; the DLQ stays a plain queue.
  • The priority property is carried on the job and re-stamped on requeue and DLQ-redrive republishes.
  • Other adapters (builtin, redis, bridge) accept the new publish_to_function_queue priority argument and ignore it (RabbitMQ-only feature).

Subscriber consumer QoS fix: the subscriber worker now applies per-consumer prefetch (basic_qos), matching the function-queue consumer. Without it the broker delivered the entire queue to the consumer at once, so priority ordering on subscriber queues would have had no effect; this also gives subscribers proper broker-side backpressure.

Notes / constraints

  • x-max-priority is fixed at queue-creation time. Enabling priority on an existing queue (or changing the level) requires recreating the queue; a runtime config change does not take effect on an already-declared queue.
  • Priority ordering is only meaningful at low concurrency (high prefetch lets the broker hand out many messages before they can be reordered).

Test plan

  • cargo check -p iii --all-features --tests — 0 errors
  • Lib unit suite — 1813 passed (incl. 9 new: config validation/deserialization, maxPriority parsing, payload-field extraction)
  • RabbitMQ integration (testcontainers) — 20 passed, including 3 new:
    • function-queue drains highest-priority-first
    • subscriber queue drains highest-priority-first
    • x-max-priority present on main + retry queues, absent on DLQ
  • Builtin queue integration — 12 passed (trait signature change)
  • clippy — 0 errors

Ordering tests are deterministic: a gate handler holds the first message unacked under prefetch=1 while the backlog is enqueued, so the broker provably reorders by priority on delivery.

https://claude.ai/code/session_011TpFhqmhPLVZPsuXPdJ6Fz

Summary by CodeRabbit

  • New Features
    • Added RabbitMQ priority queue support, including priority-based ordering for function queues and topic subscribers.
    • Introduced configuration for per-message priority extraction and per-queue maximum priority limits.
    • RabbitMQ now applies priority to outgoing AMQP messages and enables priority queues for main and retry paths.
  • Configuration
    • Added validation rejecting max_priority: 0, and supports optional priority_field/max_priority when unset defaults remain unchanged.
  • Tests
    • Added integration tests covering priority ordering and verifying queue topology (x-max-priority) behavior.

@vercel

vercel Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
iii-website Ready Ready Preview, Comment Jun 23, 2026 6:38pm

Request Review

@coderabbitai

coderabbitai Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

Adds end-to-end RabbitMQ priority queue support. New max_priority and priority_field config fields are introduced on FunctionQueueConfig, SubscriberQueueConfig, and adapter configs. The Job model gains a priority field. RabbitMQ topology declares x-max-priority on main and retry queues. Per-message priority is resolved from payload data and propagated through AMQP BasicProperties. All QueueAdapter implementations are updated to accept the new priority parameter. The Worker now applies per-consumer QoS via basic_qos. Integration tests verify ordering and topology declarations.

Changes

RabbitMQ Priority Queue Feature

Layer / File(s) Summary
Priority config fields, Job model, and validation
engine/src/workers/queue/config.rs, engine/src/workers/queue/subscriber_config.rs, engine/src/workers/queue/adapters/rabbitmq/types.rs
FunctionQueueConfig gains max_priority (schema range 1–255, optional) and priority_field; SubscriberQueueConfig gains max_priority; RabbitMQConfig and RabbitmqAdapterConfig gain priority_field; Job gains optional priority field with with_priority builder; priority_from_data helper reads priority from JSON and saturates to u8::MAX; validation rejects max_priority == Some(0); unit tests cover config parsing, Job initialization, and priority resolution edge cases.
RabbitMQ x-max-priority topology declaration
engine/src/workers/queue/adapters/rabbitmq/topology.rs
setup_function_queue and setup_subscriber_queue accept new max_priority: Option<u8> parameter; with_max_priority helper inserts x-max-priority AMQP argument as LongInt; main and retry queues are declared as priority queues when configured; DLQ queue is left unmodified for priority handling.
Priority resolution and AMQP property propagation
engine/src/workers/queue/adapters/rabbitmq/adapter.rs, engine/src/workers/queue/adapters/rabbitmq/publisher.rs, engine/src/workers/queue/queue.rs
enqueue resolves priority from payload via priority_from_data and attaches it to Job via with_priority; subscribe forwards queue_config.max_priority to topology setup; publish_to_function_queue accepts priority parameter and applies it to AMQP BasicProperties with with_priority; enqueue_to_function_queue reads priority_field, clamps to max_priority, and passes Option<u8> to adapter publish.
Worker per-consumer QoS prefetch wiring
engine/src/workers/queue/adapters/rabbitmq/worker.rs
Worker struct stores prefetch_count: u16; Worker::new persists the constructor argument; Worker::run calls channel.basic_qos before the consumer loop and returns early with an error log on failure.
QueueAdapter trait update and adapter conformance
engine/src/workers/queue/mod.rs, engine/src/workers/queue/adapters/bridge.rs, engine/src/workers/queue/adapters/builtin/adapter.rs, engine/src/workers/queue/adapters/redis_adapter.rs, engine/src/workers/queue/queue.rs
Trait publish_to_function_queue gains _priority: Option<u8> parameter with clippy allow attribute; bridge, builtin, and Redis adapters add the ignored parameter and suppress clippy warnings; builtin adapter tests updated to include max_priority: None in all SubscriberQueueConfig literals; mock and all existing test call sites updated with trailing None for priority.
Integration test helpers and priority ordering tests
engine/tests/common/rabbitmq_helpers.rs, engine/tests/rabbitmq_queue_integration.rs
Adds rabbitmq_priority_queue_config (queue with max_priority and priority_field) and rabbitmq_priority_topic_config (adapter-level priority_field) test helpers; adds register_priority_gate_function gating helper using tokio::sync::Notify; adds three integration tests: rmq_priority_queue_orders_by_priority (drain ordering), rmq_priority_queue_topology_declares_x_max_priority (Management API topology assertion), rmq_subscriber_priority_orders_by_priority (topic subscriber priority ordering).

Sequence Diagram(s)

sequenceDiagram
  participant QueueWorker
  participant RabbitMQAdapter
  participant TopologyManager
  participant Publisher
  participant RabbitMQ

  rect rgba(100, 149, 237, 0.5)
    Note over QueueWorker,RabbitMQ: Function queue enqueue path
    QueueWorker->>QueueWorker: read priority_field from data, clamp to max_priority → Option<u8>
    QueueWorker->>RabbitMQAdapter: publish_to_function_queue(..., priority)
    RabbitMQAdapter->>Publisher: publish_to_exchange(job with priority set)
    Publisher->>Publisher: BasicProperties.with_priority(p) when Some
    Publisher->>RabbitMQ: basic_publish with priority in AMQP properties
  end

  rect rgba(144, 238, 144, 0.5)
    Note over RabbitMQAdapter,RabbitMQ: Topic subscriber enqueue path
    RabbitMQAdapter->>RabbitMQAdapter: priority_from_data(payload, priority_field) → Option<u8>
    RabbitMQAdapter->>RabbitMQAdapter: Job::new().with_priority(priority)
    RabbitMQAdapter->>Publisher: publish_to_exchange(job)
    Publisher->>RabbitMQ: basic_publish with priority in AMQP properties
  end

  rect rgba(255, 165, 0, 0.5)
    Note over RabbitMQAdapter,RabbitMQ: Queue and subscriber setup
    RabbitMQAdapter->>TopologyManager: setup_function_queue(name, backoff_ms, max_priority)
    TopologyManager->>RabbitMQ: queue_declare main queue with x-max-priority
    TopologyManager->>RabbitMQ: queue_declare retry queue with x-max-priority
    TopologyManager->>RabbitMQ: queue_declare DLQ (no priority)
    RabbitMQAdapter->>TopologyManager: setup_subscriber_queue(topic, fn_id, max_priority)
    TopologyManager->>RabbitMQ: queue_declare subscriber queue with x-max-priority
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • iii-hq/iii#1873: Main PR adds priority fields to queue configs that this PR extends; the linked PR makes queue configs hot-reloadable and JSON-schema-serializable, directly touching the same config struct definitions.

Suggested reviewers

  • sergiofilhowz

Poem

🐇 A rabbit hops through queues of might,
With priorities set just right!
High numbers rush, low numbers wait,
x-max-priority seals their fate.
Dead letters retry, still ranked by score —
No message lost, none ignored anymore! 🎯

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main feature being added: RabbitMQ priority queue support.
Description check ✅ Passed The description comprehensively covers what was changed, why, how it works, configuration details, behavior expectations, constraints, and testing performed.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/rabbitmq-priority-queues

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
engine/tests/rabbitmq_queue_integration.rs (1)

1544-1545: ⚡ Quick win

Replace fixed sleeps before assertions with condition-based waits.

These sleep(Duration::from_secs(5)) calls can make CI flaky; wait until order.len() reaches the expected count with a timeout instead.

♻️ Proposed change
-    tokio::time::sleep(Duration::from_secs(5)).await;
+    tokio::time::timeout(Duration::from_secs(10), async {
+        loop {
+            if order.lock().await.len() == 6 {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(25)).await;
+        }
+    })
+    .await
+    .expect("priority queue should drain all test messages");
-    tokio::time::sleep(Duration::from_secs(5)).await;
+    tokio::time::timeout(Duration::from_secs(10), async {
+        loop {
+            if order.lock().await.len() == 6 {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(25)).await;
+        }
+    })
+    .await
+    .expect("subscriber priority queue should drain all test messages");

Also applies to: 1704-1705

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/tests/rabbitmq_queue_integration.rs` around lines 1544 - 1545, Replace
the fixed tokio::time::sleep(Duration::from_secs(5)) calls throughout the test
with condition-based polling logic that waits until order.len() reaches the
expected count. Implement a loop with a timeout (using tokio::time::timeout or
similar) that continuously checks if the order vector has reached the desired
length, polling at small intervals rather than sleeping unconditionally. This
applies to the sleep calls at lines 1544-1545 and 1704-1705 (and any similar
instances), eliminating the flakiness caused by arbitrary fixed delays.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@engine/src/workers/queue/adapters/rabbitmq/adapter.rs`:
- Around line 885-892: The priority set in the initial message publish is not
being preserved during retry and DLQ-redrive republish operations. Locate all
code paths where BasicProperties is rebuilt for retry or DLQ-redrive scenarios
(these are separate from the main publish shown in the diff) and ensure that
when reconstructing BasicProperties in these paths, the priority value is
extracted from the original message headers or properties and then reapplied
using with_priority, just as it is in the initial publish block shown here with
the conditional check for Some(p).

---

Nitpick comments:
In `@engine/tests/rabbitmq_queue_integration.rs`:
- Around line 1544-1545: Replace the fixed
tokio::time::sleep(Duration::from_secs(5)) calls throughout the test with
condition-based polling logic that waits until order.len() reaches the expected
count. Implement a loop with a timeout (using tokio::time::timeout or similar)
that continuously checks if the order vector has reached the desired length,
polling at small intervals rather than sleeping unconditionally. This applies to
the sleep calls at lines 1544-1545 and 1704-1705 (and any similar instances),
eliminating the flakiness caused by arbitrary fixed delays.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: ddd63b32-e9f9-4826-bb3c-b3e311e2045a

📥 Commits

Reviewing files that changed from the base of the PR and between dbd7dec and 0f40d57.

📒 Files selected for processing (14)
  • engine/src/workers/queue/adapters/bridge.rs
  • engine/src/workers/queue/adapters/builtin/adapter.rs
  • engine/src/workers/queue/adapters/rabbitmq/adapter.rs
  • engine/src/workers/queue/adapters/rabbitmq/publisher.rs
  • engine/src/workers/queue/adapters/rabbitmq/topology.rs
  • engine/src/workers/queue/adapters/rabbitmq/types.rs
  • engine/src/workers/queue/adapters/rabbitmq/worker.rs
  • engine/src/workers/queue/adapters/redis_adapter.rs
  • engine/src/workers/queue/config.rs
  • engine/src/workers/queue/mod.rs
  • engine/src/workers/queue/queue.rs
  • engine/src/workers/queue/subscriber_config.rs
  • engine/tests/common/rabbitmq_helpers.rs
  • engine/tests/rabbitmq_queue_integration.rs

Comment thread engine/src/workers/queue/adapters/rabbitmq/adapter.rs
Priority queues are now supported on both function (named) queues and
subscriber (topic fanout) queues. The per-message priority is read from a
payload field, mirroring how FIFO reads `message_group_field` — no change to
the enqueue/publish API or the SDKs.

Configuration:
- Function queues (`queue_configs.<name>`): `max_priority` declares the queue
  with `x-max-priority`; `priority_field` names the integer field in the
  message data that sets each message's priority.
- Subscriber queues: `maxPriority` in the subscriber's `queue_config` declares
  its per-function queue with `x-max-priority`. Because a fanout publish is
  copied to every bound queue, the priority value is resolved once at publish
  time from the adapter-level `priority_field`.

Behavior:
- Topology declares both the main and retry queues with `x-max-priority` so
  priority survives retries; the DLQ stays a plain queue.
- The priority property is carried on the job and re-stamped on requeue and
  DLQ-redrive republishes.
- `x-max-priority` is fixed at queue-creation time; changing it for an existing
  queue has no effect (the queue must be recreated), and priority ordering is
  only observable at low concurrency.

The subscriber worker now applies per-consumer prefetch (`basic_qos`), matching
the function-queue consumer. Without it the broker delivered the whole queue to
the consumer at once, so priority ordering on subscriber queues had no effect;
this also gives subscribers proper broker-side backpressure.

Other adapters (builtin, redis, bridge) accept the new `publish_to_function_queue`
priority argument and ignore it.

Tests: priority unit tests for config validation/parsing and field extraction;
integration tests (testcontainers) covering function-queue ordering, subscriber
ordering, and `x-max-priority` topology assertions.

Claude-Session: https://claude.ai/code/session_011TpFhqmhPLVZPsuXPdJ6Fz

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
engine/tests/rabbitmq_queue_integration.rs (1)

1544-1551: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Prefer polling the recorded order over a fixed drain sleep.

The deterministic gate makes ordering reliable, but the final sleep(Duration::from_secs(5)) is a fixed wait for the backlog to drain. On a slow CI runner this can be insufficient, so recorded may hold fewer than 6 entries and the assert_eq! fails with a misleading "wrong order" message rather than a timeout. Polling until 6 messages are recorded (with a timeout) makes the test both faster and more robust.

♻️ Poll for completion instead of a fixed sleep
-    tokio::time::sleep(Duration::from_secs(5)).await;
-
-    let recorded = order.lock().await;
+    tokio::time::timeout(Duration::from_secs(10), async {
+        loop {
+            if order.lock().await.len() == 6 {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(50)).await;
+        }
+    })
+    .await
+    .expect("all 6 messages should drain");
+
+    let recorded = order.lock().await;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/tests/rabbitmq_queue_integration.rs` around lines 1544 - 1551, Replace
the fixed tokio::time::sleep(Duration::from_secs(5)) with a polling loop that
repeatedly checks the recorded vector until it contains 6 entries, adding a
timeout guard to prevent infinite waits. This makes the test deterministic by
waiting for the actual completion condition (6 messages recorded) rather than a
fixed duration, which handles slow CI runners gracefully and provides clearer
failure messages if the condition is not met within the timeout.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@engine/tests/rabbitmq_queue_integration.rs`:
- Around line 1544-1551: Replace the fixed
tokio::time::sleep(Duration::from_secs(5)) with a polling loop that repeatedly
checks the recorded vector until it contains 6 entries, adding a timeout guard
to prevent infinite waits. This makes the test deterministic by waiting for the
actual completion condition (6 messages recorded) rather than a fixed duration,
which handles slow CI runners gracefully and provides clearer failure messages
if the condition is not met within the timeout.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 5639ca38-72ea-4c64-8ee1-65c5193bcff9

📥 Commits

Reviewing files that changed from the base of the PR and between 0f40d57 and 1ef63ff.

📒 Files selected for processing (14)
  • engine/src/workers/queue/adapters/bridge.rs
  • engine/src/workers/queue/adapters/builtin/adapter.rs
  • engine/src/workers/queue/adapters/rabbitmq/adapter.rs
  • engine/src/workers/queue/adapters/rabbitmq/publisher.rs
  • engine/src/workers/queue/adapters/rabbitmq/topology.rs
  • engine/src/workers/queue/adapters/rabbitmq/types.rs
  • engine/src/workers/queue/adapters/rabbitmq/worker.rs
  • engine/src/workers/queue/adapters/redis_adapter.rs
  • engine/src/workers/queue/config.rs
  • engine/src/workers/queue/mod.rs
  • engine/src/workers/queue/queue.rs
  • engine/src/workers/queue/subscriber_config.rs
  • engine/tests/common/rabbitmq_helpers.rs
  • engine/tests/rabbitmq_queue_integration.rs
🚧 Files skipped from review as they are similar to previous changes (13)
  • engine/src/workers/queue/adapters/rabbitmq/publisher.rs
  • engine/src/workers/queue/adapters/redis_adapter.rs
  • engine/src/workers/queue/subscriber_config.rs
  • engine/tests/common/rabbitmq_helpers.rs
  • engine/src/workers/queue/adapters/rabbitmq/worker.rs
  • engine/src/workers/queue/adapters/rabbitmq/topology.rs
  • engine/src/workers/queue/adapters/bridge.rs
  • engine/src/workers/queue/adapters/rabbitmq/adapter.rs
  • engine/src/workers/queue/queue.rs
  • engine/src/workers/queue/adapters/builtin/adapter.rs
  • engine/src/workers/queue/mod.rs
  • engine/src/workers/queue/adapters/rabbitmq/types.rs
  • engine/src/workers/queue/config.rs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants