Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions engine/src/workers/queue/adapters/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ impl QueueAdapter for BridgeAdapter {
))
}

#[allow(clippy::too_many_arguments)]
async fn publish_to_function_queue(
&self,
queue_name: &str,
Expand All @@ -330,6 +331,9 @@ impl QueueAdapter for BridgeAdapter {
_backoff_ms: u64,
_traceparent: Option<String>,
_baggage: Option<String>,
// Priority is resolved by the remote engine via its own adapter; the
// bridge forwards the enqueue unchanged.
_priority: Option<u8>,
) {
if let Err(e) = self
.bridge
Expand Down
18 changes: 18 additions & 0 deletions engine/src/workers/queue/adapters/builtin/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ impl QueueAdapter for BuiltinQueueAdapter {
Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn publish_to_function_queue(
&self,
queue_name: &str,
Expand All @@ -405,6 +406,9 @@ impl QueueAdapter for BuiltinQueueAdapter {
backoff_ms: u64,
traceparent: Option<String>,
baggage: Option<String>,
// Priority is a RabbitMQ-only feature; the in-process builtin queue
// ignores it.
_priority: Option<u8>,
) {
let namespaced_queue = format!("__fn_queue::{}", queue_name);
let job = Job {
Expand Down Expand Up @@ -700,6 +704,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: Some(1000),
max_priority: None,
};

let subscription_config = Some(config).map(|c| SubscriptionConfig {
Expand Down Expand Up @@ -729,6 +734,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: None,
max_priority: None,
};

let subscription_config = Some(config).map(|c| SubscriptionConfig {
Expand Down Expand Up @@ -756,6 +762,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: None,
max_priority: None,
};

let subscription_config = Some(config).map(|c| SubscriptionConfig {
Expand All @@ -782,6 +789,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: None,
max_priority: None,
};

let subscription_config = Some(config).map(|c| SubscriptionConfig {
Expand Down Expand Up @@ -859,6 +867,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: Some(25),
max_priority: None,
};
adapter
.subscribe("jobs", "sub-fifo", "queue.success", None, Some(fifo_config))
Expand All @@ -872,6 +881,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: Some(10),
max_priority: None,
};
adapter
.subscribe(
Expand Down Expand Up @@ -936,6 +946,7 @@ mod tests {
1000,
None,
None,
None,
)
.await;

Expand Down Expand Up @@ -983,6 +994,7 @@ mod tests {
1000,
None,
None,
None,
)
.await;

Expand Down Expand Up @@ -1057,6 +1069,7 @@ mod tests {
1000,
None,
None,
None,
)
.await;
}
Expand Down Expand Up @@ -1133,6 +1146,7 @@ mod tests {
1000,
None,
None,
None,
)
.await;

Expand Down Expand Up @@ -1176,6 +1190,7 @@ mod tests {
1000,
None,
None,
None,
)
.await;

Expand Down Expand Up @@ -1222,6 +1237,7 @@ mod tests {
1000,
None,
None,
None,
)
.await;
}
Expand Down Expand Up @@ -1277,6 +1293,7 @@ mod tests {
1000,
None,
None,
None,
)
.await;
}
Expand Down Expand Up @@ -1319,6 +1336,7 @@ mod tests {
1000,
Some("00-abc-def-01".to_string()),
Some("key=value".to_string()),
None,
)
.await;

Expand Down
21 changes: 17 additions & 4 deletions engine/src/workers/queue/adapters/rabbitmq/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,14 @@ impl QueueAdapter for RabbitMQAdapter {
traceparent: Option<String>,
baggage: Option<String>,
) {
let job = Job::new(topic, data, self.config.max_attempts, traceparent, baggage);
// Topic fanout publishes one message to every bound subscriber queue, so
// the priority is resolved once here from the adapter-level
// `priority_field`. Each subscriber queue honors it only if declared with
// `maxPriority`, and clamps it to its own `x-max-priority`.
let priority =
super::types::priority_from_data(&data, self.config.priority_field.as_deref());
let job = Job::new(topic, data, self.config.max_attempts, traceparent, baggage)
.with_priority(priority);

if let Err(e) = self.topology.setup_topic(topic).await {
tracing::error!(
Expand Down Expand Up @@ -307,9 +314,10 @@ impl QueueAdapter for RabbitMQAdapter {
return;
}

let subscriber_max_priority = queue_config.as_ref().and_then(|c| c.max_priority);
if let Err(e) = self
.topology
.setup_subscriber_queue(&topic, &function_id)
.setup_subscriber_queue(&topic, &function_id, subscriber_max_priority)
.await
{
tracing::error!(
Expand Down Expand Up @@ -831,6 +839,7 @@ impl QueueAdapter for RabbitMQAdapter {
Ok(results)
}

#[allow(clippy::too_many_arguments)]
async fn publish_to_function_queue(
&self,
queue_name: &str,
Expand All @@ -841,6 +850,7 @@ impl QueueAdapter for RabbitMQAdapter {
_backoff_ms: u64,
traceparent: Option<String>,
baggage: Option<String>,
priority: Option<u8>,
) {
use super::naming::FnQueueNames;

Expand Down Expand Up @@ -872,11 +882,14 @@ impl QueueAdapter for RabbitMQAdapter {
);
}

let properties = lapin::BasicProperties::default()
let mut properties = lapin::BasicProperties::default()
.with_content_type("application/json".into())
.with_delivery_mode(2)
.with_message_id(message_id.into())
.with_headers(headers);
if let Some(p) = priority {
properties = properties.with_priority(p);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

match self
.channel
Expand Down Expand Up @@ -906,7 +919,7 @@ impl QueueAdapter for RabbitMQAdapter {
config: &FunctionQueueConfig,
) -> anyhow::Result<()> {
self.topology
.setup_function_queue(queue_name, config.backoff_ms)
.setup_function_queue(queue_name, config.backoff_ms, config.max_priority)
.await
.map_err(|e| anyhow::anyhow!("Failed to setup function queue topology: {}", e))
}
Expand Down
8 changes: 7 additions & 1 deletion engine/src/workers/queue/adapters/rabbitmq/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,16 @@ impl Publisher {
) -> Result<()> {
let payload = serde_json::to_vec(job)?;

let properties = lapin::BasicProperties::default()
let mut properties = lapin::BasicProperties::default()
.with_content_type("application/json".into())
.with_delivery_mode(2)
.with_headers(headers.unwrap_or_default());
// Carry the job's priority through fanout publishes, requeues, and
// DLQ-redrive republishes so ordering survives retries on priority
// queues. No-op on queues without `x-max-priority`.
if let Some(p) = job.priority {
properties = properties.with_priority(p);
}

self.channel
.basic_publish(
Expand Down
34 changes: 31 additions & 3 deletions engine/src/workers/queue/adapters/rabbitmq/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ impl TopologyManager {
Ok(())
}

pub async fn setup_subscriber_queue(&self, topic: &str, function_id: &str) -> Result<()> {
pub async fn setup_subscriber_queue(
&self,
topic: &str,
function_id: &str,
max_priority: Option<u8>,
) -> Result<()> {
let names = RabbitNames::new(topic);

let queue_name = names.function_queue(function_id);
Expand All @@ -84,14 +89,17 @@ impl TopologyManager {
)
.await?;

let mut queue_args = FieldTable::default();
with_max_priority(&mut queue_args, max_priority);

self.channel
.queue_declare(
&queue_name,
QueueDeclareOptions {
durable: true,
..Default::default()
},
FieldTable::default(),
queue_args,
)
.await?;

Expand All @@ -114,7 +122,12 @@ impl TopologyManager {
Ok(())
}

pub async fn setup_function_queue(&self, queue_name: &str, backoff_ms: u64) -> Result<()> {
pub async fn setup_function_queue(
&self,
queue_name: &str,
backoff_ms: u64,
max_priority: Option<u8>,
) -> Result<()> {
let names = FnQueueNames::new(queue_name);

// Main exchange + queue with DLX to retry
Expand All @@ -138,6 +151,10 @@ impl TopologyManager {
"x-dead-letter-exchange".into(),
AMQPValue::LongString(names.dlq_exchange().into()),
);
// A priority queue must be declared with `x-max-priority`; declare the
// retry queue with it too so retried messages keep their ordering (the
// `priority` property survives dead-lettering). The DLQ stays plain.
with_max_priority(&mut main_queue_args, max_priority);

self.channel
.queue_declare(
Expand Down Expand Up @@ -186,6 +203,7 @@ impl TopologyManager {
"x-dead-letter-routing-key".into(),
AMQPValue::LongString(queue_name.into()),
);
with_max_priority(&mut retry_queue_args, max_priority);

self.channel
.queue_declare(
Expand Down Expand Up @@ -246,3 +264,13 @@ impl TopologyManager {
Ok(())
}
}

/// Insert the `x-max-priority` declaration argument when a priority level is
/// configured, turning the queue into a RabbitMQ priority queue. Declared as a
/// signed integer (`AMQPValue::LongInt`) to match the canonical client encoding;
/// the value is fixed at queue-creation time and cannot be changed later.
fn with_max_priority(args: &mut FieldTable, max_priority: Option<u8>) {
if let Some(max) = max_priority {
args.insert("x-max-priority".into(), AMQPValue::LongInt(max as i32));
}
}
Loading
Loading