Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 172 additions & 7 deletions substrate/client/network/statement/src/v2dht/explicit_affinity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,75 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Explicit topic affinity for the v2 DHT gossip path.
//!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hunk is for AI, not humans.

//! Implements [#11934](https://github.com/paritytech/polkadot-sdk/issues/11934): the module that
//! tracks which topics this node cares about, advertises a filter for them, and decides storage
//! and forwarding from that affinity.
//!
//! # Implementation plan
//!
//! Each step is gated behind `v2dht_enabled`, self-contained, and unit-tested; a single PR may
//! bundle several. After the data model (step 3), the real topic sources come next so production
//! topics flow into the set; the local readers that advertise and query them, the storage
//! obligation, and the peer side follow.
//!
//! 1. [x] **Bloom constructors.** Build the node-side [`AffinityFilter`] from a topic list,
//! mirroring smoldot ([#12276]). Everything below advertises and queries through this type.
//! 2. [x] **Stub API.** Scaffold the method surface behind the v2dht gate so the orchestrator can
//! own the module while the bodies stay no-ops ([#12278]).
//! 3. [x] **Source-aware topic set.** Replace the stub state with a per-topic, per-source refcount
//! map and an `AffinitySource` enum (configured, RPC subscription, …). `add_topics(source, …)`
//! and `remove_topics(source, …)` adjust the counts; a topic stays present until its last source
//! drops. Expose `topics()` to read the current set. Source-keying can't be retrofitted cheaply,
//! so it comes first; the enum is the extension point for future sources; `topics()` also feeds
//! the peers-topology module (#11933).
//! 4. [ ] **Configured source.** Read topics from CLI at construction and `add_topics(Configured,
//! …)`. A static, one-time input; validated through `topics()` now, with advertising following
//! at step 6. First cross-crate step; needs step 3. Closes "CLI and configuration inputs."
//! (Optional: take the advertised-filter seed from config to match the light client —
//! correctness already holds, since the seed travels on the wire.)
//! 5. [ ] **RPC-subscription source.** Plumb the statement RPC layer so opening a subscription
//! calls `add_topics(RpcSubscription, …)` and dropping it calls `remove_topics`. Dynamic
//! add/remove over the subscription lifecycle; the caller must balance each add with one remove
//! (see `remove_topics`). Needs step 3. Closes "track affinity from active subscriptions."
//! 6. [ ] **Local queries.** Over the fed topic set, implement `local_filter()` (advertised
//! [`AffinityFilter`] built from the topics) and `local_has_explicit_affinity(stmt)` (does any
//! of the statement's topics sit in the set). Configured and subscribed topics start being
//! advertised here. Tests: filter contents and membership.
//! 7. [ ] **Storage obligation.** Add a query that derives this node's store decision for a
//! statement from the sources whose topics it matches — a configured topic obliges storage
//! differently than a transient subscription. Consumed by store-limitations (#11936), which
//! firms up the return shape. Tests: obligation per source mix.
//! 8. [ ] **Peer filters.** `update_peer_filter`/`on_peer_disconnected` maintain a `HashMap<PeerId,
//! AffinityFilter>`; `peer_has_explicit_affinity(peer, stmt)` reads it for the forward decision.
//! Independent of the local side. The overlapping `Peer::topic_affinity` in `lib.rs` stays until
//! the orchestrator cutover (#11937) — v1 still reads it. Tests: store, query, drop on
//! disconnect.
//!
//! After step 8 the module is complete; the orchestrator (#11937) wires
//! `local_has_explicit_affinity` into the store decision and `peer_has_explicit_affinity` into the
//! forward decision, and retires `Peer::topic_affinity`.
//!
//! [#12276]: https://github.com/paritytech/polkadot-sdk/pull/12276
//! [#12278]: https://github.com/paritytech/polkadot-sdk/pull/12278

use crate::{affinity::AffinityFilter, LOG_TARGET};
use sc_network_types::PeerId;
use sp_statement_store::{Statement, Topic};
use std::collections::HashMap;

/// The source of this node's affinity for a topic.
///
/// Each variant names a category of holder, not a single holder: a per-source reference count
/// tracks how many holders of that category want the topic.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) enum AffinitySource {

@P1sar P1sar Jun 9, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why there is no DHT based affinity?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, it's not a responsibility of this module. I beleive we can't add topic based on DHT affinity as we don't determine the topics for that, we compute the distance.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well yes, but then would be still nice to have a list of topics that we store in general for future peers steering, and the fastest way is basically add every topic that we observe and DHT affinate to to some list as well. so why it can't be this list?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issues to think about it while doing peer stearing

/// Topic configured via CLI or the config file.
Configured,
/// Topic backing an active RPC subscription.
RpcSubscription,
}

/// Tracks explicit topic affinity: the local node's own topics and the filters peers advertise.
///
Expand All @@ -34,24 +99,47 @@ pub(crate) struct ExplicitAffinity {
/// suffices.
// TODO: source it from the protocol config (as the light client does) once that is plumbed.
seed: u128,
/// Local topics, each mapped to its per-source reference counts. A topic stays in the map only
/// while some source references it.
local: HashMap<Topic, HashMap<AffinitySource, u32>>,
}

