Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@ module github.com/lyft/gostats
go 1.18

require github.com/kelseyhightower/envconfig v1.4.0

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.10.0
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
64 changes: 47 additions & 17 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"fmt"
"math"
"math/rand/v2"
"net"
"os"
"strconv"
Expand All @@ -23,9 +24,10 @@ type Logger interface {
}

const (
defaultRetryInterval = time.Second * 3
defaultDialTimeout = defaultRetryInterval / 2
defaultWriteTimeout = time.Second
baseReconnectDelay = 3 * time.Second
maxReconnectDelay = time.Minute
defaultDialTimeout = baseReconnectDelay / 2
defaultWriteTimeout = time.Second

flushInterval = time.Second
logOnEveryNDroppedBytes = 1 << 15 // Log once per 32kb of dropped stats
Expand Down Expand Up @@ -102,7 +104,8 @@ func NewNetSink(opts ...SinkOption) FlushableSink {
log: &loggingSink{writer: os.Stderr, now: time.Now},

// TODO (CEV): auto loading from the env is bad and should be removed.
conf: GetSettings(),
conf: GetSettings(),
reconnectDelay: baseReconnectDelay,
}
for _, opt := range opts {
opt.apply(s)
Expand All @@ -129,15 +132,16 @@ func NewNetSink(opts ...SinkOption) FlushableSink {
}

type netSink struct {
conn net.Conn
outc chan *bytes.Buffer
retryc chan *bytes.Buffer
mu sync.Mutex
bufWriter *bufio.Writer
doFlush chan chan struct{}
droppedBytes uint64
log Logger
conf Settings
conn net.Conn
outc chan *bytes.Buffer
retryc chan *bytes.Buffer
mu sync.Mutex
bufWriter *bufio.Writer
doFlush chan chan struct{}
droppedBytes uint64
log Logger
conf Settings
reconnectDelay time.Duration
}

type sinkWriter struct {
Expand Down Expand Up @@ -282,20 +286,27 @@ func (s *netSink) run() {
for {
if s.conn == nil {
if err := s.connect(addr); err != nil {
s.log.Warnf("connection error: %s", err)

// If the previous reconnect attempt failed, drain the flush
// queue to prevent Flush() from blocking indefinitely.
if reconnectFailed {
s.drainFlushQueue()
}
reconnectFailed = true

// TODO (CEV): don't sleep on the first retry
time.Sleep(defaultRetryInterval)
nextSleep := calculateNextSleep(s.reconnectDelay)

s.log.Warnf("connection error: %s, reconnecting in %s", err, nextSleep)
time.Sleep(nextSleep)
s.reconnectDelay = nextSleep

continue
}

if reconnectFailed {
s.log.Warnf("reconnected to %s", addr)
}
reconnectFailed = false
s.reconnectDelay = baseReconnectDelay
}

// Handle buffers that need to be retried first, if they exist.
Expand Down Expand Up @@ -419,3 +430,22 @@ func (b *buffer) WriteUnit64(val uint64) {
func (b *buffer) WriteFloat64(val float64) {
*b = strconv.AppendFloat(*b, val, 'f', 6, 64)
}

func calculateNextSleep(prevSleep time.Duration) time.Duration {
// Decorrelated Jitter: sleep = min(cap, random_between(base, prev_sleep * 3))
upperBound := prevSleep * 3

var nextSleep time.Duration
if upperBound > baseReconnectDelay {
randomRange := upperBound - baseReconnectDelay
jitter := time.Duration(rand.Int64N(int64(randomRange)))
nextSleep = baseReconnectDelay + jitter
} else {
nextSleep = baseReconnectDelay
}

if nextSleep > maxReconnectDelay {
nextSleep = maxReconnectDelay
}
return nextSleep
}
42 changes: 38 additions & 4 deletions net_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func foreverNow() time.Time {
Expand Down Expand Up @@ -665,12 +667,12 @@ func testNetSinkReconnect(t *testing.T, protocol string) {
// This test is flaky with UDP and the race detector, but good
// to have so we log instead of fail the test.
if protocol == "udp" {
stat := ts.WaitForStat(replaceFatalWithLog{t}, defaultRetryInterval*3)
stat := ts.WaitForStat(replaceFatalWithLog{t}, baseReconnectDelay*5)
if stat != "" && stat != expected {
t.Fatalf("stats got: %q want: %q", stat, expected)
}
} else {
stat := ts.WaitForStat(t, defaultRetryInterval*3)
stat := ts.WaitForStat(t, baseReconnectDelay*5)
if stat != expected {
t.Fatalf("stats got: %q want: %q", stat, expected)
}
Expand Down Expand Up @@ -719,7 +721,7 @@ func testNetSinkReconnectFailure(t *testing.T, protocol string) {
select {
case <-flushed:
// Ok
case <-time.After(defaultRetryInterval * 2):
case <-time.After(baseReconnectDelay * 5):
t.Fatalf("Only %d of %d Flush() calls succeeded",
atomic.LoadInt64(flushCount), N)
}
Expand Down Expand Up @@ -841,7 +843,7 @@ func testNetSinkIntegration(t *testing.T, protocol string) {
if err != nil {
t.Fatal(err)
}
case <-time.After(defaultRetryInterval * 2):
case <-time.After(baseReconnectDelay * 5):
t.Fatal("Timed out waiting for command to exit")
}
})
Expand Down Expand Up @@ -878,3 +880,35 @@ func BenchmarkFlushTimer(b *testing.B) {
sink.FlushTimer("TestTImer.___f=i.__tag1=v1", float64(i)/3)
}
}

func TestCalculateNextSleep(t *testing.T) {
t.Parallel()

// 1. Test that the sleep duration is within the expected bounds.
// Run it a few times to get some randomness.
for i := 0; i < 100; i++ {
// Start with the base delay
d := calculateNextSleep(baseReconnectDelay)
assert.GreaterOrEqual(t, d, baseReconnectDelay)
assert.Less(t, d, baseReconnectDelay*3)

// Try with a larger previous delay
prev := 10 * time.Second
d = calculateNextSleep(prev)
assert.GreaterOrEqual(t, d, baseReconnectDelay)
assert.Less(t, d, prev*3)
}

// 2. Test the cap
d := calculateNextSleep(maxReconnectDelay)
assert.GreaterOrEqual(t, d, baseReconnectDelay)
assert.LessOrEqual(t, d, maxReconnectDelay)

// 3. Test edge case where prevSleep * 3 <= base.
// It should default back to the base.
d = calculateNextSleep(baseReconnectDelay / 3)
assert.Equal(t, baseReconnectDelay, d)

d = calculateNextSleep(0)
assert.Equal(t, baseReconnectDelay, d)
}
2 changes: 1 addition & 1 deletion net_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (s *netTestSink) Restart(t testing.TB, resetBuffer bool) {
func (s *netTestSink) WaitForStat(t testing.TB, timeout time.Duration) string {
t.Helper()
if timeout <= 0 {
timeout = defaultRetryInterval * 2
timeout = baseReconnectDelay * 5
}
to := time.NewTimer(timeout)
defer to.Stop()
Expand Down
Loading