Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

"github.com/onflow/flow-go/fvm/systemcontracts"
"github.com/onflow/flow-go/model/access"
"github.com/onflow/flow-go/model/access/systemcollection"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/state_synchronization/indexer/extended/events"
Expand Down Expand Up @@ -53,7 +54,9 @@
store storage.ScheduledTransactionsIndexBootstrapper
metrics module.ExtendedIndexingMetrics

scheduledExecutorAddr flow.Address
// executorAddr resolves the executor authorizer address for a given block height.
// v0 system collections use FlowServiceAccount; v1 uses ScheduledTransactionExecutor.
executorAddr *access.Versioned[flow.Address]

scheduledEventType flow.EventType
pendingExecutionType flow.EventType
Expand Down Expand Up @@ -113,12 +116,24 @@
scheduler := sc.FlowTransactionScheduler
prefix := fmt.Sprintf("A.%s.%s.", scheduler.Address.Hex(), scheduler.Name)

// Build a height-versioned executor address that matches the system collection builder versions.
// v0 uses FlowServiceAccount as the executor authorizer; v1 uses ScheduledTransactionExecutor.
versionMapper, ok := systemcollection.ChainHeightVersions[chainID]
if !ok {
versionMapper = access.NewStaticHeightVersionMapper(access.LatestBoundary)
}
executorAddr := access.NewVersioned(map[access.Version]flow.Address{
systemcollection.Version0: sc.FlowServiceAccount.Address,

Check failure on line 126 in module/state_synchronization/indexer/extended/scheduled_transactions.go

View workflow job for this annotation

GitHub Actions / Lint (./)

File is not properly formatted (goimports)
systemcollection.Version1: sc.ScheduledTransactionExecutor.Address,
access.VersionLatest: sc.ScheduledTransactionExecutor.Address,
}, versionMapper)

