Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 64 additions & 16 deletions cmd/ftsb_redisearch/cmd_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,53 @@ type processor struct {
clusterTopo radix.ClusterTopo
}

func (p *processor) Init(workerNumber int, _ bool, totalWorkers int) {
var err error = nil
// getDialOpts returns the common dial options for connections
// getDialOpts returns the common dial options used for every Redis
// connection: optional AUTH (only when a password is configured) plus
// the configured dial timeout.
func getDialOpts() []radix.DialOpt {
	// At most two options are ever appended, so pre-size the slice to
	// avoid a growth reallocation.
	opts := make([]radix.DialOpt, 0, 2)
	if password != "" {
		opts = append(opts, radix.DialAuthPass(password))
	}
	opts = append(opts, radix.DialTimeout(timeout))
	return opts
}

customConnFunc := func(network, addr string) (radix.Conn, error) {
return radix.Dial(network, addr, opts...,
)
// getCustomConnFunc returns a ConnFunc using the common dial options
// getCustomConnFunc builds a connection factory that dials with the
// shared dial options (auth + timeout). The options are resolved once,
// up front, and captured by the returned closure.
func getCustomConnFunc() func(network, addr string) (radix.Conn, error) {
	dialOpts := getDialOpts()
	return func(network, addr string) (radix.Conn, error) {
		conn, err := radix.Dial(network, addr, dialOpts...)
		return conn, err
	}
}

// createPool builds a new single-connection radix.Pool against the
// configured host, with pipelining disabled (window 0,0) and a one-hour
// ping interval.
func createPool() (*radix.Pool, error) {
	connFn := getCustomConnFunc()
	pool, err := radix.NewPool(
		"tcp",
		host,
		1,
		radix.PoolConnFunc(connFn),
		radix.PoolPipelineWindow(0, 0),
		radix.PoolPingInterval(1*time.Hour),
	)
	return pool, err
}

// reconnectPool closes the existing pool (if any) and creates a new one.
// Returns the new pool or logs a fatal error if reconnection fails.
// reconnectPool closes the existing vanilla pool (if any) and replaces
// it with a freshly created one. On reconnection failure it logs the
// error and, unless continueOnErr is set, aborts via log.Fatalf.
//
// NOTE(review): when continueOnErr is set and reconnection fails,
// p.vanillaClient is left nil (matching the original assignment-before-
// check behavior); callers must tolerate a nil client until a later
// reconnect attempt succeeds.
func (p *processor) reconnectPool() {
	if p.vanillaClient != nil {
		// The old pool is likely broken; closing is best-effort, but the
		// error should be surfaced rather than silently dropped.
		if cerr := p.vanillaClient.Close(); cerr != nil {
			log.Printf("Error closing stale Redis pool: %v", cerr)
		}
	}
	pool, err := createPool()
	p.vanillaClient = pool // nil when createPool failed
	if err != nil {
		if continueOnErr {
			log.Printf("Error reconnecting to Redis: %v", err)
		} else {
			log.Fatalf("Fatal error reconnecting to Redis: %v", err)
		}
		return
	}
	log.Println("Successfully reconnected to Redis after error")
}

func (p *processor) Init(workerNumber int, _ bool, totalWorkers int) {
var err error = nil

customConnFunc := getCustomConnFunc()

// this cluster will use the ClientFunc to create a pool to each node in the
// cluster.
Expand All @@ -59,7 +94,7 @@ func (p *processor) Init(workerNumber int, _ bool, totalWorkers int) {
// add randomness on ping interval
//pingInterval := (20+rand.Intn(10))*1000000000
// We dont want PING to be issed from 5 to 5 seconds given that we know the connection is alive on the benchmark
p.vanillaClient, err = radix.NewPool("tcp", host, 1, radix.PoolConnFunc(customConnFunc), radix.PoolPipelineWindow(0, 0), radix.PoolPingInterval(1*time.Hour))
p.vanillaClient, err = createPool()
if err != nil {
log.Fatalf("Error preparing for redisearch ingestion, while creating new pool. error = %v", err)
}
Expand Down Expand Up @@ -116,10 +151,17 @@ func connectionProcessor(p *processor, rateLimiter *rate.Limiter, useRateLimiter
time.Sleep(r.Delay())
}
if !clusterMode {
cmdSlots[slotP], timesSlots[slotP] = sendFlatCmd(p, p.vanillaClient, cmdType, cmdQueryId, cmd, docFields, bytelen, cmdSlots[slotP], replies, timesSlots[slotP])
var hadError bool
cmdSlots[slotP], timesSlots[slotP], hadError = sendFlatCmd(p, p.vanillaClient, cmdType, cmdQueryId, cmd, docFields, bytelen, cmdSlots[slotP], replies, timesSlots[slotP])
if hadError && continueOnErr {
// Reconnect to get a fresh connection after an error.
// This prevents hanging on a broken/half-closed connection
// (e.g., after OOM errors where the server may close the connection).
p.reconnectPool()
}
} else {
client, _ := p.vanillaCluster.Client(clusterAddr[slotP])
cmdSlots[slotP], timesSlots[slotP] = sendFlatCmd(p, client, cmdType, cmdQueryId, cmd, docFields, bytelen, cmdSlots[slotP], replies, timesSlots[slotP])
cmdSlots[slotP], timesSlots[slotP], _ = sendFlatCmd(p, client, cmdType, cmdQueryId, cmd, docFields, bytelen, cmdSlots[slotP], replies, timesSlots[slotP])
}
}
p.wg.Done()
Expand All @@ -140,7 +182,7 @@ func getRxLen(v interface{}) (res uint64) {
return
}

func sendFlatCmd(p *processor, client radix.Client, cmdType, cmdQueryId, cmd string, docfields []string, txBytesCount uint64, cmds []radix.CmdAction, replies []interface{}, times []time.Time) ([]radix.CmdAction, []time.Time) {
func sendFlatCmd(p *processor, client radix.Client, cmdType, cmdQueryId, cmd string, docfields []string, txBytesCount uint64, cmds []radix.CmdAction, replies []interface{}, times []time.Time) ([]radix.CmdAction, []time.Time, bool) {
var err error = nil
var rcv interface{}
rxBytesCount := uint64(0)
Expand All @@ -149,12 +191,13 @@ func sendFlatCmd(p *processor, client radix.Client, cmdType, cmdQueryId, cmd str
replies = append(replies, rcv)
start := time.Now()
times = append(times, start)
cmds, times = sendIfRequired(p, client, cmdType, cmdQueryId, cmds, err, times, rxBytesCount, replies, txBytesCount)
return cmds, times
cmds, times, hadError := sendIfRequired(p, client, cmdType, cmdQueryId, cmds, err, times, rxBytesCount, replies, txBytesCount)
return cmds, times, hadError
}

func sendIfRequired(p *processor, client radix.Client, cmdType string, cmdQueryId string, cmds []radix.CmdAction, err error, times []time.Time, rxBytesCount uint64, replies []interface{}, txBytesCount uint64) ([]radix.CmdAction, []time.Time) {
func sendIfRequired(p *processor, client radix.Client, cmdType string, cmdQueryId string, cmds []radix.CmdAction, err error, times []time.Time, rxBytesCount uint64, replies []interface{}, txBytesCount uint64) ([]radix.CmdAction, []time.Time, bool) {
cmdLen := len(cmds)
hadError := false
if cmdLen >= pipeline {
if cmdLen == 1 {
// if pipeline is 1 no need to pipeline
Expand All @@ -163,10 +206,9 @@ func sendIfRequired(p *processor, client radix.Client, cmdType string, cmdQueryI
err = client.Do(radix.Pipeline(cmds...))
}
endT := time.Now()
hasError := false
isTimeout := false
if err != nil {
hasError = true
hadError = true

// Always log the error
if continueOnErr {
Expand All @@ -186,15 +228,15 @@ func sendIfRequired(p *processor, client radix.Client, cmdType string, cmdQueryI
took := uint64(duration.Microseconds())
rcv := replies[pos]
rxBytesCount += getRxLen(rcv)
stat := benchmark_runner.NewStat().AddEntry([]byte(cmdType), []byte(cmdQueryId), uint64(t.Unix()), took, hasError, isTimeout, txBytesCount, rxBytesCount)
stat := benchmark_runner.NewStat().AddEntry([]byte(cmdType), []byte(cmdQueryId), uint64(t.Unix()), took, hadError, isTimeout, txBytesCount, rxBytesCount)
p.cmdChan <- *stat
}
cmds = nil
cmds = make([]radix.CmdAction, 0, 0)
times = nil
times = make([]time.Time, 0, 0)
}
return cmds, times
return cmds, times, hadError
}

// ProcessBatch reads eventsBatches which contain rows of databuild for FT.ADD redis command string
Expand Down Expand Up @@ -228,6 +270,12 @@ func (p *processor) ProcessBatch(b benchmark_runner.Batch, doLoad bool, rateLimi
}

// Close releases the processor's Redis resources: the vanilla
// (non-cluster) pool and the cluster client, whichever were created.
// Close errors are logged instead of being silently discarded.
func (p *processor) Close(_ bool) {
	if p.vanillaClient != nil {
		if err := p.vanillaClient.Close(); err != nil {
			log.Printf("Error closing Redis pool: %v", err)
		}
	}
	if p.vanillaCluster != nil {
		if err := p.vanillaCluster.Close(); err != nil {
			log.Printf("Error closing Redis cluster client: %v", err)
		}
	}
}

func preProcessCmd(row string) (cmdType string, cmdQueryId string, keyPos int, cmd string, key string, clusterSlot int, args []string, bytelen uint64, err error) {
Expand Down
Loading