AllComplete fails when two same-keyExpr queryables on a single router have mixed complete flags #2614

@leon-somana

Describe the bug

When a single router (or any session) declares two queryables on the exact same key_expr — one with complete=true and one with complete=false — a client query with QueryTarget::AllComplete against a matching key receives no replies at all. Neither queryable's callback is invoked. The query times out client-side after ResponseFinal arrives with an empty result set.

Other combinations on the same key_expr work as expected:

| Config (one router, one client session) | target=AllComplete | target=All |
| --- | --- | --- |
| 1 queryable, complete=true | | |
| 2 queryables, both complete=true, same key_expr | ✓ both reply | ✓ both reply |
| 2 queryables, both complete=false, same key_expr | n/a (no complete) | ✓ both reply |
| 2 queryables, complete=true + complete=false, same key_expr | ✗ neither replies | ✓ both reply |
| 2 queryables, complete=true + complete=false, different key_exprs | | |

The trigger is specifically the combination of same key_expr + mixed complete flags + target=AllComplete. Either separating the key_exprs or using target=All works around it.
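To make the expectation concrete, here is a small standalone model of the selection this report appears to expect: target=All reaches every matching queryable, and target=AllComplete reaches only the matching queryables declared with complete=true. This is an illustrative sketch only, not zenoh's routing code. The names `Target`, `Queryable`, `matches` and `expected_repliers` are hypothetical, and the key matching is simplified to handle only a trailing `/**`.

```rust
// Illustrative model only: NOT zenoh's routing implementation.
// All names below are hypothetical and exist just for this sketch.

#[derive(Clone, Copy, Debug, PartialEq)]
enum Target {
    All,
    AllComplete,
}

#[derive(Debug)]
struct Queryable {
    name: &'static str,
    key_expr: &'static str,
    complete: bool,
}

/// Simplified matching: supports an exact key or a trailing "/**" only.
fn matches(key_expr: &str, key: &str) -> bool {
    match key_expr.strip_suffix("/**") {
        Some(prefix) => key == prefix || key.starts_with(&format!("{prefix}/")),
        None => key_expr == key,
    }
}

/// Names of the queryables that would be expected to receive the query.
fn expected_repliers(qs: &[Queryable], key: &str, target: Target) -> Vec<&'static str> {
    qs.iter()
        .filter(|q| matches(q.key_expr, key))
        // AllComplete keeps only queryables flagged complete; All keeps every match.
        .filter(|q| target == Target::All || q.complete)
        .map(|q| q.name)
        .collect()
}

fn main() {
    // Same setup as the failing case: same key_expr, mixed `complete` flags.
    let qs = [
        Queryable { name: "primary", key_expr: "test/state/**", complete: true },
        Queryable { name: "backup", key_expr: "test/state/**", complete: false },
    ];
    let key = "test/state/probe-key";
    println!("All: {:?}", expected_repliers(&qs, key, Target::All));
    println!("AllComplete: {:?}", expected_repliers(&qs, key, Target::AllComplete));
}
```

Under this model the mixed case yields `["primary", "backup"]` for All and `["primary"]` for AllComplete, whereas the observed behaviour for AllComplete is an empty reply set.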

To reproduce

Rust binary to run with cargo run --release:

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Mutex;
use zenoh::query::{ConsolidationMode, QueryTarget};
use zenoh::sample::Sample;
use zenoh::Config;

type AnyErr = Box<dyn std::error::Error + Send + Sync>;
type Store = Arc<Mutex<HashMap<String, Sample>>>;

async fn attach(sess: &zenoh::Session, name: &'static str, key_expr: &'static str, complete: bool) -> Result<(), AnyErr> {
    let store: Store = Arc::new(Mutex::new(HashMap::new()));
    let sub_store = store.clone();
    let _sub = sess.declare_subscriber(key_expr).callback(move |sample| {
        let s = sub_store.clone();
        tokio::spawn(async move {
            eprintln!("  [{}] sub got put for {}", name, sample.key_expr());
            s.lock().await.insert(sample.key_expr().to_string(), sample);
        });
    }).await?;
    std::mem::forget(_sub);
    let q_store = store.clone();
    let _qabl = sess.declare_queryable(key_expr).complete(complete).callback(move |query| {
        let s = q_store.clone();
        tokio::spawn(async move {
            let ke = query.key_expr().clone();
            if let Some(sample) = s.lock().await.get(&ke.to_string()) {
                eprintln!("  [{}] qabl replying for {}", name, ke);
                let _ = query.reply(ke.clone(), sample.payload().clone()).timestamp(*sample.timestamp().unwrap()).await;
            } else {
                eprintln!("  [{}] qabl: {} not present", name, ke);
            }
        });
    }).await?;
    std::mem::forget(_qabl);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), AnyErr> {
    let mut router_cfg = Config::default();
    router_cfg.insert_json5("mode", r#""router""#).unwrap();
    router_cfg.insert_json5("listen/endpoints", r#"["tcp/127.0.0.1:17447"]"#).unwrap();
    let router = zenoh::open(router_cfg).await?;

    // Two queryables on the SAME key_expr, mixed `complete`.
    attach(&router, "primary", "test/state/**", true).await?;
    attach(&router, "backup",  "test/state/**", false).await?;

    let mut client_cfg = Config::default();
    client_cfg.insert_json5("mode", r#""client""#).unwrap();
    client_cfg.insert_json5("connect/endpoints", r#"["tcp/127.0.0.1:17447"]"#).unwrap();
    let client = zenoh::open(client_cfg).await?;
    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

    let key = "test/state/probe-key";
    client.put(key, "hello".as_bytes()).await?;

    let start = Instant::now();
    let replies = client.get(key)
        .target(QueryTarget::AllComplete)
        .consolidation(ConsolidationMode::Monotonic)
        .await?;
    let mut n = 0usize;
    while let Ok(r) = replies.recv_async().await {
        if r.result().is_ok() { n += 1; }
    }
    eprintln!("AllComplete: {} reply(ies) in {}ms", n, start.elapsed().as_millis());
    assert!(n > 0, "AllComplete returned no replies — bug reproduced");
    Ok(())
}

System info

zenoh 1.9.0

Labels

bug
