Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cumulus/polkadot-omni-node/lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,17 @@ pub struct Cli<Config: CliConfig> {
#[arg(long, default_value_t = sc_statement_store::DEFAULT_PURGE_AFTER_SEC)]
pub statement_store_purge_after_sec: u64,

/// Affinity topic advertised by this node. Repeatable; each value is a 32-byte hex

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Affinity topic advertised by this node. Repeatable; each value is a 32-byte hex
/// Affinity topic of this node. Repeatable; each value is a 32-byte hex
///
/// Make the node responsible for finding and storing all statements with topics in this list.

/// hash.
///
/// Only relevant when `--enable-statement-store` is used.
#[arg(
long = "statement-affinity-topic",
value_name = "TOPIC",
num_args = 1..,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the combination of 1.. here and Vec<_> (instead of Option<Vec<_>>) allow not specifying the argument at all? 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary here, removed. IIUC we receive an empty vec if nothing was provided

)]
pub statement_affinity_topics: Vec<sc_statement_store::Topic>,

/// HOP (Hand-Off Protocol) configuration parameters.
#[command(flatten)]
pub hop: sc_hop::HopParams,
Expand Down Expand Up @@ -317,6 +328,7 @@ impl<Config: CliConfig> Cli<Config> {
purge_after_sec: self.statement_store_purge_after_sec,
network_workers: self.statement_network_workers,
rate_limit: self.statement_rate_limit,
affinity_topics: self.statement_affinity_topics.clone(),
},
),
storage_monitor: self.storage_monitor.clone(),
Expand Down
19 changes: 10 additions & 9 deletions cumulus/polkadot-omni-node/lib/src/common/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,16 @@ pub(crate) trait NodeSpec: BaseNodeSpec {
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
);

let statement_handler_proto = node_extra_args.statement_store_config.map(|config| {
let proto = new_statement_handler_proto(
&*client,
&parachain_config,
&metrics,
&mut net_config,
);
(proto, config)
});
let statement_handler_proto =
node_extra_args.statement_store_config.clone().map(|config| {
let proto = new_statement_handler_proto(
&*client,
&parachain_config,
&metrics,
&mut net_config,
);
(proto, config)
});

let (network, system_rpc_tx, tx_handler_controller, sync_service) =
build_network(BuildNetworkParams {
Expand Down
9 changes: 7 additions & 2 deletions cumulus/polkadot-omni-node/lib/src/common/statement_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub(crate) fn build_statement_store<
statement_handler_proto: sc_network_statement::StatementHandlerPrototype,
config: sc_statement_store::Config,
) -> sc_service::error::Result<Arc<Store>> {
let network_workers = config.network_workers;
let rate_limit = config.rate_limit;
let affinity_topics = config.affinity_topics.clone();

let statement_store = sc_statement_store::Store::new_shared(
&parachain_config.data_path,
config,
Expand All @@ -86,8 +90,9 @@ pub(crate) fn build_statement_store<
statement_store.clone(),
parachain_config.prometheus_registry(),
statement_protocol_executor,
config.network_workers,
config.rate_limit,
network_workers,
rate_limit,
&affinity_topics,
)?;
task_manager.spawn_handle().spawn(
"network-statement-handler",
Expand Down
2 changes: 1 addition & 1 deletion cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ where
&metrics,
&mut net_config,
);
(proto, *ss_config)
(proto, ss_config.clone())
});

let (network, system_rpc_tx, tx_handler_controller, sync_service) =
Expand Down
34 changes: 34 additions & 0 deletions substrate/bin/node/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ pub struct Cli {
#[arg(long, default_value_t = sc_statement_store::DEFAULT_PURGE_AFTER_SEC)]
pub statement_store_purge_after_sec: u64,

/// Affinity topic advertised by this node. Repeatable; each value is a 32-byte hex
/// hash.
///
/// Only relevant when `--enable-statement-store` is used.
#[arg(
long = "statement-affinity-topic",
value_name = "TOPIC",
num_args = 1..,
)]
pub statement_affinity_topics: Vec<sc_statement_store::Topic>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes, me wonder if at least for doing proper E2E testing we should have a test RPC to be able to manipulate this somehow at runtime.


#[allow(missing_docs)]
#[clap(flatten)]
pub storage_monitor: sc_storage_monitor::StorageMonitorParams,
Expand Down Expand Up @@ -148,3 +159,26 @@ pub enum Subcommand {
/// Db meta columns information.
ChainInfo(sc_cli::ChainInfoCmd),
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn statement_affinity_topics_accumulate_repeated_flags() {
use clap::Parser;
let topic_a = "11".repeat(32);
let topic_b = "22".repeat(32);
let cli = Cli::parse_from([
"substrate-node",
"--statement-affinity-topic",
&format!("0x{topic_a}"),
"--statement-affinity-topic",
&format!("0x{topic_b}"),
]);
assert_eq!(
cli.statement_affinity_topics,
vec![sc_statement_store::Topic([0x11; 32]), sc_statement_store::Topic([0x22; 32])]
);
}
}
10 changes: 8 additions & 2 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,10 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
})
.flatten();

