Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions tm2/pkg/bft/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,11 @@ func (n *Node) OnStop() {
n.Logger.Error("Error closing private validator", "err", err)
}

// Stop the package-level rpc/core txDispatcher before the event switch so
// its listenRoutine exits via its own Quit channel instead of racing
// evsw.Quit().
rpccore.Stop()

// Stop the non-reactor services
n.evsw.Stop()
n.eventStoreService.Stop()
Expand Down
9 changes: 7 additions & 2 deletions tm2/pkg/bft/rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,13 @@ func (td *txDispatcher) listenRoutine() {
select {
case event, ok := <-td.sub:
if !ok {
td.Stop()
panic("txDispatcher subscription unexpectedly closed")
// The event switch closed our subscription during shutdown
// (see events.SubscribeFilteredOn: the listener callback
// closes the channel when it would otherwise block and
// evsw.Quit() has fired). Stop cleanly rather than panic —
// pending getTxResult waiters will time out normally.
go func() { _ = td.Stop() }()
return
}
txEvent := event.(types.EventTx)
td.notifyTxEvent(txEvent)
Expand Down
129 changes: 129 additions & 0 deletions tm2/pkg/bft/rpc/core/mempool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package core

import (
"testing"
"time"

"github.com/gnolang/gno/tm2/pkg/bft/types"
"github.com/gnolang/gno/tm2/pkg/events"
"github.com/gnolang/gno/tm2/pkg/service"
"github.com/stretchr/testify/require"
)

// TestTxDispatcher_ClosedSubscriptionDoesNotPanic regression-tests the case
// where the event switch closes the txDispatcher's subscription channel on
// shutdown (via events.SubscribeFilteredOn's <-evsw.Quit() branch). The
// listenRoutine used to panic with "txDispatcher subscription unexpectedly
// closed"; it should now exit cleanly.
func TestTxDispatcher_ClosedSubscriptionDoesNotPanic(t *testing.T) {
t.Parallel()

sub := make(chan events.Event)
td := &txDispatcher{
evsw: events.NewEventSwitch(),
listenerID: "test",
sub: sub,
waiters: make(map[string]*txWaiter),
}
td.BaseService = *service.NewBaseService(nil, "txDispatcher", td)
require.NoError(t, td.Start())

done := make(chan struct{})
go func() {
td.Wait()
close(done)
}()

close(sub)

select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("listenRoutine did not exit after subscription closed")
}
}

// TestTxDispatcher_EventSwitchShutdown exercises the exact scenario reported
// in CI: FireEvent continues to invoke listener callbacks after evsw.Stop(),
// and when the unbuffered send would block while evsw.Quit() is closed, the
// subscribe callback closes the subscription channel. The listenRoutine must
// not panic.
func TestTxDispatcher_EventSwitchShutdown(t *testing.T) {
t.Parallel()

evsw := events.NewEventSwitch()
require.NoError(t, evsw.Start())

td := newTxDispatcher(evsw)
t.Cleanup(func() {
if td.IsRunning() {
td.Stop()
}
})

// We need to drive the event switch into a state where its listener
// callback must take the <-evsw.Quit() branch and close the sub
// channel. That requires: (a) listenRoutine not reading from ch when
// the callback runs, and (b) evsw.Quit() already closed by then.
//
// Sequence:
// 1. Lock td.mtx.
// 2. Fire event #1. listenRoutine receives it, then blocks on
// notifyTxEvent's td.mtx.Lock().
// 3. Fire event #2. listenRoutine is blocked, so its callback's
// `ch <- event` send cannot proceed — the callback waits in its
// own select.
// 4. Stop the evsw. evsw.Quit() closes, waking event #2's callback,
// which now picks <-evsw.Quit() and calls close(ch).
// 5. Unlock td.mtx. listenRoutine returns to its select, receives
// from closed ch, sees ok=false. Pre-fix: panic. Post-fix:
// return cleanly and Stop the dispatcher.
tx := types.Tx("stall-tx")

td.mtx.Lock()

firstDone := make(chan struct{})
go func() {
evsw.FireEvent(types.EventTx{Result: types.TxResult{Tx: tx}})
close(firstDone)
}()
// event #1's callback returns as soon as listenRoutine receives.
select {
case <-firstDone:
case <-time.After(2 * time.Second):
t.Fatal("first FireEvent never returned — listenRoutine not receiving")
}
// At this point listenRoutine is inside notifyTxEvent, blocking on
// td.mtx (owned by the test).

secondDone := make(chan struct{})
go func() {
defer close(secondDone)
defer func() { _ = recover() }() // tolerate preexisting SubscribeFilteredOn panics
evsw.FireEvent(types.EventTx{Result: types.TxResult{Tx: tx}})
}()
// Give event #2's callback time to enter its select and start
// blocking on the send (listenRoutine isn't receiving).
time.Sleep(50 * time.Millisecond)

// Stop the evsw now — this wakes event #2's callback onto the Quit
// branch, which calls close(ch).
evsw.Stop()

select {
case <-secondDone:
case <-time.After(2 * time.Second):
t.Fatal("second FireEvent never returned after evsw.Stop")
}

// Release the mutex. listenRoutine finishes event #1 and observes
// the closed subscription.
td.mtx.Unlock()

select {
case <-td.Quit():
// Expected: listenRoutine stopped the dispatcher cleanly.
case <-time.After(2 * time.Second):
t.Fatal("listenRoutine did not exit after subscription closed")
}
}
27 changes: 27 additions & 0 deletions tm2/pkg/bft/rpc/core/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,24 @@ func SetLogger(l *slog.Logger) {
logger = l
}

// SetEventSwitch wires the event switch into rpc/core and (re)creates the
// package-level txDispatcher bound to it.
//
// NOTE: rpc/core holds process-wide singletons (evsw, mempool, blockStore,
// consensusState, gTxDispatcher, …). Running multiple nodes in the same
// process — e.g. INMEMORY_TS integration tests, or in-process tests using
// TestingInMemoryNode with t.Parallel — is not safe: whoever calls the Set*
// functions last owns every global, so RPC handlers like BroadcastTxCommit
// can route through another node's state. Subprocess-isolated test modes
// (commandKindTesting, the default in CI) sidestep this. A proper fix
// requires threading per-node state through the RPC handlers; until then,
// treat rpc/core as single-node-per-process.
func SetEventSwitch(sw events.EventSwitch) {
// A previous node in this process may have left a running dispatcher
// behind; stop it before replacing it to avoid leaking its goroutine.
if gTxDispatcher != nil && gTxDispatcher.IsRunning() {
gTxDispatcher.Stop()
}
evsw = sw
gTxDispatcher = newTxDispatcher(evsw)
}
Expand All @@ -127,6 +144,16 @@ func Start() {
gTxDispatcher.Start()
}

// Stop tears down the package-level resources created by SetEventSwitch.
// It should be called before the associated event switch is stopped so the
// txDispatcher goroutine exits via its own Quit channel rather than racing
// the event switch's shutdown.
func Stop() {
if gTxDispatcher != nil && gTxDispatcher.IsRunning() {
gTxDispatcher.Stop()
}
}

// SetConfig sets an RPCConfig.
func SetConfig(c cfg.RPCConfig) {
config = c
Expand Down
Loading