Skip to content

Commit dc52c63

Browse files
authored
[feature](join) support ASOF join (#59591)
apache/doris-website#3290 docs here
1 parent c7d6fe3 commit dc52c63

File tree

97 files changed

+6141
-344
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

97 files changed

+6141
-344
lines changed

be/src/exec/common/hash_table/join_hash_table.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ class JoinHashTable {
7878

7979
// Mutable access to the `visited` vector held by this hash table.
// NOTE(review): presumably one flag per build row recording whether it was matched — confirm.
DorisVector<uint8_t>& get_visited() {
    return visited;
}
8080

81+
// Read-only access to the `first` vector held by this hash table.
// NOTE(review): presumably the head build-row index of each bucket chain — confirm.
const DorisVector<uint32_t>& get_first() const {
    return first;
}
82+
83+
// Read-only access to the `next` vector held by this hash table.
// NOTE(review): presumably the per-row link to the next build row in the same bucket — confirm.
const DorisVector<uint32_t>& get_next() const {
    return next;
}
84+
85+
// Read-only pointer to the build-side key array (`build_keys`).
// The pointer is returned as stored; ownership is not transferred by this call.
const Key* get_build_keys() const {
    return build_keys;
}
86+
8187
// Reports the stored `_empty_build_side` flag.
bool empty_build_side() const {
    return _empty_build_side;
}
8288

8389
void build(const Key* __restrict keys, const uint32_t* __restrict bucket_nums,
@@ -138,6 +144,14 @@ class JoinHashTable {
138144
build_idx, probe_rows, probe_idxs,
139145
probe_visited, build_idxs);
140146
}
147+
// ASOF JOIN: each probe row should end up with a single build row (the closest match); this step only gathers the candidate matches.
148+
// The actual closest match logic is handled in ProcessHashTableProbe
149+
if (JoinOpType == TJoinOp::ASOF_LEFT_INNER_JOIN ||
150+
JoinOpType == TJoinOp::ASOF_LEFT_OUTER_JOIN) {
151+
// Use conjunct path to get all matching rows, then filter in ProcessHashTableProbe
152+
return _find_batch_conjunct<JoinOpType, false>(
153+
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs);
154+
}
141155
if (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
142156
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
143157
if (null_map) {
@@ -331,7 +345,8 @@ class JoinHashTable {
331345
JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
332346
JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
333347
JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
334-
JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
348+
JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN ||
349+
JoinOpType == TJoinOp::ASOF_LEFT_OUTER_JOIN) {
335350
// may over batch_size when emplace 0 into build_idxs
336351
if (!build_idx) {
337352
probe_idxs[matched_cnt] = probe_idx;

be/src/exec/common/join_utils.h

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,33 @@
1717

1818
#pragma once
1919

20+
#include <algorithm>
2021
#include <variant>
2122

2223
#include "exec/common/hash_table/hash_key_type.h"
2324
#include "exec/common/hash_table/hash_map_context.h"
2425
#include "exec/common/hash_table/join_hash_table.h"
2526

2627
namespace doris {
28+
29+
// Devirtualize compare_at for ASOF JOIN supported column types.
30+
// ASOF JOIN only supports DateV2, DateTimeV2, and TimestampTZ.
31+
// Dispatches to the concrete ColumnVector<T> once so that all compare_at
32+
// calls inside `func` are direct (non-virtual) calls.
33+
// `func` receives a single argument: a const pointer to the concrete column
34+
// (or const IColumn* as fallback for unexpected types).
35+
template <typename Func>
36+
decltype(auto) asof_column_dispatch(const IColumn* col, Func&& func) {
37+
if (const auto* c_dv2 = check_and_get_column<ColumnDateV2>(col)) {
38+
return std::forward<Func>(func)(c_dv2);
39+
} else if (const auto* c_dtv2 = check_and_get_column<ColumnDateTimeV2>(col)) {
40+
return std::forward<Func>(func)(c_dtv2);
41+
} else if (const auto* c_tstz = check_and_get_column<ColumnTimeStampTz>(col)) {
42+
return std::forward<Func>(func)(c_tstz);
43+
} else {
44+
return std::forward<Func>(func)(col);
45+
}
46+
}
2747
using JoinOpVariants =
2848
std::variant<std::integral_constant<TJoinOp::type, TJoinOp::INNER_JOIN>,
2949
std::integral_constant<TJoinOp::type, TJoinOp::LEFT_SEMI_JOIN>,
@@ -35,7 +55,20 @@ using JoinOpVariants =
3555
std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_SEMI_JOIN>,
3656
std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_ANTI_JOIN>,
3757
std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
38-
std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
58+
std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>,
59+
std::integral_constant<TJoinOp::type, TJoinOp::ASOF_LEFT_INNER_JOIN>,
60+
std::integral_constant<TJoinOp::type, TJoinOp::ASOF_LEFT_OUTER_JOIN>>;
61+
62+
// Runtime check: true when `join_op` is one of the two ASOF join operators
// (ASOF_LEFT_INNER_JOIN or ASOF_LEFT_OUTER_JOIN), false for everything else.
inline bool is_asof_join(TJoinOp::type join_op) {
    switch (join_op) {
    case TJoinOp::ASOF_LEFT_INNER_JOIN:
    case TJoinOp::ASOF_LEFT_OUTER_JOIN:
        return true;
    default:
        return false;
    }
}
65+
66+
template <int JoinOpType>
67+
inline constexpr bool is_asof_join_op_v =
68+
JoinOpType == TJoinOp::ASOF_LEFT_INNER_JOIN || JoinOpType == TJoinOp::ASOF_LEFT_OUTER_JOIN;
69+
70+
template <int JoinOpType>
71+
inline constexpr bool is_asof_outer_join_op_v = JoinOpType == TJoinOp::ASOF_LEFT_OUTER_JOIN;
3972

4073
template <class T>
4174
using PrimaryTypeHashTableContext = MethodOneNumber<T, JoinHashMap<T, HashCRC32<T>, false>>;
@@ -197,4 +230,99 @@ inline void try_convert_to_direct_mapping(
197230
primary_to_direct_mapping(context, key_columns, variant_ptrs);
198231
}
199232

233+
// ASOF JOIN index with inline values for cache-friendly branchless binary search.
234+
// IntType is the integer representation of the ASOF column value:
235+
// uint32_t for DateV2, uint64_t for DateTimeV2 and TimestampTZ.
236+
// Rows are sorted by asof_value during build, then materialized into SoA arrays
237+
// so probe-side binary search only touches the ASOF values hot path.
238+
template <typename IntType>
239+
struct AsofIndexGroup {
240+
using int_type = IntType;
241+
242+
struct Entry {
243+
IntType asof_value;
244+
uint32_t row_index; // 1-based, 0 = invalid/padding
245+
};
246+
247+
std::vector<Entry> entries;
248+
std::vector<IntType> asof_values;
249+
std::vector<uint32_t> row_indexes;
250+
251+
void add_row(IntType value, uint32_t row_idx) { entries.push_back({value, row_idx}); }
252+
253+
void sort_and_finalize() {
254+
if (entries.empty()) {
255+
return;
256+
}
257+
if (entries.size() > 1) {
258+
pdqsort(entries.begin(), entries.end(),
259+
[](const Entry& a, const Entry& b) { return a.asof_value < b.asof_value; });
260+
}
261+
262+
asof_values.resize(entries.size());
263+
row_indexes.resize(entries.size());
264+
for (size_t i = 0; i < entries.size(); ++i) {
265+
asof_values[i] = entries[i].asof_value;
266+
row_indexes[i] = entries[i].row_index;
267+
}
268+
269+
std::vector<Entry>().swap(entries);
270+
}
271+
272+
const IntType* values_data() const { return asof_values.data(); }
273+
274+
// Branchless lower_bound: first i where asof_values[i] >= target
275+
ALWAYS_INLINE size_t lower_bound(IntType target) const {
276+
size_t lo = 0, n = asof_values.size();
277+
while (n > 1) {
278+
size_t half = n / 2;
279+
lo += half * (asof_values[lo + half] < target);
280+
n -= half;
281+
}
282+
if (lo < asof_values.size()) {
283+
lo += (asof_values[lo] < target);
284+
}
285+
return lo;
286+
}
287+
288+
// Branchless upper_bound: first i where asof_values[i] > target
289+
ALWAYS_INLINE size_t upper_bound(IntType target) const {
290+
size_t lo = 0, n = asof_values.size();
291+
while (n > 1) {
292+
size_t half = n / 2;
293+
lo += half * (asof_values[lo + half] <= target);
294+
n -= half;
295+
}
296+
if (lo < asof_values.size()) {
297+
lo += (asof_values[lo] <= target);
298+
}
299+
return lo;
300+
}
301+
302+
// Semantics by (is_greater, is_strict):
303+
// (true, false): probe >= build -> find largest build value <= probe
304+
// (true, true): probe > build -> find largest build value < probe
305+
// (false, false): probe <= build -> find smallest build value >= probe
306+
// (false, true): probe < build -> find smallest build value > probe
307+
// Returns the build row index of the best match, or 0 if no match.
308+
template <bool IsGreater, bool IsStrict>
309+
ALWAYS_INLINE uint32_t find_best_match(IntType probe_value) const {
310+
if (asof_values.empty()) {
311+
return 0;
312+
}
313+
if constexpr (IsGreater) {
314+
size_t pos = IsStrict ? lower_bound(probe_value) : upper_bound(probe_value);
315+
return pos > 0 ? row_indexes[pos - 1] : 0;
316+
} else {
317+
size_t pos = IsStrict ? upper_bound(probe_value) : lower_bound(probe_value);
318+
return pos < asof_values.size() ? row_indexes[pos] : 0;
319+
}
320+
}
321+
};
322+
323+
// Type-erased container for all ASOF index groups.
324+
// DateV2 -> uint32_t, DateTimeV2/TimestampTZ -> uint64_t.
325+
using AsofIndexVariant = std::variant<std::monostate, std::vector<AsofIndexGroup<uint32_t>>,
326+
std::vector<AsofIndexGroup<uint64_t>>>;
327+
200328
} // namespace doris

0 commit comments

Comments
 (0)