return &ScheduledTransactions{
log: log.With().Str("component", "scheduled_tx_indexer").Logger(),
store: store,
metrics: metrics,
requester: NewScheduledTransactionRequester(scriptExecutor, chainID),
scheduledExecutorAddr: sc.ScheduledTransactionExecutor.Address,
log: log.With().Str("component", "scheduled_tx_indexer").Logger(),
store: store,
metrics: metrics,
requester: NewScheduledTransactionRequester(scriptExecutor, chainID),
executorAddr: executorAddr,
scheduledEventType: flow.EventType(prefix + "Scheduled"),
pendingExecutionType: flow.EventType(prefix + "PendingExecution"),
executedEventType: flow.EventType(prefix + "Executed"),
Expand Down Expand Up @@ -384,13 +399,9 @@
// start searching from the system transaction that adds the scheduled transactions into the
// system collection to reduce overhead.
for _, tx := range data.Transactions[*pendingEventTxIndex:] {
if !s.isExecutorTransaction(tx) {
if !s.isExecutorTransaction(tx, data.Header.Height) {
continue
}
// the executor transaction must have a scheduled tx ID argument.
if len(tx.Arguments) < 1 {
return nil, fmt.Errorf("executor transaction %s has no scheduled tx ID argument", tx.ID())
}

id, err := decodeScheduledTxIDArg(tx.Arguments[0])
if err != nil {
Expand Down Expand Up @@ -424,11 +435,14 @@
}

// isExecutorTransaction returns true if the transaction was submitted by the scheduled executor
// account: sole authorizer is the scheduled executor address and payer is the empty address.
func (s *ScheduledTransactions) isExecutorTransaction(tx *flow.TransactionBody) bool {
// account for the given block height: sole authorizer matches the height-appropriate executor
// address, payer is the empty address, and the transaction has at least one argument (the
// scheduled tx ID).
func (s *ScheduledTransactions) isExecutorTransaction(tx *flow.TransactionBody, height uint64) bool {
return tx.Payer == flow.EmptyAddress &&
len(tx.Authorizers) == 1 &&
tx.Authorizers[0] == s.scheduledExecutorAddr
len(tx.Arguments) >= 1 &&
tx.Authorizers[0] == s.executorAddr.ByHeight(height)
}
Comment on lines 437 to 446
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Look for v0 system collection builder and enumerate transactions it emits,
# focusing on any that are authorized by the service account and carry arguments.
fd -t f 'system_collection' model/access/systemcollection/
echo "---- v0 builder ----"
fd -t f 'builder' model/access/systemcollection/ | xargs -r rg -n -C2 'Authorizers|Arguments|ServiceAccount|Payer'
echo "---- cross-reference ----"
rg -nP --type=go -C3 'AddAuthorizer|SetPayer' -g '!**/*_test.go' model/

Repository: onflow/flow-go

Length of output: 3928


🏁 Script executed:

#!/bin/bash
# Examine the full implementations of system transactions in v0
echo "=== ProcessCallbacksTransaction (v0) ==="
sed -n '70,85p' model/access/systemcollection/system_collection_v0.go

echo "=== ProcessScheduledTransactionsTransaction (v0) ==="
sed -n '40,51p' model/access/systemcollection/system_collection_v0.go

echo "=== SystemChunkTransaction (v0) ==="
sed -n '145,160p' model/access/systemcollection/system_collection_v0.go

echo "=== Check if Payer is ever set in v0 builders ==="
rg -n 'SetPayer' model/access/systemcollection/system_collection_v0.go

echo "=== Executor address initialization ==="
rg -n 'executorAddr' module/state_synchronization/indexer/extended/scheduled_transactions.go | head -20

Repository: onflow/flow-go

Length of output: 2170


🏁 Script executed:

#!/bin/bash
# Check the default Payer value in TransactionBodyBuilder
echo "=== TransactionBodyBuilder Build method ==="
rg -n 'type TransactionBodyBuilder' model/flow/transaction_body_builder.go -A 20

echo "=== TransactionBodyBuilder.Build() method ==="
rg -n 'func.*Build.*TransactionBody' model/flow/transaction_body_builder.go -A 10

echo "=== Executor address initialization for v0 vs v1 ==="
sed -n '125,135p' module/state_synchronization/indexer/extended/scheduled_transactions.go

echo "=== FlowServiceAccount address ==="
rg -n 'FlowServiceAccount' systemcontracts/ | head -10

Repository: onflow/flow-go

Length of output: 6130


🏁 Script executed:

#!/bin/bash
# Check the argument decoding logic at line 406
echo "=== Context around line 406 ==="
sed -n '400,420p' module/state_synchronization/indexer/extended/scheduled_transactions.go

echo "=== Full ProcessScheduledTransactions function signature ==="
rg -n 'func.*ProcessScheduledTransactions' module/state_synchronization/indexer/extended/scheduled_transactions.go -A 30 | head -40

Repository: onflow/flow-go

Length of output: 1043


Residual risk is real: ProcessCallbacksTransaction would be misidentified as executor transaction in v0 blocks.

The combination Payer == EmptyAddress && len(Authorizers) == 1 && Authorizers[0] == executor(height) && len(Arguments) >= 1 correctly handles ProcessScheduledTransactionsTransaction (zero args) and SystemChunkTransaction (zero args), and the height-aware executor address correctly maps to FlowServiceAccount in v0 blocks.

However, ProcessCallbacksTransaction in v0 satisfies all four conditions:

  • Payer: EmptyAddress (never set via SetPayer)
  • Authorizers: single FlowServiceAccount.Address
  • Arguments: ≥1 (contains callback event ID)
  • For v0 blocks, executorAddr.ByHeight() resolves to FlowServiceAccount.Address

When misidentified as an executor transaction, the code attempts to decode Arguments[0] as a scheduled transaction ID at line 406. Since ProcessCallbacksTransaction's argument is a callback event ID, the decoding fails and aborts block indexing. This is not theoretical: such transactions exist in v0 blocks.

A discriminator (e.g., script content comparison) is needed to distinguish executor transactions from ProcessCallbacksTransaction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@module/state_synchronization/indexer/extended/scheduled_transactions.go`
around lines 437 - 446, The current isExecutorTransaction(tx
*flow.TransactionBody, height uint64) incorrectly classifies v0
ProcessCallbacksTransaction as an executor transaction; update
isExecutorTransaction to add a discriminator that ensures the tx script/content
matches the executor scheduled-transaction script (or explicitly excludes the
ProcessCallbacksTransaction pattern) before returning true—use tx.Script (or its
byte signature/known hash or specific entrypoint/name in the script) alongside
the existing checks and keep executorAddr.ByHeight(height) usage; alternatively,
implement a helper like isProcessCallbacksTransaction(tx) and return false from
isExecutorTransaction when that helper detects a callback transaction so
argument decoding for scheduled tx IDs is only attempted for true executor
transactions.


// decodeScheduledTxIDArg decodes a JSON-CDC encoded UInt64 argument as a scheduled tx ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
. "github.com/onflow/flow-go/module/state_synchronization/indexer/extended"
)

const scheduledTestHeight = uint64(100)
const (
scheduledTestHeight = uint64(100) // v0 range on testnet (boundary at 290050888)
scheduledTestHeightV1 = uint64(290050900) // v1 range on testnet
)

// TestScheduledTransactionsIndexer_NoEvents verifies that indexing a block with no scheduler
// events stores an empty slice and advances the height.
Expand Down Expand Up @@ -220,9 +223,10 @@ func TestScheduledTransactionsIndexer_FailedTransaction(t *testing.T) {

// Height 2: PendingExecution for tx 42, no Executed event.
// The executor transaction attempted to execute the scheduled tx but failed.
// scheduledTestHeight falls in the v0 range on testnet, so the executor uses FlowServiceAccount.
header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeight+1))
pendingEvt := createPendingExecutionEvent(t, sc, 42, 1, 200, 80, owner, "A.xyz.Contract.Handler")
executorTx := makeExecutorTransactionBody(t, sc.ScheduledTransactionExecutor.Address, 42)
executorTx := makeExecutorTransactionBody(t, sc.FlowServiceAccount.Address, 42)
indexScheduledBlock(t, indexer, lm, db, BlockData{
Header: header2,
Events: []flow.Event{pendingEvt},
Expand All @@ -235,6 +239,42 @@ func TestScheduledTransactionsIndexer_FailedTransaction(t *testing.T) {
assert.Equal(t, executorTx.ID(), tx.ExecutedTransactionID)
}

// TestScheduledTransactionsIndexer_FailedTransactionV1 verifies that the failed-tx detection
// path matches executor transactions authorized by ScheduledTransactionExecutor (v1 system
// collection format, used after the version boundary).
func TestScheduledTransactionsIndexer_FailedTransactionV1(t *testing.T) {
t.Parallel()

sc := systemcontracts.SystemContractsForChain(flow.Testnet)
indexer, store, lm, db := newScheduledTxIndexerForTest(t, flow.Testnet, scheduledTestHeightV1)

owner := unittest.RandomAddressFixture()

// Height 1: schedule tx with id=99
header1 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1))
scheduledEvt := createScheduledEvent(t, sc, 99, 1, 3000, 200, 80, owner, "A.xyz.Contract.Handler", 15, "")
indexScheduledBlock(t, indexer, lm, db, BlockData{
Header: header1,
Events: []flow.Event{scheduledEvt},
})

// Height 2: PendingExecution for tx 99, no Executed event.
// v1 uses ScheduledTransactionExecutor.Address as the authorizer.
header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1+1))
pendingEvt := createPendingExecutionEvent(t, sc, 99, 1, 200, 80, owner, "A.xyz.Contract.Handler")
executorTx := makeExecutorTransactionBody(t, sc.ScheduledTransactionExecutor.Address, 99)
indexScheduledBlock(t, indexer, lm, db, BlockData{
Header: header2,
Events: []flow.Event{pendingEvt},
Transactions: []*flow.TransactionBody{executorTx},
})

tx, err := store.ByID(99)
require.NoError(t, err)
assert.Equal(t, access.ScheduledTxStatusFailed, tx.Status)
assert.Equal(t, executorTx.ID(), tx.ExecutedTransactionID)
}

// TestScheduledTransactionsIndexer_PendingWithoutExecuted verifies that a PendingExecution event
// without either a matching Executed event or a corresponding executor transaction returns an error.
func TestScheduledTransactionsIndexer_PendingWithoutExecuted(t *testing.T) {
Expand Down Expand Up @@ -601,7 +641,7 @@ func TestScheduledTransactionsIndexer_MixedFailedAndExecuted(t *testing.T) {
pending20 := createPendingExecutionEvent(t, sc, 20, 1, 100, 10, owner, "A.abc.Contract.Handler")
pending21 := createPendingExecutionEvent(t, sc, 21, 1, 150, 10, owner, "A.abc.Contract.Handler")
executed20 := createExecutedEvent(t, sc, 20, 1, 100, owner, "A.abc.Contract.Handler", 20, "")
executorTx21 := makeExecutorTransactionBody(t, sc.ScheduledTransactionExecutor.Address, 21)
executorTx21 := makeExecutorTransactionBody(t, sc.FlowServiceAccount.Address, 21)
indexScheduledBlock(t, indexer, lm, db, BlockData{
Header: header2,
Events: []flow.Event{pending20, pending21, executed20},
Expand Down Expand Up @@ -641,6 +681,88 @@ func TestScheduledTransactionsIndexer_NonExecutorTxSkipped(t *testing.T) {
// The non-executor tx has the wrong payer and should be skipped.
header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeight+1))
pendingEvt := createPendingExecutionEvent(t, sc, 30, 1, 100, 10, owner, "A.abc.Contract.Handler")
nonExecutorTx := &flow.TransactionBody{
Payer: unittest.RandomAddressFixture(), // wrong payer
Authorizers: []flow.Address{sc.FlowServiceAccount.Address},
}
executorTx := makeExecutorTransactionBody(t, sc.FlowServiceAccount.Address, 30)
indexScheduledBlock(t, indexer, lm, db, BlockData{
Header: header2,
Events: []flow.Event{pendingEvt},
Transactions: []*flow.TransactionBody{nonExecutorTx, executorTx},
})

tx, err := store.ByID(30)
require.NoError(t, err)
assert.Equal(t, access.ScheduledTxStatusFailed, tx.Status)
assert.Equal(t, executorTx.ID(), tx.ExecutedTransactionID)
}

// TestScheduledTransactionsIndexer_MixedFailedAndExecutedV1 is the v1 counterpart of
// TestScheduledTransactionsIndexer_MixedFailedAndExecuted, using ScheduledTransactionExecutor
// as the executor authorizer.
func TestScheduledTransactionsIndexer_MixedFailedAndExecutedV1(t *testing.T) {
t.Parallel()

sc := systemcontracts.SystemContractsForChain(flow.Testnet)
indexer, store, lm, db := newScheduledTxIndexerForTest(t, flow.Testnet, scheduledTestHeightV1)

owner := unittest.RandomAddressFixture()

// Height 1: schedule txs 20 and 21
header1 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1))
evt20 := createScheduledEvent(t, sc, 20, 1, 1000, 100, 10, owner, "A.abc.Contract.Handler", 20, "")
evt21 := createScheduledEvent(t, sc, 21, 1, 1000, 150, 10, owner, "A.abc.Contract.Handler", 21, "")
indexScheduledBlock(t, indexer, lm, db, BlockData{
Header: header1,
Events: []flow.Event{evt20, evt21},
})

// Height 2: tx 20 succeeds, tx 21 fails (executor tx present, no Executed event)
header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1+1))
pending20 := createPendingExecutionEvent(t, sc, 20, 1, 100, 10, owner, "A.abc.Contract.Handler")
pending21 := createPendingExecutionEvent(t, sc, 21, 1, 150, 10, owner, "A.abc.Contract.Handler")
executed20 := createExecutedEvent(t, sc, 20, 1, 100, owner, "A.abc.Contract.Handler", 20, "")
executorTx21 := makeExecutorTransactionBody(t, sc.ScheduledTransactionExecutor.Address, 21)
indexScheduledBlock(t, indexer, lm, db, BlockData{
Header: header2,
Events: []flow.Event{pending20, pending21, executed20},
Transactions: []*flow.TransactionBody{executorTx21},
})

tx20, err := store.ByID(20)
require.NoError(t, err)
assert.Equal(t, access.ScheduledTxStatusExecuted, tx20.Status)

tx21, err := store.ByID(21)
require.NoError(t, err)
assert.Equal(t, access.ScheduledTxStatusFailed, tx21.Status)
assert.Equal(t, executorTx21.ID(), tx21.ExecutedTransactionID)
}

// TestScheduledTransactionsIndexer_NonExecutorTxSkippedV1 is the v1 counterpart of
// TestScheduledTransactionsIndexer_NonExecutorTxSkipped, using ScheduledTransactionExecutor
// as the executor authorizer.
func TestScheduledTransactionsIndexer_NonExecutorTxSkippedV1(t *testing.T) {
t.Parallel()

sc := systemcontracts.SystemContractsForChain(flow.Testnet)
indexer, store, lm, db := newScheduledTxIndexerForTest(t, flow.Testnet, scheduledTestHeightV1)

owner := unittest.RandomAddressFixture()

// Height 1: schedule tx with id=30
header1 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1))
scheduledEvt := createScheduledEvent(t, sc, 30, 1, 1000, 100, 10, owner, "A.abc.Contract.Handler", 30, "")
indexScheduledBlock(t, indexer, lm, db, BlockData{
Header: header1,
Events: []flow.Event{scheduledEvt},
})

// Height 2: PendingExecution for tx 30, a non-executor tx, then the real executor tx.
// The non-executor tx has the wrong payer and should be skipped.
header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1+1))
pendingEvt := createPendingExecutionEvent(t, sc, 30, 1, 100, 10, owner, "A.abc.Contract.Handler")
nonExecutorTx := &flow.TransactionBody{
Payer: unittest.RandomAddressFixture(), // wrong payer
Authorizers: []flow.Address{sc.ScheduledTransactionExecutor.Address},
Expand All @@ -658,8 +780,10 @@ func TestScheduledTransactionsIndexer_NonExecutorTxSkipped(t *testing.T) {
assert.Equal(t, executorTx.ID(), tx.ExecutedTransactionID)
}

// TestScheduledTransactionsIndexer_ExecutorTxNoArguments verifies that an executor transaction
// with no arguments returns an error rather than silently skipping the failed tx.
// TestScheduledTransactionsIndexer_ExecutorTxNoArguments verifies that a transaction with the
// correct executor address and payer but no arguments is not treated as an executor transaction.
// This distinguishes executor transactions from other system transactions (e.g. ProcessCallbacksTransaction)
// that share the same authorizer in v0.
func TestScheduledTransactionsIndexer_ExecutorTxNoArguments(t *testing.T) {
t.Parallel()

Expand All @@ -669,19 +793,21 @@ func TestScheduledTransactionsIndexer_ExecutorTxNoArguments(t *testing.T) {

owner := unittest.RandomAddressFixture()
pendingEvt := createPendingExecutionEvent(t, sc, 50, 1, 100, 10, owner, "A.abc.Contract.Handler")
executorTx := &flow.TransactionBody{
// A system transaction with no arguments should not be matched as an executor tx,
// even though it has the right authorizer and payer.
noArgTx := &flow.TransactionBody{
Payer: flow.EmptyAddress,
Authorizers: []flow.Address{sc.ScheduledTransactionExecutor.Address},
Authorizers: []flow.Address{sc.FlowServiceAccount.Address},
Arguments: nil,
}

err := indexScheduledBlockExpectError(t, indexer, lm, db, BlockData{
Header: header,
Events: []flow.Event{pendingEvt},
Transactions: []*flow.TransactionBody{executorTx},
Transactions: []*flow.TransactionBody{noArgTx},
})
require.Error(t, err)
assert.Contains(t, err.Error(), "has no scheduled tx ID argument")
assert.Contains(t, err.Error(), "have no corresponding executor transaction")
}

// TestScheduledTransactionsIndexer_ExecutorTxMalformedArg verifies that an executor transaction
Expand All @@ -701,7 +827,7 @@ func TestScheduledTransactionsIndexer_ExecutorTxMalformedArg(t *testing.T) {
require.NoError(t, encErr)
executorTx := &flow.TransactionBody{
Payer: flow.EmptyAddress,
Authorizers: []flow.Address{sc.ScheduledTransactionExecutor.Address},
Authorizers: []flow.Address{sc.FlowServiceAccount.Address},
Arguments: [][]byte{malformedArg},
}

Expand Down
Loading