Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 77 additions & 10 deletions client/blockchain-service/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,13 @@ where
}

// Catch up on any blocks that were imported but not processed
self.actor.catch_up_missed_blocks().await;
if let Err(e) = self.actor.catch_up_missed_blocks().await {
error!(
target: LOG_TARGET,
"Failed to catch up missed blocks during startup: {:?}. Finalized blocks will be retried on next finality notification.",
e
);
}

// Import notification stream to be notified of new blocks.
// The behaviour of this stream is:
Expand Down Expand Up @@ -1871,8 +1877,17 @@ where
match new_block_notification_kind {
NewBlockNotificationKind::NewBestBlock { new_best_block, .. } => {
// Process the new best block as a linear chain extension
self.process_sync_block(&new_best_block.hash, new_best_block.number)
.await;
// If processing fails, the block won't be marked as processed and will be retried on restart
if let Err(e) = self
.process_sync_block(&new_best_block.hash, new_best_block.number)
.await
{
error!(
target: LOG_TARGET,
"Failed to process sync block #{}: {:?}. Block will be retried on restart.",
new_best_block.number, e
);
}
}
NewBlockNotificationKind::NewNonBestBlock(_) => {
// Skip non-best blocks (uncle/stale blocks not on the canonical chain)
Expand All @@ -1884,7 +1899,14 @@ where
..
} => {
// Process the reorg
self.process_sync_reorg(&tree_route, new_best_block).await;
// If processing fails, the block won't be marked as processed and will be retried on restart
if let Err(e) = self.process_sync_reorg(&tree_route, new_best_block).await {
error!(
target: LOG_TARGET,
"Failed to process sync reorg: {:?}. Block will be retried on restart.",
e
);
}
}
}
}
Expand Down Expand Up @@ -1962,14 +1984,31 @@ where
.await;

// Provider-specific code to run at the start of every block import.
// If forest root changes fail, we log the error but continue processing the block.
match self.maybe_managed_provider {
Some(ManagedProvider::Bsp(_)) => {
self.bsp_init_block_processing(block_hash, block_number, tree_route.clone())
.await;
if let Err(e) = self
.bsp_init_block_processing(block_hash, block_number, tree_route.clone())
.await
{
error!(
target: LOG_TARGET,
"Failed to process BSP forest root changes for block #{}: {:?}",
block_number, e
);
Comment on lines +1994 to +1998
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this log message is misplaced. Here you're calling `bsp_init_block_processing` inside of `process_block_import`, so it doesn't seem intuitive to assume that whatever error comes back means "Failed to process BSP forest root changes".

}
}
Some(ManagedProvider::Msp(_)) => {
self.msp_init_block_processing(block_hash, block_number, tree_route.clone())
.await;
if let Err(e) = self
.msp_init_block_processing(block_hash, block_number, tree_route.clone())
.await
{
error!(
target: LOG_TARGET,
"Failed to process MSP forest root changes for block #{}: {:?}",
block_number, e
);
}
}
None => {
trace!(target: LOG_TARGET, "No Provider ID found. This node is not managing a Provider.");
Expand Down Expand Up @@ -2189,6 +2228,10 @@ where
}

/// Internal method to process a finality notification.
///
/// If finality event processing fails, the block is NOT marked as processed
/// (i.e., `last_finalised_block_processed` is not updated), ensuring the block
/// will be retried on the next finality notification or restart.
async fn process_finality_notification(
&mut self,
notification: FinalityNotification<OpaqueBlock>,
Expand Down Expand Up @@ -2249,12 +2292,36 @@ where
continue;
}

self.process_finality_events(intermediate_hash);
if let Err(e) = self.process_finality_events(intermediate_hash) {
error!(
target: LOG_TARGET,
"Failed to process finality events for intermediate block #{} (0x{:x}): {:?}. Block will be retried on the next finality notification or restart.",
intermediate_number, intermediate_hash, e
);
observe_histogram!(metrics: self.metrics.as_ref(), block_processing_seconds, labels: &["finalized_block", STATUS_FAILURE], start.elapsed().as_secs_f64());
return;
}

// Update last_finalised_block_processed after each successful intermediate block
// so we don't reprocess it on restart or on later finality notifications if a later block fails
self.last_finalised_block_processed = MinimalBlockInfo {
number: intermediate_number,
hash: *intermediate_hash,
};
self.update_last_finalised_block_info(self.last_finalised_block_processed);
}
}

// Process finality events for the newly finalised block itself
self.process_finality_events(&block_hash);
if let Err(e) = self.process_finality_events(&block_hash) {
error!(
target: LOG_TARGET,
"Failed to process finality events for block #{} (0x{:x}): {:?}. Block will be retried on the next finality notification or restart.",
block_number, block_hash, e
);
observe_histogram!(metrics: self.metrics.as_ref(), block_processing_seconds, labels: &["finalized_block", STATUS_FAILURE], start.elapsed().as_secs_f64());
return;
}

