Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,16 @@ impl BlockChainServer {
/// Build and publish a block for the given slot and validator.
fn propose_block(&mut self, slot: u64, validator_id: u64) {
info!(%slot, %validator_id, "We are the proposer for this slot");
let _block_timing = metrics::time_block_building();

// Build the block with attestation signatures
let Ok((block, attestation_signatures, post_checkpoints)) =
store::produce_block_with_signatures(&mut self.store, slot, validator_id)
.inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block"))
store::produce_block_with_signatures(&mut self.store, slot, validator_id).inspect_err(
|err| {
metrics::inc_block_building_failures();
error!(%slot, %validator_id, %err, "Failed to build block");
},
)
else {
return;
};
Expand Down Expand Up @@ -278,10 +283,13 @@ impl BlockChainServer {

// Process the block locally before publishing
if let Err(err) = self.process_block(signed_block.clone()) {
metrics::inc_block_building_failures();
error!(%slot, %validator_id, %err, "Failed to process built block");
return;
};

metrics::inc_block_building_success();

// Publish to gossip network
if let Some(ref p2p) = self.p2p {
let _ = p2p
Expand Down
76 changes: 75 additions & 1 deletion crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,55 @@ static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock<H
register_histogram!(
"lean_committee_signatures_aggregation_time_seconds",
"Time taken to aggregate committee signatures",
vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0]
vec![0.05, 0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0]
)
.unwrap()
});

static LEAN_BLOCK_AGGREGATED_PAYLOADS: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
"lean_block_aggregated_payloads",
"Number of aggregated_payloads in a block",
vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0]
)
.unwrap()
});

static LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
"lean_block_building_payload_aggregation_time_seconds",
"Time taken to build aggregated_payloads during block building",
vec![0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0]
)
.unwrap()
});

static LEAN_BLOCK_BUILDING_TIME_SECONDS: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
"lean_block_building_time_seconds",
"Time taken to build a block",
vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0]
)
.unwrap()
});

static LEAN_BLOCK_BUILDING_SUCCESS_TOTAL: std::sync::LazyLock<IntCounter> =
std::sync::LazyLock::new(|| {
register_int_counter!(
"lean_block_building_success_total",
"Successful block builds"
)
.unwrap()
});

static LEAN_BLOCK_BUILDING_FAILURES_TOTAL: std::sync::LazyLock<IntCounter> =
std::sync::LazyLock::new(|| {
register_int_counter!("lean_block_building_failures_total", "Failed block builds").unwrap()
});

static LEAN_FORK_CHOICE_REORG_DEPTH: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
Expand Down Expand Up @@ -314,6 +358,11 @@ pub fn init() {
std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_BUILDING_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS);
std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_SUCCESS_TOTAL);
std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_FAILURES_TOTAL);
std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH);
}

Expand Down Expand Up @@ -476,3 +525,28 @@ pub fn set_attestation_committee_count(count: u64) {
pub fn observe_fork_choice_reorg_depth(depth: u64) {
LEAN_FORK_CHOICE_REORG_DEPTH.observe(depth as f64);
}

/// Observe the number of aggregated payloads in a produced block.
pub fn observe_block_aggregated_payloads(count: usize) {
LEAN_BLOCK_AGGREGATED_PAYLOADS.observe(count as f64);
}

/// Start timing payload aggregation during block building. Records duration when the guard is dropped.
pub fn time_block_building_payload_aggregation() -> TimingGuard {
TimingGuard::new(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS)
}

/// Start timing block building. Records duration when the guard is dropped.
pub fn time_block_building() -> TimingGuard {
TimingGuard::new(&LEAN_BLOCK_BUILDING_TIME_SECONDS)
}

/// Increment the successful block builds counter.
pub fn inc_block_building_success() {
LEAN_BLOCK_BUILDING_SUCCESS_TOTAL.inc();
}

/// Increment the failed block builds counter.
pub fn inc_block_building_failures() {
LEAN_BLOCK_BUILDING_FAILURES_TOTAL.inc();
}
4 changes: 4 additions & 0 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,8 @@ pub fn produce_block_with_signatures(
&aggregated_payloads,
)?;

metrics::observe_block_aggregated_payloads(signatures.len());

Ok((block, signatures, post_checkpoints))
}

Expand Down Expand Up @@ -1037,6 +1039,8 @@ fn build_block(
let mut accumulated_proof_bytes: usize = 0;

if !aggregated_payloads.is_empty() {
let _payload_timing = metrics::time_block_building_payload_aggregation();

// Genesis edge case: when building on genesis (slot 0),
// process_block_header will set latest_justified.root = parent_root.
// Derive this upfront so attestation filtering matches.
Expand Down
5 changes: 4 additions & 1 deletion crates/net/p2p/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::{
attestation_subnet_topic,
},
};
use crate::P2PServer;
use crate::{P2PServer, metrics};

pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
let Event::Message {
Expand All @@ -29,6 +29,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
let topic_kind = message.topic.as_str().split("/").nth(3);
match topic_kind {
Some(BLOCK_TOPIC_KIND) => {
metrics::observe_gossip_block_size(message.data.len());
let Ok(uncompressed_data) = decompress_message(&message.data)
.inspect_err(|err| error!(%err, "Failed to decompress gossipped block"))
else {
Expand Down Expand Up @@ -60,6 +61,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
}
}
Some(AGGREGATION_TOPIC_KIND) => {
metrics::observe_gossip_aggregation_size(message.data.len());
let Ok(uncompressed_data) = decompress_message(&message.data)
.inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation"))
else {
Expand Down Expand Up @@ -89,6 +91,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
}
}
Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => {
metrics::observe_gossip_attestation_size(message.data.len());
let Ok(uncompressed_data) = decompress_message(&message.data)
.inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation"))
else {
Expand Down
48 changes: 48 additions & 0 deletions crates/net/p2p/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,54 @@ pub fn notify_peer_connected(peer_id: &Option<PeerId>, direction: &str, result:
}
}

static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock<ethlambda_metrics::Histogram> = LazyLock::new(|| {
ethlambda_metrics::register_histogram!(
"lean_gossip_block_size_bytes",
"Bytes size of a gossip block message",
vec![
10000.0, 50000.0, 100000.0, 250000.0, 500000.0, 1000000.0, 2000000.0, 5000000.0
]
)
.unwrap()
});

static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock<ethlambda_metrics::Histogram> =
LazyLock::new(|| {
ethlambda_metrics::register_histogram!(
"lean_gossip_attestation_size_bytes",
"Bytes size of a gossip attestation message",
vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0]
)
.unwrap()
});

static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock<ethlambda_metrics::Histogram> =
LazyLock::new(|| {
ethlambda_metrics::register_histogram!(
"lean_gossip_aggregation_size_bytes",
"Bytes size of a gossip aggregated attestation message",
vec![
1024.0, 4096.0, 16384.0, 65536.0, 131072.0, 262144.0, 524288.0, 1048576.0
]
)
.unwrap()
});

/// Observe the compressed size of a gossip block message.
pub fn observe_gossip_block_size(bytes: usize) {
LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64);
}

/// Observe the compressed size of a gossip attestation message.
pub fn observe_gossip_attestation_size(bytes: usize) {
LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64);
}

/// Observe the compressed size of a gossip aggregated attestation message.
pub fn observe_gossip_aggregation_size(bytes: usize) {
LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64);
}

/// Notify that a peer disconnected.
///
/// Decrements the connected peer count and increments the disconnection event counter.
Expand Down
Loading