Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions engine/src/workers/queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,11 @@ impl QueueWorker {
let mut all_topics = topics;
for (name, config) in &self._config.queue_configs {
let namespaced = format!("__fn_queue::{}", name);
if let Some(existing) = all_topics
.iter_mut()
.find(|t| t.name == namespaced || t.name == *name)
{
if let Some(existing) = all_topics.iter_mut().find(|t| {
t.name == namespaced
|| (t.name == *name && t.broker_type == "function_queue")
}) {
existing.broker_type = "function_queue".to_string();
existing.subscriber_count = config.concurrency as u64;
} else {
all_topics.push(TopicInfo {
Expand All @@ -373,6 +374,7 @@ impl QueueWorker {
for topic in &mut all_topics {
if let Some(stripped) = topic.name.strip_prefix("__fn_queue::") {
topic.name = stripped.to_string();
topic.broker_type = "function_queue".to_string();
}
}
FunctionResult::Success(Some(
Expand Down Expand Up @@ -2177,11 +2179,16 @@ mod tests {
// "default" should have concurrency from config (10 is default)
let default_topic = topics.iter().find(|t| t["name"] == "default");
assert!(default_topic.is_some(), "default topic should exist");
let sub_count = default_topic.unwrap()["subscriber_count"].as_u64().unwrap();
let default_topic = default_topic.unwrap();
let sub_count = default_topic["subscriber_count"].as_u64().unwrap();
assert!(
sub_count > 0,
"subscriber_count should be overridden from config"
);
assert_eq!(
default_topic["broker_type"], "function_queue",
"function queue topics should be displayed with the function_queue broker type"
);
}
_ => panic!("Expected Success"),
}
Expand Down Expand Up @@ -2214,6 +2221,45 @@ mod tests {
}
}

#[tokio::test]
async fn console_list_topics_keeps_regular_topics_with_function_queue_name() {
let (_engine, module, adapter) = setup_queue_module_with_configs();
*adapter.list_topics_result.lock().await = vec![TopicInfo {
name: "default".to_string(),
broker_type: "builtin".to_string(),
subscriber_count: 2,
}];

let result = module.console_list_topics(json!({})).await;
match result {
FunctionResult::Success(Some(val)) => {
let topics: Vec<Value> = serde_json::from_value(val).unwrap();
let default_topics: Vec<&Value> =
topics.iter().filter(|t| t["name"] == "default").collect();

assert_eq!(
default_topics.len(),
2,
"regular topics and function queues with the same visible name should both be listed"
);
assert!(
default_topics.iter().any(|t| {
t["broker_type"] == "builtin" && t["subscriber_count"] == json!(2)
}),
"regular topic entry should keep its original broker type and subscriber count"
);
assert!(
default_topics.iter().any(|t| {
t["broker_type"] == "function_queue"
&& t["subscriber_count"].as_u64().unwrap_or_default() > 0
}),
"configured function queue should remain visible as a separate entry"
);
}
_ => panic!("Expected Success"),
}
}

#[tokio::test]
async fn console_list_topics_adapter_error() {
let (_engine, module, adapter) = setup_queue_module();
Expand Down
Loading