Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions prdoc/pr_12268.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: 'remote-ext: fix silent state corruption from lagging RPC providers'
doc:
- audience: Node Dev
description: "Fixes #12264 by treating per-item RPC errors as batch failures instead\
\ of empty values, verifying the computed storage root against the block header\
\ before caching a snapshot, and excluding RPC providers that lack the target\
\ block up front.\r\n"
crates:
- name: frame-remote-externalities
bump: patch
26 changes: 26 additions & 0 deletions substrate/utils/frame/remote-externalities/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,32 @@ impl ConnectionManager {
self.clients.len()
}

/// Keep only the clients for which `is_available` resolves to `true`.
///
/// Used to exclude providers that lack the target block. Clients are probed one after another,
/// in their current order, and the surviving ones keep that order.
///
/// Returns how many clients were dropped. If the probe rejects every client, an error is
/// returned and the existing client list is left untouched.
pub(crate) async fn retain_available<F, Fut>(&mut self, mut is_available: F) -> Result<usize>
where
	F: FnMut(Client) -> Fut,
	Fut: Future<Output = bool>,
{
	let total = self.clients.len();
	let mut survivors = Vec::with_capacity(total);

	for slot in &self.clients {
		// Hand the probe its own clone of the client; the lock guard is a temporary of this
		// statement and is gone before the probe is awaited.
		let candidate = slot.lock().await.clone();
		if !is_available(candidate).await {
			continue;
		}
		survivors.push(slot.clone());
	}

	if survivors.is_empty() {
		// Refuse to end up with no provider at all.
		Err("No RPC provider has the target block")
	} else {
		let removed = total - survivors.len();
		self.clients = survivors;
		Ok(removed)
	}
}

/// Get a usable client for a specific worker.
/// Distributes workers across available clients.
pub(crate) async fn get(&self, worker_index: usize) -> Client {
Expand Down
70 changes: 65 additions & 5 deletions substrate/utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use sp_core::{
},
};
use sp_runtime::{
traits::{Block as BlockT, HashingFor},
traits::{Block as BlockT, HashingFor, Header as HeaderT},
StateVersion,
};
use sp_state_machine::TestExternalities;
Expand Down Expand Up @@ -146,6 +146,15 @@ impl<B: BlockT> Builder<B> {
/// Borrow the connection manager, or fail if it has not been set yet.
fn conn_manager(&self) -> Result<&ConnectionManager> {
	match self.conn_manager.as_ref() {
		Some(manager) => Ok(manager),
		None => Err("connection manager must be initialized; qed"),
	}
}

/// Whether the configured scrape covers the entire top trie.
///
/// Only a complete scrape yields a storage root that matches the block header's state root.
/// This is signalled by the empty prefix being queued for download (see
/// `init_remote_client`); partial scrapes (specific pallets/keys) never match.
fn is_complete_scrape(&self) -> bool {
self.as_online().hashed_prefixes.iter().any(|p| p.is_empty())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this also work for child tries?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. With `child_trie` on, a complete scrape pulls every child trie (`load_child_remote`) and inserts it via `insert_child`. `into_raw_snapshot` then recomputes each child root and folds it into the top trie, so the `computed_root` we compare to the header's `state_root` already covers child data. A stale child trie changes the top root and fails the check.

}
}

// RPC methods
Expand Down Expand Up @@ -243,6 +252,36 @@ where
.map_err(|_| "rpc finalized_head failed on all clients")
}

/// Exclude every RPC provider that cannot serve the header of block `at`.
///
/// Each provider is asked for the header of `at`; only a response matching `Ok(Ok(Some(_)))`
/// keeps the provider. Any other outcome (presumably a timeout, an RPC error, or a missing
/// header) causes it to be dropped. A warning is logged when at least one provider was dropped.
///
/// Fails if the connection manager is not initialized, or if no provider passes the check.
async fn retain_providers_with_block(&mut self, at: B::Hash) -> Result<()> {
	let manager = self
		.conn_manager
		.as_mut()
		.ok_or("connection manager must be initialized; qed")?;

	// The closure stays inline so that its `client` parameter type is inferred from the
	// `retain_available` bound.
	let removed = manager
		.retain_available(move |client| async move {
			let response = with_timeout(
				ChainApi::<(), _, B::Header, ()>::header(client.ws_client.as_ref(), Some(at)),
				RPC_TIMEOUT,
			)
			.await;
			matches!(response, Ok(Ok(Some(_))))
		})
		.await?;

	// Nothing to report when every provider passed the check.
	if removed == 0 {
		return Ok(());
	}

	warn!(
		target: LOG_TARGET,
		"⚠️ excluded {removed} RPC provider(s) that do not have the target block {at:?}",
	);
	Ok(())
}

/// Get keys with `prefix` at `block` using parallel workers.
async fn rpc_get_keys_parallel(
&self,
Expand Down Expand Up @@ -453,10 +492,7 @@ where
for item in batch_response.into_iter() {
match item {
Ok(x) => all_data.push(x),
Err(e) => {
warn!(target: LOG_TARGET, "Value worker {worker_index}: batch item error: {}", e.message());
all_data.push(None);
},
Err(e) => return Err(format!("batch item error: {}", e.message())),
}
}
bar.inc(batch_response_len as u64);
Expand Down Expand Up @@ -893,6 +929,11 @@ where
self.as_online_mut().at = Some(at);
}

// Drop any provider that does not have the target block. A lagging provider would otherwise
// return `UnknownBlock` for every request, dropping keys or causing endless retries.
let at = self.as_online().at_expected();
self.retain_providers_with_block(at).await?;

// Then, a few transformation that we want to perform in the online config:
let online_config = self.as_online_mut();
online_config.pallets.iter().for_each(|p| {
Expand Down Expand Up @@ -972,6 +1013,25 @@ where
let header = self.load_header().await?;
let (raw_storage, computed_root) = pending_ext.into_raw_snapshot();

// Verify the downloaded state against the header's state root.
if self.is_complete_scrape() {
let expected_root = *header.state_root();
if computed_root != expected_root {
error!(
target: LOG_TARGET,
"❌ storage root mismatch: computed {computed_root:?}, expected {expected_root:?} \
(from header). The downloaded state is incomplete or corrupted.",
);
return Err("storage root mismatch: downloaded state is incomplete or corrupted");
}
info!(target: LOG_TARGET, "✅ storage root verified against header: {computed_root:?}");
} else {
debug!(
target: LOG_TARGET,
"skipping storage root verification for partial scrape (no full-state prefix)",
);
}

// If we need to save a snapshot, save the raw storage and root hash to the snapshot.
if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
let snapshot =
Expand Down
Loading