#[allow(dead_code)]
impl ExplicitAffinity {
pub(crate) fn new() -> Self {
Self { seed: rand::random() }
Self { seed: rand::random(), local: HashMap::new() }
}

// === Local topics ===

pub(crate) fn add_topics(&mut self, topics: &[Topic]) {
// TODO: track the local topic set; fed by RPC subscriptions and configured topics.
log::trace!(target: LOG_TARGET, "explicit_affinity: add_topics {} (stub)", topics.len());
/// Add one of `source`'s references to each topic.
pub(crate) fn add_topics(&mut self, source: AffinitySource, topics: &[Topic]) {
log::trace!(target: LOG_TARGET, "explicit_affinity: add_topics {} from {source:?}", topics.len());
for &topic in topics {
*self.local.entry(topic).or_default().entry(source).or_insert(0) += 1;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe follow the same as in func remove_topics?

let count = self.local.entry(topic).or_default().entry(source).or_insert(0);
*count = count.saturating_add(1);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

}
}

pub(crate) fn remove_topics(&mut self, topics: &[Topic]) {
// TODO: track the local topic set; fed by RPC subscriptions and configured topics.
log::trace!(target: LOG_TARGET, "explicit_affinity: remove_topics {} (stub)", topics.len());
/// Drop one of `source`'s references to each topic. A topic stays until its last source drops.
pub(crate) fn remove_topics(&mut self, source: AffinitySource, topics: &[Topic]) {
log::trace!(target: LOG_TARGET, "explicit_affinity: remove_topics {} from {source:?}", topics.len());
for topic in topics {
let Some(sources) = self.local.get_mut(topic) else { continue };
if let Some(count) = sources.get_mut(&source) {
*count = count.saturating_sub(1);
if *count == 0 {
sources.remove(&source);
}
}
if sources.is_empty() {
self.local.remove(topic);
}
}
}

/// The topics this node currently has affinity for
pub(crate) fn topics(&self) -> Vec<Topic> {
self.local.keys().copied().collect()
}

// === Advertise ===
Expand Down Expand Up @@ -88,3 +176,80 @@ impl ExplicitAffinity {
false
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;

fn topic(n: u8) -> Topic {
Topic([n; 32])
}

fn topic_set(affinity: &ExplicitAffinity) -> HashSet<Topic> {
affinity.topics().into_iter().collect()
}

#[test]
fn add_then_remove_same_source() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test looks redundant with the final remove/assert steps in the two refcount tests below. Could we drop it and keep only the tests that cover distinct invariants

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep

let mut affinity = ExplicitAffinity::new();
affinity.add_topics(AffinitySource::Configured, &[topic(1)]);
assert_eq!(topic_set(&affinity), HashSet::from([topic(1)]));

affinity.remove_topics(AffinitySource::Configured, &[topic(1)]);
assert!(affinity.topics().is_empty());
}

#[test]
fn topic_survives_until_last_source_drops() {
let mut affinity = ExplicitAffinity::new();
affinity.add_topics(AffinitySource::Configured, &[topic(1)]);
affinity.add_topics(AffinitySource::RpcSubscription, &[topic(1)]);

affinity.remove_topics(AffinitySource::Configured, &[topic(1)]);
assert_eq!(
topic_set(&affinity),
HashSet::from([topic(1)]),
"still held by the subscription"
);

affinity.remove_topics(AffinitySource::RpcSubscription, &[topic(1)]);
assert!(affinity.topics().is_empty(), "last source dropped");
}

#[test]
fn topic_survives_until_last_holder_of_one_source_drops() {
let mut affinity = ExplicitAffinity::new();
// Two subscriptions on the same topic each hold a reference.
affinity.add_topics(AffinitySource::RpcSubscription, &[topic(1)]);
affinity.add_topics(AffinitySource::RpcSubscription, &[topic(1)]);

affinity.remove_topics(AffinitySource::RpcSubscription, &[topic(1)]);
assert_eq!(topic_set(&affinity), HashSet::from([topic(1)]), "one subscription remains");

affinity.remove_topics(AffinitySource::RpcSubscription, &[topic(1)]);
assert!(affinity.topics().is_empty(), "both subscriptions gone");
}

#[test]
fn remove_absent_is_noop() {
let mut affinity = ExplicitAffinity::new();
affinity.add_topics(AffinitySource::Configured, &[topic(1)]);

// Unheld topic and unheld source both leave the set untouched, without underflow.
affinity.remove_topics(AffinitySource::Configured, &[topic(2)]);
affinity.remove_topics(AffinitySource::RpcSubscription, &[topic(1)]);
assert_eq!(topic_set(&affinity), HashSet::from([topic(1)]));
}

#[test]
fn topics_lists_each_live_topic_once() {
let mut affinity = ExplicitAffinity::new();
affinity.add_topics(AffinitySource::Configured, &[topic(1), topic(2)]);
affinity.add_topics(AffinitySource::RpcSubscription, &[topic(2), topic(3)]);

let topics = affinity.topics();
assert_eq!(topics.len(), 3, "no duplicates despite topic(2) held twice");
assert_eq!(topic_set(&affinity), HashSet::from([topic(1), topic(2), topic(3)]));
}
}
Loading