Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 57 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ summit-finalizer = {path = "finalizer"}
summit-rpc = {path = "rpc"}
summit-orchestrator = {path = "orchestrator"}

commonware-consensus = "2026.3.0"
commonware-cryptography = "2026.3.0"
commonware-storage = "2026.3.0"
commonware-runtime = "2026.3.0"
commonware-codec = "2026.3.0"
commonware-p2p = "2026.3.0"
commonware-broadcast = "2026.3.0"
commonware-utils = "2026.3.0"
commonware-resolver = "2026.3.0"
commonware-macros = "2026.3.0"
commonware-math = "2026.3.0"
commonware-parallel = "2026.3.0"
commonware-consensus = "2026.4.0"
commonware-cryptography = "2026.4.0"
commonware-storage = "2026.4.0"
commonware-runtime = "2026.4.0"
commonware-codec = "2026.4.0"
commonware-p2p = "2026.4.0"
commonware-broadcast = "2026.4.0"
commonware-utils = "2026.4.0"
commonware-resolver = "2026.4.0"
commonware-macros = "2026.4.0"
commonware-math = "2026.4.0"
commonware-parallel = "2026.4.0"

alloy-consensus = "1.0.12"
alloy-eips = { version = "1.0.19", features = ["ssz"] }
Expand Down
4 changes: 3 additions & 1 deletion application/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ impl<P: PublicKey> CertifiableAutomaton for Mailbox<P> {

impl<P: PublicKey> Relay for Mailbox<P> {
type Digest = Digest;
type PublicKey = P;
type Plan = commonware_consensus::simplex::Plan<P>;

async fn broadcast(&mut self, digest: Self::Digest) {
async fn broadcast(&mut self, digest: Self::Digest, _plan: Self::Plan) {
self.sender
.send(Message::Broadcast { payload: digest })
.await
Expand Down
22 changes: 12 additions & 10 deletions finalizer/benches/consensus_state_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,19 @@ fn main() {

executor.start(|context| async move {
let config = Config {
log_partition: "bench-log".to_string(),
log_write_buffer: NZUsize!(64 * 1024),
log_compression: None,
log_codec_config: ((), ()),
log_items_per_section: NZU64!(4),
log: commonware_storage::journal::contiguous::variable::Config {
partition: "bench-log".to_string(),
write_buffer: NZUsize!(64 * 1024),
compression: None,
codec_config: ((), ()),
items_per_section: NZU64!(4),
page_cache: CacheRef::from_pooler(
&context,
std::num::NonZero::new(77u16).unwrap(),
NZUsize!(9),
),
},
translator: EightCap,
page_cache: CacheRef::from_pooler(
&context,
std::num::NonZero::new(77u16).unwrap(),
NZUsize!(9),
),
};

let mut db =
Expand Down
25 changes: 19 additions & 6 deletions finalizer/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,15 @@ impl<
) {
let (tx, rx) = mpsc::channel(cfg.mailbox_size);
let state_cfg = StateConfig {
log_partition: format!("{}-finalizer_state-log", cfg.db_prefix),
log_write_buffer: WRITE_BUFFER,
log_compression: None,
log_codec_config: ((), ()),
log_items_per_section: NZU64!(262_144),
log: commonware_storage::journal::contiguous::variable::Config {
partition: format!("{}-finalizer_state-log", cfg.db_prefix),
write_buffer: WRITE_BUFFER,
compression: None,
codec_config: ((), ()),
items_per_section: NZU64!(262_144),
page_cache: cfg.page_cache,
},
translator: EightCap,
page_cache: cfg.page_cache,
};

let db = FinalizerState::<R, V>::new(
Expand Down Expand Up @@ -267,6 +269,9 @@ impl<
self.context.sleep(std::time::Duration::from_secs(5)).await;
} else {
error!(target: "critical", "finalizer started with invalid forkchoice: {forkchoice:?}, height: {}, epoch: {}", self.canonical_state.get_latest_height(), self.canonical_state.get_epoch());
#[cfg(feature = "prom")]
counter!("critical_errors_total", "reason" => "invalid_forkchoice", "severity" => "critical")
.increment(1);
panic!(
"finalizer started with invalid forkchoice: {forkchoice:?}, height: {}, epoch: {}",
self.canonical_state.get_latest_height(),
Expand Down Expand Up @@ -359,6 +364,8 @@ impl<
// orchestrator, the finalizer should never receive a GetEpochGenesisHash request for the wrong epoch.
if epoch != self.canonical_state.get_epoch() {
error!(target: "critical", "Finalizer received epoch genesis hash request from a different epoch. This should not happen and is a bug. Our epoch: {}, requested epoch {}", self.canonical_state.get_epoch(), epoch);
#[cfg(feature = "prom")]
counter!("critical_errors_total", "reason" => "epoch_mismatch", "severity" => "critical").increment(1);
}
let _ = response.send(self.canonical_state.get_epoch_genesis_hash());
},
Expand Down Expand Up @@ -1379,6 +1386,8 @@ async fn execute_block<
?eth_block_hash,
"block validation failed, not executing but keeping in chain"
);
#[cfg(feature = "prom")]
counter!("critical_errors_total", "reason" => "block_validation_failed", "severity" => "critical").increment(1);
}

state.set_latest_height(new_height);
Expand Down Expand Up @@ -1482,6 +1491,8 @@ async fn parse_execution_requests<
// The deposit contract verifies that the withdrawal credentials
// follow the expected format, so this should never happen.
error!(target: "critical", reason = "failed to parse withdrawal credentials (this is not a Summit error)", ?deposit_request);
#[cfg(feature = "prom")]
counter!("critical_errors_total", "reason" => "invalid_withdrawal_credentials", "severity" => "critical").increment(1);
warn!(
"Failed to parse withdrawal credentials: {e}"
);
Expand Down Expand Up @@ -1514,6 +1525,8 @@ async fn parse_execution_requests<
// The deposit contract verifies that the withdrawal credentials
// follow the expected format, so this should never happen.
error!(target: "critical", reason = "failed to parse withdrawal credentials (this is not a Summit error)", ?deposit_request);
#[cfg(feature = "prom")]
counter!("critical_errors_total", "reason" => "invalid_withdrawal_credentials", "severity" => "critical").increment(1);
warn!("Failed to parse withdrawal credentials: {e}");
continue;
}
Expand Down
55 changes: 31 additions & 24 deletions finalizer/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ impl<E: Clock + Storage + Metrics, V: Variant> FinalizerState<E, V> {
/// Log a database error and initiate graceful shutdown.
fn handle_db_error(&self, e: impl std::fmt::Display, op: &str) {
error!(target: "critical", %e, op, "fatal database error, initiating shutdown");
#[cfg(feature = "prom")]
metrics::counter!("critical_errors_total", "reason" => "fatal_db_error", "severity" => "critical").increment(1);
self.cancellation_token.cancel();
}

Expand Down Expand Up @@ -101,7 +103,7 @@ impl<E: Clock + Storage + Metrics, V: Variant> FinalizerState<E, V> {
let key = Self::pad_key(&LATEST_CONSENSUS_STATE_EPOCH_KEY);
if let Err(e) = self
.store
.write_batch([(key, Some(Value::U64(epoch)))])
.apply_batch([(key, Some(Value::U64(epoch)))].into())
.await
{
self.handle_db_error(e, "set_latest_consensus_state_epoch");
Expand All @@ -125,7 +127,7 @@ impl<E: Clock + Storage + Metrics, V: Variant> FinalizerState<E, V> {
let key = Self::pad_key(&LATEST_FINALIZED_HEADER_EPOCH_KEY);
if let Err(e) = self
.store
.write_batch([(key, Some(Value::U64(epoch)))])
.apply_batch([(key, Some(Value::U64(epoch)))].into())
.await
{
self.handle_db_error(e, "set_latest_finalized_header_epoch");
Expand All @@ -149,7 +151,7 @@ impl<E: Clock + Storage + Metrics, V: Variant> FinalizerState<E, V> {
let key = Self::pad_key(&LATEST_CHECKPOINT_EPOCH_KEY);
if let Err(e) = self
.store
.write_batch([(key, Some(Value::U64(epoch)))])
.apply_batch([(key, Some(Value::U64(epoch)))].into())
.await
{
self.handle_db_error(e, "set_latest_checkpoint_epoch");
Expand All @@ -161,7 +163,7 @@ impl<E: Clock + Storage + Metrics, V: Variant> FinalizerState<E, V> {
let key = Self::make_consensus_state_key(epoch);
if let Err(e) = self
.store
.write_batch([(key, Some(Value::ConsensusState(Box::new(state.clone()))))])
.apply_batch([(key, Some(Value::ConsensusState(Box::new(state.clone()))))].into())
.await
{
self.handle_db_error(e, "store_consensus_state");
Expand Down Expand Up @@ -202,7 +204,7 @@ impl<E: Clock + Storage + Metrics, V: Variant> FinalizerState<E, V> {
pub async fn delete_consensus_state(&mut self, epoch: u64) {
if let Err(e) = self
.store
.write_batch([(Self::make_consensus_state_key(epoch), None)])
.apply_batch([(Self::make_consensus_state_key(epoch), None)].into())
.await
{
self.handle_db_error(e, "delete_consensus_state");
Expand All @@ -220,13 +222,16 @@ impl<E: Clock + Storage + Metrics, V: Variant> FinalizerState<E, V> {
let key = Self::make_checkpoint_key(epoch);
if let Err(e) = self
.store
.write_batch([(
key,
Some(Value::Checkpoint(Box::new((
checkpoint.clone(),
last_block,
)))),
)])
.apply_batch(
[(
key,
Some(Value::Checkpoint(Box::new((
checkpoint.clone(),
last_block,
)))),
)]
.into(),
)
.await
{
self.handle_db_error(e, "store_finalized_checkpoint");
Expand Down Expand Up @@ -268,7 +273,7 @@ impl<E: Clock + Storage + Metrics, V: Variant> FinalizerState<E, V> {
let key = Self::make_finalized_header_key(epoch);
if let Err(e) = self
.store
.write_batch([(key, Some(Value::FinalizedHeader(Box::new(header.clone()))))])
.apply_batch([(key, Some(Value::FinalizedHeader(Box::new(header.clone()))))].into())
.await
{
self.handle_db_error(e, "store_finalized_header");
Expand Down Expand Up @@ -307,7 +312,7 @@ impl<E: Clock + Storage + Metrics, V: Variant> FinalizerState<E, V> {

// Commit all pending changes to the database
pub async fn commit(&mut self) {
if let Err(e) = self.store.commit(None).await {
if let Err(e) = self.store.commit().await {
self.handle_db_error(e, "commit");
}
}
Expand Down Expand Up @@ -408,17 +413,19 @@ mod tests {
context: E,
) -> FinalizerState<E, V> {
let config = Config {
log_partition: format!("{}-log", partition),
log_write_buffer: NZUsize!(64 * 1024),
log_compression: None,
log_codec_config: ((), ()),
log_items_per_section: NZU64!(4),
log: commonware_storage::journal::contiguous::variable::Config {
partition: format!("{}-log", partition),
write_buffer: NZUsize!(64 * 1024),
compression: None,
codec_config: ((), ()),
items_per_section: NZU64!(4),
page_cache: CacheRef::from_pooler(
&context,
std::num::NonZero::new(77u16).unwrap(),
NZUsize!(9),
),
},
translator: EightCap,
page_cache: CacheRef::from_pooler(
&context,
std::num::NonZero::new(77u16).unwrap(),
NZUsize!(9),
),
};
FinalizerState::<E, V>::new(context, config, CancellationToken::new()).await
}
Expand Down
2 changes: 2 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ tikv-jemalloc-ctl = { version = "0.6", optional = true, features = ["stats"] }

[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.17.0", optional = true }
libc = { version = "0.2", optional = true }

[dev-dependencies]
commonware-macros.workspace = true
Expand Down Expand Up @@ -146,6 +147,7 @@ prom = [
"http",
"eyre",
"procfs",
"libc",
"summit-types/prom",
]
tokio-console = ["console-subscriber"]
Expand Down
54 changes: 54 additions & 0 deletions node/src/prom/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl Default for HooksBuilder {
Box::new(|| Collector::default().collect()),
Box::new(collect_memory_stats),
Box::new(collect_io_stats),
Box::new(collect_disk_stats),
],
}
}
Expand Down Expand Up @@ -175,3 +176,56 @@ fn collect_io_stats() {

#[cfg(not(target_os = "linux"))]
const fn collect_io_stats() {}

/// Export per-mountpoint disk capacity gauges by walking `/proc/mounts` and
/// calling `statvfs(2)` on every device-backed mount. Unreadable mount tables
/// or failing `statvfs` calls are skipped silently so one bad mount cannot
/// break the whole collection cycle.
#[cfg(target_os = "linux")]
fn collect_disk_stats() {
    use metrics::gauge;
    use std::ffi::CString;
    use std::fs;
    use std::mem::MaybeUninit;
    use tracing::error;

    // Read the mount table; on failure log once and skip this cycle entirely.
    let mounts = match fs::read_to_string("/proc/mounts") {
        Ok(contents) => contents,
        Err(error) => {
            error!(%error, "Failed to read /proc/mounts");
            return;
        }
    };

    for entry in mounts.lines() {
        // /proc/mounts columns: device, mountpoint, fstype, options, dump, pass.
        let mut cols = entry.split_whitespace();
        let (Some(device), Some(mount_point), Some(_fstype)) =
            (cols.next(), cols.next(), cols.next())
        else {
            continue;
        };
        // Only device-backed filesystems (path-like device names); this skips
        // virtual mounts such as proc, sysfs, tmpfs, cgroup, etc.
        if !device.starts_with('/') {
            continue;
        }

        // statvfs needs a NUL-terminated path; skip paths with interior NULs.
        let Ok(path) = CString::new(mount_point) else {
            continue;
        };

        let mut raw = MaybeUninit::<libc::statvfs>::uninit();
        // SAFETY: `path` is a valid NUL-terminated C string and `raw` points to
        // writable storage large enough to hold a `libc::statvfs`.
        if unsafe { libc::statvfs(path.as_ptr(), raw.as_mut_ptr()) } != 0 {
            continue;
        }
        // SAFETY: statvfs returned 0, so the kernel fully initialized `raw`.
        let stat = unsafe { raw.assume_init() };

        // All block counts are expressed in fragments of `f_frsize` bytes.
        let frag = stat.f_frsize;
        let total = stat.f_blocks * frag;
        let free = stat.f_bfree * frag;
        // `f_bavail` excludes blocks reserved for root, hence <= `f_bfree`.
        let available = stat.f_bavail * frag;
        let used = total.saturating_sub(free);

        let label = mount_point.to_string();
        gauge!("disk.total_bytes", "mountpoint" => label.clone()).set(total as f64);
        gauge!("disk.free_bytes", "mountpoint" => label.clone()).set(free as f64);
        gauge!("disk.available_bytes", "mountpoint" => label.clone())
            .set(available as f64);
        gauge!("disk.used_bytes", "mountpoint" => label).set(used as f64);
    }
}

/// Disk statistics collection relies on `/proc/mounts` and `statvfs(2)`,
/// which are Linux-only; on every other target this is a compile-time no-op.
#[cfg(not(target_os = "linux"))]
const fn collect_disk_stats() {}
Loading
Loading