Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions cmd/util/cmd/verify_execution_result/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,27 +88,58 @@ func run(*cobra.Command, []string) {
lg.Info().Msgf("look for 'could not verify' in the log for any mismatch, or try again with --stop_on_mismatch true to stop on first mismatch")
}

var totalStats verifier.BlockVerificationStats

if flagFromTo != "" {
from, to, err := parseFromTo(flagFromTo)
if err != nil {
lg.Fatal().Err(err).Msg("could not parse from_to")
}

lg.Info().Msgf("verifying range from %d to %d", from, to)
err = verifier.VerifyRange(lockManager, from, to, chainID, flagDatadir, flagChunkDataPackDir, flagWorkerCount, flagStopOnMismatch, flagtransactionFeesDisabled, flagScheduledTransactionsEnabled)
totalStats, err = verifier.VerifyRange(
lockManager,
from,
to,
chainID,
flagDatadir,
flagChunkDataPackDir,
flagWorkerCount,
flagStopOnMismatch,
flagtransactionFeesDisabled,
flagScheduledTransactionsEnabled,
)
if err != nil {
lg.Fatal().Err(err).Msgf("could not verify range from %d to %d", from, to)
}
lg.Info().Msgf("finished verified range from %d to %d", from, to)
lg.Info().Msgf("finished verifying range from %d to %d", from, to)
} else {
lg.Info().Msgf("verifying last %d sealed blocks", flagLastK)
err := verifier.VerifyLastKHeight(lockManager, flagLastK, chainID, flagDatadir, flagChunkDataPackDir, flagWorkerCount, flagStopOnMismatch, flagtransactionFeesDisabled, flagScheduledTransactionsEnabled)
var err error
totalStats, err = verifier.VerifyLastKHeight(
lockManager,
flagLastK,
chainID,
flagDatadir,
flagChunkDataPackDir,
flagWorkerCount,
flagStopOnMismatch,
flagtransactionFeesDisabled,
flagScheduledTransactionsEnabled,
)
if err != nil {
lg.Fatal().Err(err).Msg("could not verify last k height")
}

lg.Info().Msgf("finished verified last %d sealed blocks", flagLastK)
lg.Info().Msgf("finished verifying last %d sealed blocks", flagLastK)
}

lg.Info().Msgf("matching chunks: %d/%d. matching transactions: %d/%d",
totalStats.MatchedChunkCount,
totalStats.MatchedChunkCount+totalStats.MismatchedChunkCount,
totalStats.MatchedTransactionCount,
totalStats.MatchedTransactionCount+totalStats.MismatchedTransactionCount,
)
}

