Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions engine/src/workers/queue/adapters/builtin/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: Some(1000),
max_priority: None,
};

let subscription_config = Some(config).map(|c| SubscriptionConfig {
Expand Down Expand Up @@ -729,6 +730,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: None,
max_priority: None,
};

let subscription_config = Some(config).map(|c| SubscriptionConfig {
Expand Down Expand Up @@ -756,6 +758,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: None,
max_priority: None,
};

let subscription_config = Some(config).map(|c| SubscriptionConfig {
Expand All @@ -782,6 +785,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: None,
max_priority: None,
};

let subscription_config = Some(config).map(|c| SubscriptionConfig {
Expand Down Expand Up @@ -855,6 +859,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: Some(25),
max_priority: None,
};
adapter
.subscribe("jobs", "sub-fifo", "queue.success", None, Some(fifo_config))
Expand All @@ -868,6 +873,7 @@ mod tests {
delay_seconds: None,
backoff_type: None,
backoff_delay_ms: Some(10),
max_priority: None,
};
adapter
.subscribe(
Expand Down
36 changes: 32 additions & 4 deletions engine/src/workers/queue/adapters/rabbitmq/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,12 @@ impl QueueAdapter for RabbitMQAdapter {
traceparent: Option<String>,
baggage: Option<String>,
) {
let job = Job::new(topic, data, self.config.max_attempts, traceparent, baggage);
let priority = data
.get("_priority")
.and_then(|v| v.as_u64())
.map(|v| v.min(255) as u8);
let mut job = Job::new(topic, data, self.config.max_attempts, traceparent, baggage);
job.priority = priority;

if let Err(e) = self.topology.setup_topic(topic).await {
tracing::error!(
Expand Down Expand Up @@ -307,9 +312,10 @@ impl QueueAdapter for RabbitMQAdapter {
return;
}

let effective_max_priority = queue_config.as_ref().and_then(|c| c.max_priority);
if let Err(e) = self
.topology
.setup_subscriber_queue(&topic, &function_id)
.setup_subscriber_queue(&topic, &function_id, effective_max_priority)
.await
{
tracing::error!(
Expand Down Expand Up @@ -466,6 +472,11 @@ impl QueueAdapter for RabbitMQAdapter {
} else {
properties
};
let properties = if let Some(p) = delivery.delivery.properties.priority() {
properties.with_priority(*p)
} else {
properties
};

self.channel
.basic_publish(
Expand Down Expand Up @@ -563,6 +574,11 @@ impl QueueAdapter for RabbitMQAdapter {
} else {
properties
};
let properties = if let Some(p) = delivery_props.priority() {
properties.with_priority(*p)
} else {
properties
};

channel
.basic_publish(
Expand Down Expand Up @@ -872,11 +888,19 @@ impl QueueAdapter for RabbitMQAdapter {
);
}

let properties = lapin::BasicProperties::default()
let priority = data
.get("_priority")
.and_then(|v| v.as_u64())
.map(|v| v.min(255) as u8);

let mut properties = lapin::BasicProperties::default()
.with_content_type("application/json".into())
.with_delivery_mode(2)
.with_message_id(message_id.into())
.with_headers(headers);
if let Some(p) = priority {
properties = properties.with_priority(p);
}

match self
.channel
Expand Down Expand Up @@ -906,7 +930,7 @@ impl QueueAdapter for RabbitMQAdapter {
config: &FunctionQueueConfig,
) -> anyhow::Result<()> {
self.topology
.setup_function_queue(queue_name, config.backoff_ms)
.setup_function_queue(queue_name, config.backoff_ms, config.max_priority)
.await
.map_err(|e| anyhow::anyhow!("Failed to setup function queue topology: {}", e))
}
Expand Down Expand Up @@ -1102,6 +1126,10 @@ impl QueueAdapter for RabbitMQAdapter {
if let Some(mid) = delivery.properties.message_id() {
properties = properties.with_message_id(mid.clone());
}
// Preserve priority through retries so priority queues keep ordering
if let Some(p) = delivery.properties.priority() {
properties = properties.with_priority(*p);
}

self.channel
.basic_publish(
Expand Down
6 changes: 5 additions & 1 deletion engine/src/workers/queue/adapters/rabbitmq/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,15 @@ impl Publisher {
) -> Result<()> {
let payload = serde_json::to_vec(job)?;

let properties = lapin::BasicProperties::default()
let mut properties = lapin::BasicProperties::default()
.with_content_type("application/json".into())
.with_delivery_mode(2)
.with_headers(headers.unwrap_or_default());

if let Some(p) = job.priority {
properties = properties.with_priority(p);
}

self.channel
.basic_publish(
exchange,
Expand Down
23 changes: 20 additions & 3 deletions engine/src/workers/queue/adapters/rabbitmq/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,20 @@ impl TopologyManager {
Ok(())
}

pub async fn setup_subscriber_queue(&self, topic: &str, function_id: &str) -> Result<()> {
pub async fn setup_subscriber_queue(
&self,
topic: &str,
function_id: &str,
max_priority: Option<u8>,
) -> Result<()> {
let names = RabbitNames::new(topic);

let queue_name = names.function_queue(function_id);
let dlq_name = names.function_dlq(function_id);
let mut subscriber_queue_args = FieldTable::default();
if let Some(p) = max_priority {
subscriber_queue_args.insert("x-max-priority".into(), AMQPValue::ShortShortUInt(p));
}

self.channel
.queue_declare(
Expand All @@ -91,7 +100,7 @@ impl TopologyManager {
durable: true,
..Default::default()
},
FieldTable::default(),
subscriber_queue_args,
)
.await?;

Expand All @@ -114,7 +123,12 @@ impl TopologyManager {
Ok(())
}

pub async fn setup_function_queue(&self, queue_name: &str, backoff_ms: u64) -> Result<()> {
pub async fn setup_function_queue(
&self,
queue_name: &str,
backoff_ms: u64,
max_priority: Option<u8>,
) -> Result<()> {
let names = FnQueueNames::new(queue_name);

// Main exchange + queue with DLX to retry
Expand All @@ -138,6 +152,9 @@ impl TopologyManager {
"x-dead-letter-exchange".into(),
AMQPValue::LongString(names.dlq_exchange().into()),
);
if let Some(p) = max_priority {
main_queue_args.insert("x-max-priority".into(), AMQPValue::ShortShortUInt(p));
}

self.channel
.queue_declare(
Expand Down
3 changes: 3 additions & 0 deletions engine/src/workers/queue/adapters/rabbitmq/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct Job {
pub traceparent: Option<String>,
#[serde(default)]
pub baggage: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub priority: Option<u8>,
}

impl Job {
Expand All @@ -46,6 +48,7 @@ impl Job {
.as_millis() as u64,
traceparent,
baggage,
priority: None,
}
}

Expand Down
10 changes: 10 additions & 0 deletions engine/src/workers/queue/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ pub struct FunctionQueueConfig {
/// Defaults to 100.
#[serde(default = "default_poll_interval_ms")]
pub poll_interval_ms: u64,

/// Maximum message priority for this queue, turning it into a RabbitMQ
/// priority queue (`x-max-priority`). When set (1–255; ≤10 recommended),
/// each message carries a `priority` taken from its `_priority` field and
/// higher-priority messages are delivered first. Omit for a non-priority
/// queue. Cannot be changed after the queue is first declared.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 1, max = 255))]
pub max_priority: Option<u8>,
}

impl Default for FunctionQueueConfig {
Expand All @@ -96,6 +105,7 @@ impl Default for FunctionQueueConfig {
message_group_field: None,
backoff_ms: default_backoff_ms(),
poll_interval_ms: default_poll_interval_ms(),
max_priority: None,
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions engine/src/workers/queue/subscriber_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct SubscriberQueueConfig {
pub delay_seconds: Option<u64>,
pub backoff_type: Option<String>,
pub backoff_delay_ms: Option<u64>,
pub max_priority: Option<u8>,
}

impl SubscriberQueueConfig {
Expand Down Expand Up @@ -93,6 +94,14 @@ impl SubscriberQueueConfig {
as_u64,
|v| v
);
extract_field!(
config,
"maxPriority",
subscriber_config.max_priority,
has_any_value,
as_u64,
|v| v as u8
);

if has_any_value {
Some(subscriber_config)
Expand Down
Loading