let statement_network_workers = statement_store_config.network_workers;
let statement_rate_limit = statement_store_config.rate_limit;
let statement_affinity_topics = statement_store_config.affinity_topics.clone();

let sc_service::PartialComponents {
client,
backend,
Expand Down Expand Up @@ -800,8 +804,9 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
statement_store.clone(),
prometheus_registry.as_ref(),
statement_protocol_executor,
statement_store_config.network_workers,
statement_store_config.rate_limit,
statement_network_workers,
statement_rate_limit,
&statement_affinity_topics,
)?;
task_manager.spawn_handle().spawn(
"network-statement-handler",
Expand Down Expand Up @@ -853,6 +858,7 @@ pub fn new_full(config: Configuration, cli: Cli) -> Result<TaskManager, ServiceE
purge_after_sec: cli.statement_store_purge_after_sec,
network_workers: cli.statement_network_workers,
rate_limit: cli.statement_rate_limit,
affinity_topics: cli.statement_affinity_topics.clone(),
};

let task_manager = match config.network.network_backend {
Expand Down
6 changes: 5 additions & 1 deletion substrate/client/network/statement/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ Applies to all prose for humans — comments, doc comments, commit messages, PR
chat replies. Follow Strunk's *Elements of Style*:

- **Omit needless words.** Cut anything that does not change the meaning. Shorter is the default.
- **Use the active voice.** "The node advertises the filter," not "the filter is advertised."
- **Prefer the active voice.** "The node advertises the filter," not "the filter is advertised."
Not absolute: a noun-phrase label may take a participle ("topics advertised by this node") when
it reads tighter than the active clause ("topics this node advertises").
- **State it positively.** Say what is, not what is not. "Empty sets match nothing," not "does not
match anything."
- **Be concrete and specific.** Name the function, the field, the bound. Avoid "handle", "process",
Expand All @@ -76,4 +78,6 @@ chat replies. Follow Strunk's *Elements of Style*:
- **Keep related words together** so the sentence reads in one pass.
- **One topic per paragraph**, opening with its point.

These rules serve clarity; when two of them conflict, clarity wins.

When you write or edit prose, apply these rules before you finish — do not leave a first draft.
23 changes: 12 additions & 11 deletions substrate/client/network/statement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use sc_network_sync::{SyncEvent, SyncEventStream};
use sc_network_types::PeerId;
use sp_runtime::traits::Block as BlockT;
use sp_statement_store::{
FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult, Topic,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -387,6 +387,7 @@ impl StatementHandlerPrototype {
executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
mut num_submission_workers: usize,
statements_per_second: u32,
configured_topics: &[Topic],
) -> error::Result<StatementHandler<N, S>> {
let sync_event_stream = sync.event_stream("statement-handler-sync");
let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
Expand Down Expand Up @@ -441,7 +442,7 @@ impl StatementHandlerPrototype {
);
}

let v2dht = V2DhtOrchestrator::new();
let v2dht = V2DhtOrchestrator::new(configured_topics);

let handler = StatementHandler {
protocol_name: self.protocol_name,
Expand Down Expand Up @@ -717,7 +718,7 @@ where
queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
statements_per_second: NonZeroU32,
) -> Self {
let v2dht = V2DhtOrchestrator::new();
let v2dht = V2DhtOrchestrator::new(&[]);
Self {
protocol_name,
notification_service,
Expand Down Expand Up @@ -2029,7 +2030,7 @@ mod tests {
dropped_statements_during_sync: false,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
v2dht: V2DhtOrchestrator::new(),
v2dht: V2DhtOrchestrator::new(&[]),
};
(handler, statement_store, network, notification_service, queue_receiver, peer_ids)
}
Expand Down Expand Up @@ -2265,7 +2266,7 @@ mod tests {
dropped_statements_during_sync: false,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
v2dht: V2DhtOrchestrator::new(),
v2dht: V2DhtOrchestrator::new(&[]),
};
(handler, statement_store, network, notification_service)
}
Expand Down Expand Up @@ -2309,7 +2310,7 @@ mod tests {
dropped_statements_during_sync: false,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
v2dht: V2DhtOrchestrator::new(),
v2dht: V2DhtOrchestrator::new(&[]),
};
(handler, statement_store, network, notification_service)
}
Expand Down Expand Up @@ -3727,7 +3728,7 @@ mod tests {
dropped_statements_during_sync: false,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
v2dht: V2DhtOrchestrator::new(),
v2dht: V2DhtOrchestrator::new(&[]),
};

// Add a statement so there's something to sync.
Expand Down Expand Up @@ -4083,7 +4084,7 @@ mod tests {
dropped_statements_during_sync: false,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(pending().fuse()),
v2dht: V2DhtOrchestrator::new(),
v2dht: V2DhtOrchestrator::new(&[]),
};

let peer1 = PeerId::random();
Expand Down Expand Up @@ -4149,7 +4150,7 @@ mod tests {
dropped_statements_during_sync: false,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(pending().fuse()),
v2dht: V2DhtOrchestrator::new(),
v2dht: V2DhtOrchestrator::new(&[]),
};

flag.store(false, std::sync::atomic::Ordering::Relaxed);
Expand Down Expand Up @@ -4227,7 +4228,7 @@ mod tests {
dropped_statements_during_sync: true,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
v2dht: V2DhtOrchestrator::new(),
v2dht: V2DhtOrchestrator::new(&[]),
};

handler.start_sync_recovery();
Expand Down Expand Up @@ -4328,7 +4329,7 @@ mod tests {
dropped_statements_during_sync: dropped,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(pending().fuse()),
v2dht: V2DhtOrchestrator::new(),
v2dht: V2DhtOrchestrator::new(&[]),
}
};