func parseFromTo(fromTo string) (from, to uint64, err error) {
Expand Down
181 changes: 147 additions & 34 deletions engine/verification/verifier/verifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ func VerifyLastKHeight(
stopOnMismatch bool,
transactionFeesDisabled bool,
scheduledTransactionsEnabled bool,
) (err error) {
) (
totalStats BlockVerificationStats,
err error,
) {
closer, storages, chunkDataPacks, state, verifier, err := initStorages(lockManager, chainID, protocolDataDir, chunkDataPackDir, transactionFeesDisabled, scheduledTransactionsEnabled)
if err != nil {
return fmt.Errorf("could not init storages: %w", err)
return BlockVerificationStats{}, fmt.Errorf("could not init storages: %w", err)
}
defer func() {
closerErr := closer()
Expand All @@ -55,14 +58,18 @@ func VerifyLastKHeight(

lastSealed, err := state.Sealed().Head()
if err != nil {
return fmt.Errorf("could not get last sealed height: %w", err)
return BlockVerificationStats{}, fmt.Errorf("could not get last sealed height: %w", err)
}

root := state.Params().SealedRoot().Height

// preventing overflow
if k > lastSealed.Height+1 {
return fmt.Errorf("k is greater than the number of sealed blocks, k: %d, last sealed height: %d", k, lastSealed.Height)
return BlockVerificationStats{}, fmt.Errorf(
"k is greater than the number of sealed blocks, k: %d, last sealed height: %d",
k,
lastSealed.Height,
)
}

from := lastSealed.Height - k + 1
Expand All @@ -78,12 +85,23 @@ func VerifyLastKHeight(

log.Info().Msgf("verifying blocks from %d to %d", from, to)

err = verifyConcurrently(from, to, nWorker, stopOnMismatch, storages.Headers, chunkDataPacks, storages.Results, state, verifier, verifyHeight)
totalStats, err = verifyConcurrently(
from,
to,
nWorker,
stopOnMismatch,
storages.Headers,
chunkDataPacks,
storages.Results,
state,
verifier,
verifyHeight,
)
if err != nil {
return err
return totalStats, err
}

return nil
return totalStats, nil
}

// VerifyRange verifies all chunks in the results of the blocks in the given range.
Expand All @@ -97,10 +115,13 @@ func VerifyRange(
stopOnMismatch bool,
transactionFeesDisabled bool,
scheduledTransactionsEnabled bool,
) (err error) {
) (
totalStats BlockVerificationStats,
err error,
) {
closer, storages, chunkDataPacks, state, verifier, err := initStorages(lockManager, chainID, protocolDataDir, chunkDataPackDir, transactionFeesDisabled, scheduledTransactionsEnabled)
if err != nil {
return fmt.Errorf("could not init storages: %w", err)
return BlockVerificationStats{}, fmt.Errorf("could not init storages: %w", err)
}
defer func() {
closerErr := closer()
Expand All @@ -114,15 +135,30 @@ func VerifyRange(
root := state.Params().SealedRoot().Height

if from <= root {
return fmt.Errorf("cannot verify blocks before the root block, from: %d, root: %d", from, root)
return BlockVerificationStats{}, fmt.Errorf(
"cannot verify blocks before the root block, from: %d, root: %d",
from,
root,
)
}

err = verifyConcurrently(from, to, nWorker, stopOnMismatch, storages.Headers, chunkDataPacks, storages.Results, state, verifier, verifyHeight)
totalStats, err = verifyConcurrently(
from,
to,
nWorker,
stopOnMismatch,
storages.Headers,
chunkDataPacks,
storages.Results,
state,
verifier,
verifyHeight,
)
if err != nil {
return err
return totalStats, err
}

return nil
return totalStats, nil
}

func verifyConcurrently(
Expand All @@ -134,17 +170,29 @@ func verifyConcurrently(
results storage.ExecutionResults,
state protocol.State,
verifier module.ChunkVerifier,
verifyHeight func(uint64, storage.Headers, storage.ChunkDataPacks, storage.ExecutionResults, protocol.State, module.ChunkVerifier, bool) error,
) error {
verifyHeight func(
height uint64,
headers storage.Headers,
chunkDataPacks storage.ChunkDataPacks,
results storage.ExecutionResults,
state protocol.State,
verifier module.ChunkVerifier,
stopOnMismatch bool,
) (BlockVerificationStats, error),
) (BlockVerificationStats, error) {
Comment thread
coderabbitai[bot] marked this conversation as resolved.

tasks := make(chan uint64, int(nWorker))
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Ensure cancel is called to release resources

var lowestErr error
var lowestErrHeight = ^uint64(0) // Initialize to max value of uint64
var mu sync.Mutex // To protect access to lowestErr and lowestErrHeight
var (
lowestErr error
lowestErrHeight = ^uint64(0) // Initialize to max value of uint64
totalStats BlockVerificationStats
mu sync.Mutex // To protect access to the variables above (lowestErr, lowestErrHeight, totalStats)
)

lg := util.LogProgress(
logProgress := util.LogProgress(
log.Logger,
util.DefaultLogProgressConfig(
fmt.Sprintf("verifying heights progress for [%v:%v]", from, to),
Expand All @@ -162,27 +210,45 @@ func verifyConcurrently(
if !ok {
return // Exit if the tasks channel is closed
}

log.Info().Uint64("height", height).Msg("verifying height")
err := verifyHeight(height, headers, chunkDataPacks, results, state, verifier, stopOnMismatch)

blockStats, err := verifyHeight(
height,
headers,
chunkDataPacks,
results,
state,
verifier,
stopOnMismatch,
)

mu.Lock()

totalStats.MatchedChunkCount += blockStats.MatchedChunkCount
totalStats.MismatchedChunkCount += blockStats.MismatchedChunkCount
totalStats.MatchedTransactionCount += blockStats.MatchedTransactionCount
totalStats.MismatchedTransactionCount += blockStats.MismatchedTransactionCount

if err != nil {
log.Error().Uint64("height", height).Err(err).Msg("error encountered while verifying height")

// when encountered an error, the error might not be from the lowest height that had
// error, so we need to first cancel the context to stop worker from processing further tasks
// and wait until all workers are done, which will ensure all the heights before this height
// that had error are processed. Then we can safely update the lowestErr and lowestErrHeight
mu.Lock()
if height < lowestErrHeight {
lowestErr = err
lowestErrHeight = height
cancel() // Cancel context to stop further task dispatch
}
mu.Unlock()
} else {
log.Info().Uint64("height", height).Msg("verified height successfully")
}

lg(1) // log progress
mu.Unlock()

logProgress(1)
}
}
}
Expand Down Expand Up @@ -215,10 +281,10 @@ func verifyConcurrently(
// Check if there was an error
if lowestErr != nil {
log.Error().Uint64("height", lowestErrHeight).Err(lowestErr).Msg("error encountered while verifying height")
return fmt.Errorf("could not verify height %d: %w", lowestErrHeight, lowestErr)
return totalStats, fmt.Errorf("could not verify height %d: %w", lowestErrHeight, lowestErr)
}

return nil
return totalStats, nil
}

func initStorages(
Expand Down Expand Up @@ -273,6 +339,13 @@ func initStorages(
return closer, storages, chunkDataPacks, state, verifier, nil
}

// BlockVerificationStats aggregates the outcome of chunk verification for one
// or more blocks: how many chunks (and the transactions they contain) matched
// the expected execution result versus how many did not. Per-block stats are
// summed into a running total by the verification driver.
type BlockVerificationStats struct {
MatchedChunkCount uint64 // chunks that verified successfully
MismatchedChunkCount uint64 // chunks whose verification failed
MatchedTransactionCount uint64 // transactions contained in successfully verified chunks
MismatchedTransactionCount uint64 // transactions contained in chunks that failed verification
}

// verifyHeight verifies all chunks in the results of the block at the given height.
// Note: it returns nil if the block is not executed.
func verifyHeight(
Expand All @@ -283,10 +356,13 @@ func verifyHeight(
state protocol.State,
verifier module.ChunkVerifier,
stopOnMismatch bool,
) error {
) (
stats BlockVerificationStats,
err error,
) {
header, err := headers.ByHeight(height)
if err != nil {
return fmt.Errorf("could not get block header by height %d: %w", height, err)
return BlockVerificationStats{}, fmt.Errorf("could not get block header by height %d: %w", height, err)
}

blockID := header.ID()
Expand All @@ -295,34 +371,71 @@ func verifyHeight(
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
log.Warn().Uint64("height", height).Hex("block_id", blockID[:]).Msg("execution result not found")
return nil
return BlockVerificationStats{}, nil
}

return fmt.Errorf("could not get execution result by block ID %s: %w", blockID, err)
return BlockVerificationStats{}, fmt.Errorf("could not get execution result by block ID %s: %w", blockID, err)
}
snapshot := state.AtBlockID(blockID)

for i, chunk := range result.Chunks {
chunkDataPack, err := chunkDataPacks.ByChunkID(chunk.ID())
if err != nil {
return fmt.Errorf("could not get chunk data pack by chunk ID %s: %w", chunk.ID(), err)
return BlockVerificationStats{}, fmt.Errorf("could not get chunk data pack by chunk ID %s: %w", chunk.ID(), err)
}

vcd, err := convert.FromChunkDataPack(chunk, chunkDataPack, header, snapshot, result)
if err != nil {
return err
return BlockVerificationStats{}, err
}

chunkTransactionCount := vcd.Chunk.NumberOfTransactions

_, err = verifier.Verify(vcd)
if err != nil {
var collectionID flow.Identifier
if chunkDataPack.Collection != nil {
collectionID = chunkDataPack.Collection.ID()
}

if stopOnMismatch {
return fmt.Errorf("could not verify chunk (index: %v) at block %v (%v): %w", i, height, blockID, err)
return BlockVerificationStats{
MismatchedChunkCount: 1,
MismatchedTransactionCount: chunkTransactionCount,
}, fmt.Errorf(
"could not verify chunk (index: %v, ID: %v) at block %v (%v): %w",
i,
collectionID,
height,
blockID,
err,
)
}

if vcd.IsSystemChunk {
log.Warn().Err(err).Msgf(
"could not verify system chunk (index: %v, ID: %v) at block %v (%v)",
i, collectionID, height, blockID,
)
} else {

log.Error().Err(err).Msgf(
"could not verify chunk (index: %v, ID: %v) at block %v (%v)",
i, collectionID, height, blockID,
)
}

log.Error().Err(err).Msgf("could not verify chunk (index: %v) at block %v (%v)", i, height, blockID)
stats.MismatchedChunkCount++
stats.MismatchedTransactionCount += chunkTransactionCount
} else {
log.Info().Msgf("verified chunk (index: %v) at block %v (%v) successfully", i, height, blockID)

stats.MatchedChunkCount++
stats.MatchedTransactionCount += chunkTransactionCount
}
}
return nil

return stats, nil
}

func makeVerifier(
Expand Down
Loading
Loading