Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions crates/subcoin-snapcake/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,19 @@ use subcoin_runtime::RuntimeApi;
use subcoin_runtime::interface::OpaqueBlock as Block;
use subcoin_service::{GenesisBlockBuilder, TransactionAdapter};
use syncing_strategy::TargetBlock;
use tokio::sync::oneshot;

type FullClient = sc_service::TFullClient<Block, RuntimeApi, WasmExecutor>;

/// Parameters specific to snapcake's state sync operation.
struct SnapcakeSyncParams {
bitcoin_network: bitcoin::Network,
skip_proof: bool,
snapshot_dir: PathBuf,
sync_target: TargetBlock<Block>,
shutdown_tx: oneshot::Sender<()>,
}

fn main() -> sc_cli::Result<()> {
let app = App::parse();

Expand Down Expand Up @@ -125,46 +135,64 @@ fn start_snapcake_node(

let client = Arc::new(client);

// Create channel for signaling shutdown after state sync completion.
let (shutdown_tx, shutdown_rx) = oneshot::channel();

let sync_params = SnapcakeSyncParams {
bitcoin_network,
skip_proof,
snapshot_dir,
sync_target,
shutdown_tx,
};

match config.network.network_backend {
NetworkBackendType::Libp2p => {
start_substrate_network::<sc_network::NetworkWorker<Block, <Block as BlockT>::Hash>>(
&mut config,
client,
&mut task_manager,
bitcoin_network,
skip_proof,
snapshot_dir,
sync_target,
)?
}
NetworkBackendType::Libp2p => start_substrate_network::<
sc_network::NetworkWorker<Block, <Block as BlockT>::Hash>,
>(
&mut config, client, &mut task_manager, sync_params
)?,
NetworkBackendType::Litep2p => {
start_substrate_network::<sc_network::Litep2pNetworkBackend>(
&mut config,
client,
&mut task_manager,
bitcoin_network,
skip_proof,
snapshot_dir,
sync_target,
sync_params,
)?;
}
}

// Spawn a task that waits for shutdown signal and triggers graceful exit.
task_manager
.spawn_handle()
.spawn("shutdown-handler", None, async move {
if shutdown_rx.await.is_ok() {
tracing::info!("🎉 Snapshot generation complete, shutting down...");
// Exit cleanly after state sync completion.
// This is intentional as snapcake's sole purpose is to generate the snapshot.
std::process::exit(0);
}
});

Ok(task_manager)
}

fn start_substrate_network<N>(
config: &mut Configuration,
client: Arc<FullClient>,
task_manager: &mut sc_service::TaskManager,
bitcoin_network: bitcoin::Network,
skip_proof: bool,
snapshot_dir: PathBuf,
sync_target: TargetBlock<Block>,
sync_params: SnapcakeSyncParams,
) -> Result<(), sc_service::error::Error>
where
N: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>,
{
let SnapcakeSyncParams {
bitcoin_network,
skip_proof,
snapshot_dir,
sync_target,
shutdown_tx,
} = sync_params;
let mut net_config = sc_network::config::FullNetworkConfiguration::<
Block,
<Block as BlockT>::Hash,
Expand Down Expand Up @@ -216,6 +244,7 @@ where
skip_proof,
snapshot_dir,
sync_target,
shutdown_tx,
)?;

let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
Expand Down
13 changes: 11 additions & 2 deletions crates/subcoin-snapcake/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ pub struct SnapcakeNetworkParams {
/// This flag is primarily for the testing purpose.
#[arg(long)]
pub block_hash: Option<sp_core::H256>,

/// Enable local network discovery via mDNS.
///
/// By default, local peer discovery is disabled. Use this flag when running
/// multiple nodes on a local network for development or testing.
#[arg(long)]
pub discover_local: bool,
}

impl SnapcakeNetworkParams {
Expand All @@ -83,6 +90,7 @@ impl SnapcakeNetworkParams {
bootnodes,
port,
network_backend,
discover_local,
..
} = self;

Expand All @@ -98,15 +106,16 @@ impl SnapcakeNetworkParams {
out_peers: 8,
in_peers: 32,
in_peers_light: 100,
no_mdns: true,
// Enable mDNS when local discovery is requested
no_mdns: !discover_local,
max_parallel_downloads: 8,
node_key_params: NodeKeyParams {
node_key: None,
node_key_type: NodeKeyType::Ed25519,
node_key_file: None,
unsafe_force_node_key_generation: false,
},
discover_local: false,
discover_local,
kademlia_disjoint_query_paths: false,
kademlia_replication_factor: NonZero::new(20).expect("20 is not zero"),
ipfs_server: false,
Expand Down
58 changes: 38 additions & 20 deletions crates/subcoin-snapcake/src/snapshot_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,21 @@ enum SnapshotStore {
}

impl SnapshotStore {
/// Append a UTXO to the store.
fn push(&mut self, utxo: Utxo) -> std::io::Result<()> {
/// Append a batch of UTXOs to the store.
fn push_batch(&mut self, utxos: &[Utxo]) -> std::io::Result<usize> {
match self {
Self::InMem(list) => {
list.push(utxo);
Ok(())
let count = utxos.len();
list.extend(utxos.iter().cloned());
Ok(count)
}
Self::Csv(path) => {
for utxo in utxos {
Self::append_to_csv(path, utxo.clone())?;
}
Ok(utxos.len())
}
Self::Csv(path) => Self::append_to_csv(path, utxo),
Self::Rocksdb(db) => Self::insert_to_rocksdb(db, utxo)
Self::Rocksdb(db) => Self::insert_batch_to_rocksdb(db, utxos)
.map_err(|err| std::io::Error::other(format!("rocksdb: {err:?}"))),
}
}
Expand All @@ -76,24 +82,34 @@ impl SnapshotStore {
Ok(())
}

fn insert_to_rocksdb(db: &DB, utxo: Utxo) -> Result<(), rocksdb::Error> {
fn insert_batch_to_rocksdb(db: &DB, utxos: &[Utxo]) -> Result<usize, rocksdb::Error> {
if utxos.is_empty() {
return Ok(0);
}

let mut coin_count =
Self::read_rocksdb_count(db).expect("Failed to read count from Rocksdb") as u64;

let Utxo { txid, vout, coin } = utxo;
let mut batch = rocksdb::WriteBatch::default();
let batch_size = utxos.len();

let mut key = Vec::with_capacity(32 + 4);
key.extend(txid.to_byte_array()); // Raw bytes of Txid
key.extend(vout.to_be_bytes()); // Ensure vout is big-endian for consistent ordering
for utxo in utxos {
let Utxo { txid, vout, coin } = utxo;

let value = bincode::serialize(&coin).expect("Failed to serialize Coin"); // Serialize coin data
let mut key = Vec::with_capacity(32 + 4);
key.extend(txid.to_byte_array());
key.extend(vout.to_be_bytes());

coin_count += 1;
let value = bincode::serialize(coin).expect("Failed to serialize Coin");
batch.put(&key, value);
}

db.put(&key, value)?;
db.put(COUNT_KEY, coin_count.to_le_bytes())?;
coin_count += batch_size as u64;
batch.put(COUNT_KEY, coin_count.to_le_bytes());

Ok(())
db.write(batch)?;

Ok(batch_size)
}

/// Count total UTXO entries in the store.
Expand Down Expand Up @@ -333,11 +349,13 @@ impl SnapshotManager {
}
}

/// Adds a UTXO to the store.
pub fn store_utxo(&mut self, utxo: Utxo) {
/// Adds a batch of UTXOs to the store.
///
/// Returns the number of UTXOs stored.
pub fn store_utxos_batch(&mut self, utxos: &[Utxo]) -> usize {
self.store
.push(utxo)
.expect("Failed to add UTXO to the store");
.push_batch(utxos)
.expect("Failed to add UTXOs batch to the store")
}

/// Generates a snapshot and ensures the total UTXO count matches the expected count.
Expand Down
26 changes: 20 additions & 6 deletions crates/subcoin-snapcake/src/state_sync_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,27 @@ where
fn process_state_response(&mut self, response: &StateResponse) {
let mut complete = false;

let key_values = response.entries.iter().flat_map(|key_vlaue_state_entry| {
if key_vlaue_state_entry.complete {
// Count total entries for pre-allocation
let total_entries: usize = response
.entries
.iter()
.map(|entry| entry.entries.len())
.sum();

let key_values = response.entries.iter().flat_map(|key_value_state_entry| {
if key_value_state_entry.complete {
complete = true;
}

key_vlaue_state_entry
key_value_state_entry
.entries
.iter()
.map(|state_entry| (&state_entry.key, &state_entry.value))
});

// Pre-allocate with capacity (upper bound, not all entries are UTXOs)
let mut utxos_batch = Vec::with_capacity(total_entries);

for (key, value) in key_values {
if key.len() > 32 {
// Attempt to decode the Coin storage item.
Expand All @@ -96,13 +106,17 @@ where

self.muhash.insert(&data);

self.snapshot_manager.store_utxo(Utxo { txid, vout, coin });

self.received_coins += 1;
utxos_batch.push(Utxo { txid, vout, coin });
}
}
}

// Batch write all UTXOs from this response
if !utxos_batch.is_empty() {
let batch_size = self.snapshot_manager.store_utxos_batch(&utxos_batch);
self.received_coins += batch_size;
}

if self
.last_progress_print_time
.is_none_or(|last_time| last_time.elapsed() > DOWNLOAD_PROGRESS_LOG_INTERVAL)
Expand Down
26 changes: 20 additions & 6 deletions crates/subcoin-snapcake/src/syncing_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use subcoin_primitives::runtime::SubcoinApi;
use subcoin_service::network_request_handler::{
VersionedNetworkRequest, VersionedNetworkResponse, v1,
};
use tokio::sync::oneshot;

const LOG_TARGET: &str = "sync::snapcake";

Expand All @@ -39,6 +40,11 @@ const SUBCOIN_STRATEGY_KEY: StrategyKey = StrategyKey::new("Subcoin");
/// Maximum blocks per response.
const MAX_BLOCKS_IN_RESPONSE: usize = 128;

/// Number of confirmations required for a block to be considered finalized.
///
/// In Bitcoin, 6 confirmations is the widely accepted threshold for transaction finality.
const CONFIRMATION_DEPTH: usize = 6;

/// Corresponding `ChainSync` mode.
fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
match sync_mode {
Expand Down Expand Up @@ -72,6 +78,7 @@ pub fn build_snapcake_syncing_strategy<Block, Client, Net>(
skip_proof: bool,
snapshot_dir: PathBuf,
sync_target: TargetBlock<Block>,
shutdown_tx: oneshot::Sender<()>,
) -> Result<Box<dyn SyncingStrategy<Block>>, sc_service::Error>
where
Block: BlockT,
Expand Down Expand Up @@ -136,6 +143,7 @@ where
snapshot_dir,
sync_target,
subcoin_network_request_protocol_name,
shutdown_tx,
)?))
}

Expand Down Expand Up @@ -176,6 +184,8 @@ pub struct SnapcakeSyncingStrategy<B: BlockT, Client> {
sync_target: TargetBlock<B>,
/// Subcoin network request protocol name.
subcoin_network_request_protocol_name: ProtocolName,
/// Channel to signal shutdown after state sync completion.
shutdown_tx: Option<oneshot::Sender<()>>,
}

impl<B, Client> SnapcakeSyncingStrategy<B, Client>
Expand All @@ -197,6 +207,7 @@ where
snapshot_dir: PathBuf,
sync_target: TargetBlock<B>,
subcoin_network_request_protocol_name: ProtocolName,
shutdown_tx: oneshot::Sender<()>,
) -> Result<Self, sp_blockchain::Error> {
if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
tracing::info!(
Expand Down Expand Up @@ -234,6 +245,7 @@ where
snapshot_dir,
sync_target,
subcoin_network_request_protocol_name,
shutdown_tx: Some(shutdown_tx),
})
}

Expand Down Expand Up @@ -304,11 +316,11 @@ where
TargetBlock::LastFinalized
) {
// TODO: validate_blocks
if blocks.len() < 6 {
if blocks.len() < CONFIRMATION_DEPTH {
tracing::error!(target: LOG_TARGET, "No finalized block in the block response: {blocks:?}");
return;
}
5
CONFIRMATION_DEPTH - 1
} else {
0
};
Expand Down Expand Up @@ -423,8 +435,8 @@ where
from: FromBlock::Hash(best_hash),
direction: Direction::Descending,
// Attempt to download the most recent finalized block, i.e.,
// `peer_best_block - confirmation_depth(6)`.
max: Some(6),
// `peer_best_block - CONFIRMATION_DEPTH`.
max: Some(CONFIRMATION_DEPTH as u32),
};
self.pending_header_requests.push((peer_id, request));
}
Expand Down Expand Up @@ -777,8 +789,10 @@ where
tracing::info!("✅ State sync is complete");
self.state.take();
self.state_sync_complete = true;
// Exit the entire program directly once the state sync is complete.
std::process::exit(0);
// Signal shutdown to trigger graceful exit.
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
}

if actions.iter().any(SyncingAction::is_finished) {
Expand Down