Expand Down
24 changes: 17 additions & 7 deletions substrate/client/network/statement/src/v2dht/explicit_affinity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
//! drops. Expose `topics()` to read the current set. Source-keying can't be retrofitted cheaply,
//! so it comes first; the enum is the extension point for future sources; `topics()` also feeds
//! the peers-topology module (#11933).
//! 4. [ ] **Configured source.** Read topics from CLI at construction and `add_topics(Configured,
//! 4. [x] **Configured source.** Read topics from CLI at construction and `add_topics(Configured,
//! …)`. A static, one-time input; validated through `topics()` now, with advertising following
//! at step 6. First cross-crate step; needs step 3. Closes "CLI and configuration inputs."
//! (Optional: take the advertised-filter seed from config to match the light client —
Expand Down Expand Up @@ -106,8 +106,10 @@ pub(crate) struct ExplicitAffinity {

#[allow(dead_code)]
impl ExplicitAffinity {
pub(crate) fn new() -> Self {
Self { seed: rand::random(), local: HashMap::new() }
pub(crate) fn new(configured_topics: &[Topic]) -> Self {
let mut this = Self { seed: rand::random(), local: HashMap::new() };
this.add_topics(AffinitySource::Configured, configured_topics);
this
}

// === Local topics ===
Expand Down Expand Up @@ -191,9 +193,17 @@ mod tests {
affinity.topics().into_iter().collect()
}

#[test]
fn configured_topics_enter_the_set_at_construction() {
let affinity = ExplicitAffinity::new(&[topic(1), topic(2)]);
assert_eq!(topic_set(&affinity), HashSet::from([topic(1), topic(2)]));

assert!(ExplicitAffinity::new(&[]).topics().is_empty());
}

#[test]
fn topic_survives_until_last_source_drops() {
let mut affinity = ExplicitAffinity::new();
let mut affinity = ExplicitAffinity::new(&[]);
affinity.add_topics(AffinitySource::Configured, &[topic(1)]);
affinity.add_topics(AffinitySource::RpcSubscription, &[topic(1)]);

Expand All @@ -210,7 +220,7 @@ mod tests {

#[test]
fn topic_survives_until_last_holder_of_one_source_drops() {
let mut affinity = ExplicitAffinity::new();
let mut affinity = ExplicitAffinity::new(&[]);
// Two subscriptions on the same topic each hold a reference.
affinity.add_topics(AffinitySource::RpcSubscription, &[topic(1)]);
affinity.add_topics(AffinitySource::RpcSubscription, &[topic(1)]);
Expand All @@ -224,7 +234,7 @@ mod tests {

#[test]
fn remove_absent_is_noop() {
let mut affinity = ExplicitAffinity::new();
let mut affinity = ExplicitAffinity::new(&[]);
affinity.add_topics(AffinitySource::Configured, &[topic(1)]);

// Unheld topic and unheld source both leave the set untouched, without underflow.
Expand All @@ -235,7 +245,7 @@ mod tests {

#[test]
fn topics_lists_each_live_topic_once() {
let mut affinity = ExplicitAffinity::new();
let mut affinity = ExplicitAffinity::new(&[]);
affinity.add_topics(AffinitySource::Configured, &[topic(1), topic(2)]);
affinity.add_topics(AffinitySource::RpcSubscription, &[topic(2), topic(3)]);

Expand Down
6 changes: 3 additions & 3 deletions substrate/client/network/statement/src/v2dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod explicit_affinity;
use crate::{affinity::AffinityFilter, LOG_TARGET};
use explicit_affinity::ExplicitAffinity;
use sc_network_types::PeerId;
use sp_statement_store::SubmitResult;
use sp_statement_store::{SubmitResult, Topic};

/// Coordinates the v2 DHT-affinity statement gossip path.
#[allow(dead_code)]
Expand All @@ -34,8 +34,8 @@ pub(crate) struct V2DhtOrchestrator {

#[allow(dead_code)]
impl V2DhtOrchestrator {
pub(crate) fn new() -> Self {
Self { explicit_affinity: ExplicitAffinity::new() }
pub(crate) fn new(configured_topics: &[Topic]) -> Self {
Self { explicit_affinity: ExplicitAffinity::new(configured_topics) }
}

// === Peer-set events ===
Expand Down
Loading
Loading