Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 166 additions & 13 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2513,7 +2513,7 @@ impl StorageNode {
self.inner.set_node_status(NodeStatus::RecoverMetadata)?;
}
self.shard_sync_handler
.start_sync_shards(shards_gained.to_vec(), new_node_joining_committee)
.start_sync_shards(shards_gained.to_vec())
.await?;
}

Expand Down Expand Up @@ -8128,7 +8128,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(shard_indices, wipe_metadata_before_transfer_in_dst)
.start_sync_shards(shard_indices)
.await?;

// Waits for the shard to be synced.
Expand Down Expand Up @@ -8258,7 +8258,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], wipe_metadata_before_transfer_in_dst)
.start_sync_shards(vec![ShardIndex(0)])
.await?;
wait_for_shard_in_active_state(shard_storage_dst.as_ref()).await?;
check_all_blobs_are_synced(
Expand Down Expand Up @@ -8337,7 +8337,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], wipe_metadata_before_transfer_in_dst)
.start_sync_shards(vec![ShardIndex(0)])
.await?;
wait_for_shard_in_active_state(shard_storage_dst.as_ref()).await?;
check_all_blobs_are_synced(
Expand Down Expand Up @@ -8439,7 +8439,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], false)
.start_sync_shards(vec![ShardIndex(0)])
.await?;

// Waits for the shard sync process to stop.
Expand Down Expand Up @@ -8524,7 +8524,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], false)
.start_sync_shards(vec![ShardIndex(0)])
.await?;

// Waits for the shard sync process to stop.
Expand Down Expand Up @@ -8641,7 +8641,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], false)
.start_sync_shards(vec![ShardIndex(0)])
.await?;

// Waits for the shard sync process to stop.
Expand Down Expand Up @@ -8692,7 +8692,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], false)
.start_sync_shards(vec![ShardIndex(0)])
.await?;

// Waits for the shard sync process to stop.
Expand Down Expand Up @@ -8822,7 +8822,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], false)
.start_sync_shards(vec![ShardIndex(0)])
.await?;

// Waits for the shard sync process to stop.
Expand Down Expand Up @@ -8905,7 +8905,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], false)
.start_sync_shards(vec![ShardIndex(0)])
.await?;
// Waits for the shard sync process to stop.
wait_until_no_sync_tasks(&cluster.nodes[1].storage_node.shard_sync_handler).await?;
Expand Down Expand Up @@ -8998,7 +8998,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], true)
.start_sync_shards(vec![ShardIndex(0)])
.await?;

// Waits for the shard sync process to stop.
Expand Down Expand Up @@ -9042,6 +9042,159 @@ mod tests {
Ok(())
}

// Tests that gaining a new shard in a later epoch while blob metadata recovery from a
// previous epoch is still in progress neither cancels the metadata recovery nor orphans
// the shards gained in the previous epoch.
#[walrus_simtest]
async fn sync_shard_new_epoch_does_not_cancel_metadata_recovery() -> TestResult {
use std::sync::atomic::{AtomicBool, Ordering};

let assignment: &[&[u16]] = &[&[0, 1], &[2, 3]];
let (cluster, blob_details, storage_dst, shard_storage_set) =
setup_cluster_for_shard_sync_tests(Some(assignment), None, false).await?;

assert_eq!(shard_storage_set.shard_storage.len(), 2);
let shard_storage_dst_0 = shard_storage_set.shard_storage[0].clone();
let shard_storage_dst_1 = shard_storage_set.shard_storage[1].clone();

// Pause metadata recovery so that it is still in progress when the node gains
// another shard in the next epoch.
let metadata_sync_entered = Arc::new(Notify::new());
let metadata_sync_entered_clone = metadata_sync_entered.clone();
let release_metadata_sync = Arc::new(AtomicBool::new(false));
let release_metadata_sync_clone = release_metadata_sync.clone();
register_fail_point_async("fail_point_shard_sync_recovery_metadata_pause", move || {
let entered = metadata_sync_entered_clone.clone();
let release = release_metadata_sync_clone.clone();
async move {
entered.notify_one();
while !release.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
});

storage_dst.clear_metadata_in_test()?;
storage_dst.set_node_status(NodeStatus::RecoverMetadata)?;

let shard_sync_handler = &cluster.nodes[1].storage_node.shard_sync_handler;

// The node joins the committee and gains shard 0; metadata must be recovered first.
shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)])
.await?;

// Wait until metadata recovery is in progress, parked at the pause fail point.
tokio::time::timeout(Duration::from_secs(30), metadata_sync_entered.notified())
.await
.map_err(|_| anyhow::anyhow!("metadata recovery did not start"))?;

