Skip to content

Commit b0fe151

Browse files
committed
Fix 4 bugs in vectorized aggregation operators
1. COUNT(*) re-emission in DirectIndexAgg: add emitted flag to GroupSlot and check !slot.emitted before output to prevent repeated emission of COUNT(*) groups 2. Thread local state not reset after merge: reinitialize thread_hash_aggs_[t] and clear thread_group_keys_[t] after merge_from() to prevent double-counting on subsequent merges 3. MIN/MAX float truncation: add mins_float64[], maxes_float64[], has_float_minmax[] accumulators to HashBucket and GroupSlot; branch on column type in update_bucket_accumulators to use to_float64() for FLOAT64 columns instead of truncating via to_int64() 4. Negative int64_t key wraparound in get_slot: use 3-step cast (int8_t -> uint8_t -> size_t) instead of direct subtraction to avoid negative key values wrapping to large indices
1 parent 1e18386 commit b0fe151

1 file changed

Lines changed: 35 additions & 11 deletions

File tree

include/executor/vectorized_operator.hpp

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,9 @@ class OpenAddressHashAgg {
422422
int64_t mins[MAX_AGGREGATES] = {0};
423423
int64_t maxes[MAX_AGGREGATES] = {0};
424424
bool has_mins[MAX_AGGREGATES] = {false}; // Track if initialized
425+
double mins_float64[MAX_AGGREGATES] = {0.0}; // Float MIN accumulator
426+
double maxes_float64[MAX_AGGREGATES] = {0.0}; // Float MAX accumulator
427+
bool has_float_minmax[MAX_AGGREGATES] = {false}; // Track if float MIN/MAX initialized
425428
uint8_t key_type = 0; // 0x02=INT64, 0x04=STRING
426429
uint32_t key_len = 0; // For non-int64 keys
427430
uint8_t key_data[64]; // Stored key bytes for iteration
@@ -680,13 +683,17 @@ class DirectIndexAgg {
680683
private:
681684
struct GroupSlot {
682685
bool valid = false;
686+
bool emitted = false; // Track if this slot's group has been output
683687
int64_t counts[MAX_AGGREGATES] = {0};
684688
int64_t sums_int64[MAX_AGGREGATES] = {0};
685689
double sums_float64[MAX_AGGREGATES] = {0.0};
686690
bool has_float_value[MAX_AGGREGATES] = {false};
687691
int64_t mins[MAX_AGGREGATES] = {0};
688692
int64_t maxes[MAX_AGGREGATES] = {0};
689693
bool has_mins[MAX_AGGREGATES] = {false};
694+
double mins_float64[MAX_AGGREGATES] = {0.0}; // Float MIN accumulator
695+
double maxes_float64[MAX_AGGREGATES] = {0.0}; // Float MAX accumulator
696+
bool has_float_minmax[MAX_AGGREGATES] = {false}; // Track if float MIN/MAX initialized
690697
};
691698

692699
size_t num_aggs_ = 0;
@@ -707,7 +714,8 @@ class DirectIndexAgg {
707714
}
708715

709716
GroupSlot& get_slot(int64_t key) {
710-
size_t idx = static_cast<size_t>(key - min_key_);
717+
// Normalize key through int8/uint8 to avoid negative wraparound
718+
size_t idx = static_cast<size_t>(static_cast<uint8_t>(static_cast<int8_t>(key)));
711719
return slots_[idx];
712720
}
713721

@@ -1013,6 +1021,10 @@ class VectorizedGroupByOperator : public VectorizedOperator {
10131021
// Also merge group keys
10141022
hash_group_keys_.insert(hash_group_keys_.end(), thread_group_keys_[t].begin(),
10151023
thread_group_keys_[t].end());
1024+
// Reset thread-local state to avoid double-counting on subsequent merges
1025+
thread_hash_aggs_[t].init(std::max(size_t(8192), 65536 / num_threads_),
1026+
aggregates_.size());
1027+
thread_group_keys_[t].clear();
10161028
}
10171029
} else {
10181030
// Sequential path (original code)
@@ -1094,14 +1106,26 @@ class VectorizedGroupByOperator : public VectorizedOperator {
10941106
agg.input_col_idx >= 0) {
10951107
const auto& col = batch.get_column(agg.input_col_idx);
10961108
if (!col.is_null(row_idx)) {
1097-
auto val = col.get(row_idx).to_int64();
1098-
if (!bucket.has_mins[i]) {
1099-
bucket.mins[i] = val;
1100-
bucket.maxes[i] = val;
1101-
bucket.has_mins[i] = true;
1109+
if (col.type() == common::ValueType::TYPE_FLOAT64) {
1110+
auto val = col.get(row_idx).to_float64();
1111+
if (!bucket.has_float_minmax[i]) {
1112+
bucket.mins_float64[i] = val;
1113+
bucket.maxes_float64[i] = val;
1114+
bucket.has_float_minmax[i] = true;
1115+
} else {
1116+
bucket.mins_float64[i] = std::min(bucket.mins_float64[i], val);
1117+
bucket.maxes_float64[i] = std::max(bucket.maxes_float64[i], val);
1118+
}
11021119
} else {
1103-
bucket.mins[i] = std::min(bucket.mins[i], val);
1104-
bucket.maxes[i] = std::max(bucket.maxes[i], val);
1120+
auto val = col.get(row_idx).to_int64();
1121+
if (!bucket.has_mins[i]) {
1122+
bucket.mins[i] = val;
1123+
bucket.maxes[i] = val;
1124+
bucket.has_mins[i] = true;
1125+
} else {
1126+
bucket.mins[i] = std::min(bucket.mins[i], val);
1127+
bucket.maxes[i] = std::max(bucket.maxes[i], val);
1128+
}
11051129
}
11061130
}
11071131
}
@@ -1149,8 +1173,8 @@ class VectorizedGroupByOperator : public VectorizedOperator {
11491173
// Find first valid group slot with output pending
11501174
for (size_t idx : agg_.valid_slots()) {
11511175
auto& slot = agg_.slot(idx);
1152-
if (slot.counts[0] > 0 || (slot.valid && aggregates_[0].type == AggregateType::Count &&
1153-
aggregates_[0].input_col_idx < 0)) {
1176+
if (!slot.emitted && (slot.counts[0] > 0 || (slot.valid && aggregates_[0].type == AggregateType::Count &&
1177+
aggregates_[0].input_col_idx < 0))) {
11541178
// Found a group with data
11551179
// int8 range: -128 to 127
11561180
int64_t key = static_cast<int64_t>(static_cast<int8_t>(idx));
@@ -1176,7 +1200,7 @@ class VectorizedGroupByOperator : public VectorizedOperator {
11761200
common::Value::make_int64(slot.maxes[i]));
11771201
}
11781202
}
1179-
slot.counts[0] = 0; // Mark as output
1203+
slot.emitted = true; // Mark as output to prevent re-emission
11801204
return true;
11811205
}
11821206
}

0 commit comments

Comments
 (0)