// Cleanup the pending transaction store for the last finalised block processed.
// Transactions with a nonce below the on-chain nonce of this block are finalised.
Expand Down
68 changes: 41 additions & 27 deletions client/blockchain-service/src/handler_bsp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Result;
use log::{debug, error, info, trace, warn};
use shc_common::traits::StorageEnableRuntime;
use std::{collections::HashSet, sync::Arc};
Expand Down Expand Up @@ -53,19 +54,19 @@ where
/// This is called for each sync block to apply `MutationsAppliedForProvider` events
/// before state pruning can occur. This ensures the local forest stays in sync with
/// the on-chain state even when the node has been offline for a long period.
///
/// Returns an error if there's a failure during event processing that should prevent
/// the block from being marked as processed (e.g., API errors when fetching events).
pub(crate) async fn process_bsp_sync_mutations(
&mut self,
block_hash: &Runtime::Hash,
bsp_id: BackupStorageProviderId<Runtime>,
) {
) -> Result<()> {
// Get all events for the block
let events = match get_events_at_block::<Runtime>(&self.client, block_hash) {
Ok(events) => events,
Err(e) => {
warn!(target: LOG_TARGET, "Failed to get events during sync: {:?}", e);
return;
}
};
let events = get_events_at_block::<Runtime>(&self.client, block_hash).map_err(|e| {
warn!(target: LOG_TARGET, "Failed to get events during sync: {:?}", e);
e
})?;

// Apply any mutations in the block that are relevant to this BSP
for ev in events {
Expand Down Expand Up @@ -100,6 +101,8 @@ where
}
}
}

Ok(())
}

/// Handles the initial sync of a BSP, after coming out of syncing mode.
Expand All @@ -125,19 +128,23 @@ where
/// Steps:
/// 1. Catch up to Forest root changes in this BSP's Forest.
/// 2. In blocks that are a multiple of `BlockchainServiceConfig::check_for_pending_proofs_period`, catch up to proof submissions for the current tick.
///
/// Returns an error if there's a failure during forest root change processing.
pub(crate) async fn bsp_init_block_processing<Block>(
&mut self,
block_hash: &Runtime::Hash,
block_number: &BlockNumber<Runtime>,
tree_route: TreeRoute<Block>,
) where
) -> Result<()>
where
Block: BlockT<Hash = Runtime::Hash>,
{
self.forest_root_changes_catchup(&tree_route).await;
self.forest_root_changes_catchup(&tree_route).await?;
let block_number: U256 = (*block_number).into();
if block_number % self.config.check_for_pending_proofs_period == Zero::zero() {
self.proof_submission_catch_up(block_hash);
}
Ok(())
}

/// Processes new block imported events that are only relevant for a BSP.
Expand Down Expand Up @@ -579,16 +586,21 @@ where
}
}

/// Processes forest root changing events for BSP.
///
/// Returns an error if there's a failure during event processing that should prevent
/// the block from being marked as processed (e.g., failed to apply mutations and verify root).
pub(crate) async fn bsp_process_forest_root_changing_events(
&mut self,
event: StorageEnableEvents<Runtime>,
revert: bool,
) {
) -> Result<()> {
let managed_bsp_id = match &self.maybe_managed_provider {
Some(ManagedProvider::Bsp(bsp_handler)) => &bsp_handler.bsp_id,
_ => {
error!(target: LOG_TARGET, "`bsp_process_forest_root_changing_events` should only be called if the node is managing a BSP. Found [{:?}] instead.", self.maybe_managed_provider);
return;
let err_msg = format!("`bsp_process_forest_root_changing_events` should only be called if the node is managing a BSP. Found [{:?}] instead.", self.maybe_managed_provider);
error!(target: LOG_TARGET, "{}", err_msg);
return Err(anyhow::anyhow!(err_msg));
}
};

Expand All @@ -604,7 +616,7 @@ where
// Check if the `provider_id` is the BSP that this node is managing.
if provider_id != *managed_bsp_id {
debug!(target: LOG_TARGET, "Provider ID [{:?}] is not the BSP ID [{:?}] that this node is managing. Skipping mutations applied event.", provider_id, managed_bsp_id);
return;
return Ok(());
}

info!(target: LOG_TARGET, "🪾 Applying mutations to BSP [{:?}]", provider_id);
Expand Down Expand Up @@ -634,24 +646,26 @@ where
// For file deletions, we will remove the file from the File Storage only after finality is reached.
// This gives us the opportunity to put the file back in the Forest if this block is re-orged.
let current_forest_key = CURRENT_FOREST_KEY.to_vec();
if let Err(e) = self
.apply_forest_mutations_and_verify_root(
current_forest_key,
&mutations,
revert,
old_root,
new_root,
)
.await
{
error!(target: LOG_TARGET, "CRITICAL ❗️❗️ Failed to apply mutations and verify root for BSP [{:?}]. \nError: {:?}", provider_id, e);
return;
};
self.apply_forest_mutations_and_verify_root(
current_forest_key,
&mutations,
revert,
old_root,
new_root,
)
.await
.map_err(|e| {
let err_msg = format!("CRITICAL ❗️❗️ Failed to apply mutations and verify root for BSP [{:?}]. \nError: {:?}", provider_id, e);
error!(target: LOG_TARGET, "{}", err_msg);
anyhow::anyhow!(err_msg)
})?;

info!(target: LOG_TARGET, "🌳 New local Forest root matches the one in the block for BSP [{:?}]", provider_id);
}
_ => {}
}

Ok(())
}

/// Verifies that the local BSP forest root matches the on-chain root.
Expand Down
Loading
Loading