// Node 1 no longer owns its original shards 2 and 3 in the current committee (they
// were handed to node 0 when shards 0 and 1 were reassigned), but their storages
// still exist. Lock shard 3 as if it were being transferred out, to verify that the
// restarted sync-shards task does not clobber shards the node does not own.
let shard_storage_dst_3 = storage_dst
.shard_storage(ShardIndex(3))
.await
.expect("shard storage should exist");
shard_storage_dst_3.lock_shard_for_epoch_change().await?;

// The node gains shard 1 in the next epoch while metadata recovery is still in
// progress; this aborts and replaces the previous sync-shards task.
shard_sync_handler
.start_sync_shards(vec![ShardIndex(1)])
.await?;

// Release the paused metadata recovery.
release_metadata_sync.store(true, Ordering::SeqCst);

wait_until_no_sync_tasks(shard_sync_handler).await?;

// Metadata recovery must have completed and both shards must be fully synced.
assert_eq!(storage_dst.node_status()?, NodeStatus::Active);
assert_eq!(shard_storage_dst_0.status().await?, ShardStatus::Active);
assert_eq!(shard_storage_dst_1.status().await?, ShardStatus::Active);

check_all_blobs_are_synced(&blob_details, &storage_dst, &shard_storage_dst_0, &[])?;
assert_eq!(
shard_storage_dst_1.sliver_count(SliverType::Primary),
Ok(23)
);
assert_eq!(
shard_storage_dst_1.sliver_count(SliverType::Secondary),
Ok(23)
);

// The unowned, locked shard must not have been touched by the sync-shards task.
assert_eq!(
shard_storage_dst_3.status().await?,
ShardStatus::LockedToMove
);

clear_fail_point("fail_point_shard_sync_recovery_metadata_pause");

Ok(())
}

// Tests that the sync-shards task in metadata recovery only syncs shards the node owns
// in the current committee. The node may still hold storages for shards it no longer
// owns (for example, shards locked for transfer to another node during the same epoch
// change); those must not have their status clobbered or be synced.
#[walrus_simtest]
async fn sync_shard_recover_metadata_skips_unowned_shards() -> TestResult {
let assignment: &[&[u16]] = &[&[0, 1], &[2, 3]];
let (cluster, blob_details, storage_dst, shard_storage_set) =
setup_cluster_for_shard_sync_tests(Some(assignment), None, false).await?;

assert_eq!(shard_storage_set.shard_storage.len(), 2);
let shard_storage_dst_0 = shard_storage_set.shard_storage[0].clone();
let shard_storage_dst_1 = shard_storage_set.shard_storage[1].clone();

// Node 1 no longer owns its original shards 2 and 3 in the current committee (they
// were handed to node 0 when shards 0 and 1 were reassigned), but their storages
// still exist. Lock shard 3 as if it were being transferred out.
let shard_storage_dst_3 = storage_dst
.shard_storage(ShardIndex(3))
.await
.expect("shard storage should exist");
shard_storage_dst_3.lock_shard_for_epoch_change().await?;

storage_dst.clear_metadata_in_test()?;
storage_dst.set_node_status(NodeStatus::RecoverMetadata)?;

cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0), ShardIndex(1)])
.await?;

wait_until_no_sync_tasks(&cluster.nodes[1].storage_node.shard_sync_handler).await?;

// The owned shards must be synced and the node must be active.
assert_eq!(storage_dst.node_status()?, NodeStatus::Active);
assert_eq!(shard_storage_dst_0.status().await?, ShardStatus::Active);
assert_eq!(shard_storage_dst_1.status().await?, ShardStatus::Active);
check_all_blobs_are_synced(&blob_details, &storage_dst, &shard_storage_dst_0, &[])?;

// The unowned shards must not have been touched: shard 3 stays locked, and shard 2
// keeps its status.
assert_eq!(
shard_storage_dst_3.status().await?,
ShardStatus::LockedToMove
);
assert_eq!(
storage_dst
.shard_storage(ShardIndex(2))
.await
.expect("shard storage should exist")
.status()
.await?,
ShardStatus::Active
);

Ok(())
}

#[walrus_simtest]
async fn finish_epoch_change_start_should_not_block_event_processing() -> TestResult {
walrus_test_utils::init_tracing();
Expand Down Expand Up @@ -9314,7 +9467,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], false)
.start_sync_shards(vec![ShardIndex(0)])
.await?;

// Shard recovery should be completed, and all the data should be synced.
Expand Down Expand Up @@ -9386,7 +9539,7 @@ mod tests {
cluster.nodes[1]
.storage_node
.shard_sync_handler
.start_sync_shards(vec![ShardIndex(0)], true)
.start_sync_shards(vec![ShardIndex(0)])
.await?;

tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down
Loading
Loading