Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 0 additions & 90 deletions pkg/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ const (

// peerLimit defines limit of number of peers returned during active peer discovery.
peerLimit = 60

// seedReconnectBackoff is the initial backoff when reconnecting to a disconnected seed peer.
seedReconnectBackoff = 1 * time.Second

// seedReconnectMaxBackoff is the maximum backoff for seed peer reconnection attempts.
seedReconnectMaxBackoff = 30 * time.Second
)

// Client is a P2P client, implemented with libp2p.
Expand All @@ -62,11 +56,6 @@ type Client struct {
ps *pubsub.PubSub
started bool

ctx context.Context
cancel context.CancelFunc

seedPeers []peer.AddrInfo

metrics *Metrics
}

Expand Down Expand Up @@ -151,7 +140,6 @@ func (c *Client) Start(ctx context.Context) error {

func (c *Client) startWithHost(ctx context.Context, h host.Host) error {
c.host = h
c.ctx, c.cancel = context.WithCancel(ctx)
for _, a := range c.host.Addrs() {
c.logger.Info().Str("address", fmt.Sprintf("%s/p2p/%s", a, c.host.ID())).Msg("listening on address")
}
Expand Down Expand Up @@ -182,17 +170,11 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error {
}

c.started = true

c.host.Network().Notify(c.newDisconnectNotifee())

return nil
}

// Close gently stops Client.
func (c *Client) Close() error {
if c.cancel != nil {
c.cancel()
}
var err error
if c.dht != nil {
err = errors.Join(err, c.dht.Close())
Expand Down Expand Up @@ -263,77 +245,6 @@ func (c *Client) Peers() []PeerConnection {
return res
}

type disconnectNotifee struct {
c *Client
}

func (n disconnectNotifee) Connected(_ network.Network, conn network.Conn) {
p := conn.RemotePeer()
for _, sp := range n.c.seedPeers {
if sp.ID == p {
n.c.logger.Info().Str("peer", p.String()).Msg("connected to seed peer")
return
}
}
}
func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream) {}
func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream) {}
func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr) {}
func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {}

func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) {
p := conn.RemotePeer()
for _, sp := range n.c.seedPeers {
if sp.ID == p {
n.c.logger.Warn().Str("peer", p.String()).Msg("disconnected from seed peer, scheduling reconnect")
go n.c.reconnectSeedPeer(sp)
return
}
}
}

func (c *Client) reconnectSeedPeer(sp peer.AddrInfo) {
backoff := seedReconnectBackoff
for {
if c.ctx.Err() != nil {
return
}
if c.isConnected(sp.ID) {
return
}

err := c.host.Connect(c.ctx, sp)
if err == nil {
c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnected to seed peer")
return
}
if c.ctx.Err() != nil {
return
}

c.logger.Debug().Str("peer", sp.ID.String()).Dur("backoff", backoff).Err(err).Msg("failed to reconnect to seed peer, retrying")
select {
case <-c.ctx.Done():
return
case <-time.After(backoff):
}

backoff *= 2
if backoff > seedReconnectMaxBackoff {
backoff = seedReconnectMaxBackoff
}
}
}

func (c *Client) newDisconnectNotifee() disconnectNotifee {
return disconnectNotifee{c: c}
}

// isConnected returns true if there is an active connection to the given peer.
func (c *Client) isConnected(id peer.ID) bool {
return c.host.Network().Connectedness(id) == network.Connected
}

func (c *Client) listen() (host.Host, error) {
maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress)
if err != nil {
Expand All @@ -345,7 +256,6 @@ func (c *Client) listen() (host.Host, error) {

func (c *Client) setupDHT(ctx context.Context) error {
peers := c.parseAddrInfoList(c.conf.Peers)
c.seedPeers = peers
if len(peers) == 0 {
c.logger.Info().Msg("no peers - only listening for connections")
}
Expand Down
98 changes: 0 additions & 98 deletions pkg/p2p/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,104 +278,6 @@ func waitForCondition(timeout time.Duration, conditionFunc func() bool) error {
}
}

func TestSeedPeerReconnect(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
logger := zerolog.Nop()

mn := mocknet.New()
defer mn.Close()

seedKey, err := key.GenerateNodeKey()
require.NoError(err)
seedAddr, err := getAddr(seedKey.PrivKey)
require.NoError(err)
seedHost, err := mn.AddPeer(seedKey.PrivKey, seedAddr)
require.NoError(err)

clientKey, err := key.GenerateNodeKey()
require.NoError(err)
clientAddr, err := getAddr(clientKey.PrivKey)
require.NoError(err)
clientHost, err := mn.AddPeer(clientKey.PrivKey, clientAddr)
require.NoError(err)

seedAddrStr := seedHost.Addrs()[0].String() + "/p2p/" + seedHost.ID().String()
conf := config.P2PConfig{Peers: seedAddrStr}

client, err := NewClient(conf, clientKey.PrivKey, dssync.MutexWrap(datastore.NewMapDatastore()), "test-chain", logger, NopMetrics())
require.NoError(err)
require.NotNil(client)

err = mn.LinkAll()
require.NoError(err)
err = mn.ConnectAllButSelf()
require.NoError(err)

ctx := t.Context()
err = client.startWithHost(ctx, clientHost)
require.NoError(err)
defer client.Close()

err = waitForCondition(2*time.Second, func() bool {
return client.isConnected(seedHost.ID())
})
require.NoError(err, "client should connect to seed peer on start")

conns := client.host.Network().ConnsToPeer(seedHost.ID())
for _, conn := range conns {
conn.Close()
}
client.host.Network().ClosePeer(seedHost.ID())

assert.False(client.isConnected(seedHost.ID()), "seed peer should be disconnected")

err = waitForCondition(5*time.Second, func() bool {
return client.isConnected(seedHost.ID())
})
require.NoError(err, "client should reconnect to seed peer after disconnect")
}

func TestSeedPeerReconnectStopsOnClose(t *testing.T) {
require := require.New(t)

mn := mocknet.New()
defer mn.Close()

seedKey, err := key.GenerateNodeKey()
require.NoError(err)
seedAddr, err := getAddr(seedKey.PrivKey)
require.NoError(err)
seedHost, err := mn.AddPeer(seedKey.PrivKey, seedAddr)
require.NoError(err)

clientKey, err := key.GenerateNodeKey()
require.NoError(err)
clientAddr, err := getAddr(clientKey.PrivKey)
require.NoError(err)
clientHost, err := mn.AddPeer(clientKey.PrivKey, clientAddr)
require.NoError(err)

seedAddrStr := seedHost.Addrs()[0].String() + "/p2p/" + seedHost.ID().String()
conf := config.P2PConfig{Peers: seedAddrStr}

client, err := NewClient(conf, clientKey.PrivKey, dssync.MutexWrap(datastore.NewMapDatastore()), "test-chain", zerolog.Nop(), NopMetrics())
require.NoError(err)

err = mn.LinkAll()
require.NoError(err)
err = mn.ConnectAllButSelf()
require.NoError(err)

ctx := t.Context()
err = client.startWithHost(ctx, clientHost)
require.NoError(err)

require.NoError(client.Close())

require.Error(client.ctx.Err(), "client context should be cancelled after Close")
}

func TestClientInfoMethods(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
Expand Down
Loading