Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 30 additions & 54 deletions execution/exchange/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ import (
"github.com/prometheus/prometheus/model/labels"
)

type errorChan chan error

func (c errorChan) getError() error {
for err := range c {
if err != nil {
return err
func drainErrChan(ch chan error) error {
for {
select {
case err := <-ch:
if err != nil {
for range len(ch) {
<-ch
}
return err
}
default:
return nil
}
}

return nil
}

// coalesce is a model.VectorOperator that merges input vectors from multiple downstream operators
Expand All @@ -40,27 +44,24 @@ type coalesce struct {

wg sync.WaitGroup
operators []model.VectorOperator
batchSize int64

// inVectors is an internal per-step cache for references to input vectors.
inVectors [][]model.StepVector
// sampleOffsets holds per-operator offsets needed to map an input sample ID to an output sample ID.
// sampleOffsets maps an input sample ID to an output sample ID.
sampleOffsets []uint64
// seriesCounts holds the number of series per operator for pre-allocation.
seriesCounts []int
// tempBufs are reusable buffers for reading from operators
tempBufs [][]model.StepVector
tempBufs [][]model.StepVector
errChan chan error
}

func NewCoalesce(opts *query.Options, batchSize int64, operators ...model.VectorOperator) model.VectorOperator {
func NewCoalesce(opts *query.Options, operators ...model.VectorOperator) model.VectorOperator {
if len(operators) == 1 {
return operators[0]
}
oper := &coalesce{
sampleOffsets: make([]uint64, len(operators)),
operators: operators,
inVectors: make([][]model.StepVector, len(operators)),
batchSize: batchSize,
errChan: make(chan error, len(operators)),
}

return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
Expand Down Expand Up @@ -96,28 +97,15 @@ func (c *coalesce) Next(ctx context.Context, buf []model.StepVector) (int, error
return 0, err
}

// Allocate temporary buffers on first use.
// Inner slices will be lazily pre-allocated by child operators when they append data.
if c.tempBufs == nil {
c.tempBufs = make([][]model.StepVector, len(c.operators))
for i := range c.tempBufs {
c.tempBufs[i] = make([]model.StepVector, len(buf))
}
}

var mu sync.Mutex
var minTs int64 = math.MaxInt64
var errChan = make(errorChan, len(c.operators))
vectorCounts := make([]int, len(c.operators))

for idx, o := range c.operators {
// We already have a batch from the previous iteration.
if c.inVectors[idx] != nil {
mu.Lock()
if len(c.inVectors[idx]) > 0 {
minTs = min(minTs, c.inVectors[idx][0].T)
}
mu.Unlock()
continue
}

Expand All @@ -126,18 +114,16 @@ func (c *coalesce) Next(ctx context.Context, buf []model.StepVector) (int, error
defer c.wg.Done()
defer func() {
if r := recover(); r != nil {
errChan <- errors.Newf("unexpected panic: %v", r)
c.errChan <- errors.Newf("unexpected panic: %v", r)
}
}()

n, err := o.Next(ctx, c.tempBufs[opIdx])
if err != nil {
errChan <- err
c.errChan <- err
return
}
vectorCounts[opIdx] = n

// Map input IDs to output IDs.
for i := range n {
vector := &c.tempBufs[opIdx][i]
for j := range vector.SampleIDs {
Expand All @@ -150,35 +136,34 @@ func (c *coalesce) Next(ctx context.Context, buf []model.StepVector) (int, error

if n > 0 {
c.inVectors[opIdx] = c.tempBufs[opIdx][:n]
mu.Lock()
minTs = min(minTs, c.tempBufs[opIdx][0].T)
mu.Unlock()
} else {
c.inVectors[opIdx] = nil
}
}(idx, o)
}
c.wg.Wait()
close(errChan)

if err := errChan.getError(); err != nil {
if err := drainErrChan(c.errChan); err != nil {
return 0, err
}

// Count vectors with minTs and prepare output
var minTs int64 = math.MaxInt64
for _, vectors := range c.inVectors {
if len(vectors) > 0 {
minTs = min(minTs, vectors[0].T)
}
}

n := 0
for opIdx, vectors := range c.inVectors {
if len(vectors) == 0 || vectors[0].T != minTs {
continue
}

// Initialize output vectors if needed
if n == 0 {
maxSteps := min(len(vectors), len(buf))
for i := range maxSteps {
buf[i].Reset(vectors[i].T)
// Ensure sufficient capacity for float samples.
// Histogram slices will grow on demand since most queries don't use them.
totalSamples := 0
totalHistograms := 0
for _, v := range c.inVectors {
Expand All @@ -199,13 +184,11 @@ func (c *coalesce) Next(ctx context.Context, buf []model.StepVector) (int, error
n = maxSteps
}

// Append samples from this operator
for i := 0; i < n && i < len(vectors); i++ {
buf[i].AppendSamples(vectors[i].SampleIDs, vectors[i].Samples)
buf[i].AppendHistograms(vectors[i].HistogramIDs, vectors[i].Histograms)
}

// Keep remaining vectors for next iteration
if n < len(vectors) {
c.inVectors[opIdx] = vectors[n:]
} else {
Expand All @@ -220,19 +203,18 @@ func (c *coalesce) loadSeries(ctx context.Context) error {
var wg sync.WaitGroup
var numSeries uint64
allSeries := make([][]labels.Labels, len(c.operators))
errChan := make(errorChan, len(c.operators))
for i := range c.operators {
wg.Add(1)
go func(i int) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
errChan <- errors.Newf("unexpected panic: %v", r)
c.errChan <- errors.Newf("unexpected panic: %v", r)
}
}()
series, err := c.operators[i].Series(ctx)
if err != nil {
errChan <- err
c.errChan <- err
return
}

Expand All @@ -241,22 +223,16 @@ func (c *coalesce) loadSeries(ctx context.Context) error {
}(i)
}
wg.Wait()
close(errChan)
if err := errChan.getError(); err != nil {
if err := drainErrChan(c.errChan); err != nil {
return err
}

c.sampleOffsets = make([]uint64, len(c.operators))
c.seriesCounts = make([]int, len(c.operators))
c.series = make([]labels.Labels, 0, numSeries)
for i, series := range allSeries {
c.sampleOffsets[i] = uint64(len(c.series))
c.seriesCounts[i] = len(series)
c.series = append(c.series, series...)
}

if c.batchSize == 0 || c.batchSize > int64(len(c.series)) {
c.batchSize = int64(len(c.series))
}
return nil
}
2 changes: 1 addition & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func newDeduplication(ctx context.Context, e logicalplan.Deduplicate, scanners s
}
operators[i] = operator
}
coalesce := exchange.NewCoalesce(opts, 0, operators...)
coalesce := exchange.NewCoalesce(opts, operators...)
dedup := exchange.NewDedupOperator(coalesce, opts)
return exchange.NewConcurrent(dedup, 2, opts), nil
}
Expand Down
4 changes: 2 additions & 2 deletions storage/prometheus/scanners.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (p Scanners) NewVectorSelector(
operators = append(operators, operator)
}

return exchange.NewCoalesce(opts, logicalNode.BatchSize*int64(opts.DecodingConcurrency), operators...), nil
return exchange.NewCoalesce(opts, operators...), nil
}

func (p Scanners) NewMatrixSelector(
Expand Down Expand Up @@ -156,7 +156,7 @@ func (p Scanners) NewMatrixSelector(
operators = append(operators, exchange.NewConcurrent(operator, 2, opts))
}

return exchange.NewCoalesce(opts, vs.BatchSize*int64(opts.DecodingConcurrency), operators...), nil
return exchange.NewCoalesce(opts, operators...), nil
}

type histogramStatsSelector struct {
Expand Down
Loading