Skip to content

Commit 4ac9cf6

Browse files
Dandandan and claude committed
Avoid concat_batches on hash join build side
Hash join currently concatenates the full build side into a single RecordBatch. When it is relatively large, this copies all data and fails for >2GB StringArray data. This PR stores build-side batches separately as Vec<RecordBatch> and uses composite (u64) indices (batch_idx << 32 | row_idx) to index into them. Build-side batches are coalesced to target_batch_size before hash map construction so that build_batch_from_indices hits the fast SingleBatch/take path for small-to-medium build sides. Key changes: - Store Vec<RecordBatch> instead of one concatenated batch in JoinLeftData - Use interleave for multi-batch gathering, take for single-batch - Element-wise key comparison in equal_rows_arr avoids materializing intermediate arrays - Coalesce build-side batches in collect_left_input - Coalesce batches in CoalescePartitionsExec output Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5ba06ac commit 4ac9cf6

File tree

8 files changed

+926
-247
lines changed

8 files changed

+926
-247
lines changed

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,33 @@
1818
//! Defines the merge plan for executing partitions in parallel and then merging the results
1919
//! into a single partition
2020
21+
use std::pin::Pin;
2122
use std::sync::Arc;
23+
use std::task::{Context, Poll};
2224

2325
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2426
use super::stream::{ObservedStream, RecordBatchReceiverStream};
2527
use super::{
26-
DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
27-
Statistics,
28+
DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
29+
SendableRecordBatchStream, Statistics,
2830
};
31+
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
2932
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
3033
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
3134
use crate::projection::{ProjectionExec, make_with_child};
3235
use crate::sort_pushdown::SortOrderPushdownResult;
3336
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};
3437
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3538

39+
use arrow::datatypes::SchemaRef;
40+
use arrow::record_batch::RecordBatch;
3641
use datafusion_common::config::ConfigOptions;
3742
use datafusion_common::tree_node::TreeNodeRecursion;
3843
use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
3944
use datafusion_execution::TaskContext;
4045
use datafusion_physical_expr::PhysicalExpr;
46+
use futures::ready;
47+
use futures::stream::{Stream, StreamExt};
4148

4249
/// Merge execution plan executes partitions in parallel and combines them into a single
4350
/// partition. No guarantees are made about the order of the resulting partition.
@@ -209,6 +216,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
209216
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
210217
let _timer = elapsed_compute.timer();
211218

219+
let batch_size = context.session_config().batch_size();
220+
212221
// use a stream that allows each sender to put in at
213222
// least one result in an attempt to maximize
214223
// parallelism.
@@ -226,11 +235,23 @@ impl ExecutionPlan for CoalescePartitionsExec {
226235
}
227236

228237
let stream = builder.build();
229-
Ok(Box::pin(ObservedStream::new(
230-
stream,
231-
baseline_metrics,
232-
self.fetch,
233-
)))
238+
// Coalesce small batches from multiple partitions into
239+
// larger batches of target_batch_size. This improves
240+
// downstream performance (e.g. hash join build side
241+
// benefits from fewer, larger batches).
242+
Ok(Box::pin(CoalescedStream {
243+
input: Box::pin(ObservedStream::new(
244+
stream,
245+
baseline_metrics,
246+
self.fetch,
247+
)),
248+
coalescer: LimitedBatchCoalescer::new(
249+
self.schema(),
250+
batch_size,
251+
None, // fetch is already handled by ObservedStream
252+
),
253+
completed: false,
254+
}))
234255
}
235256
}
236257
}
@@ -347,6 +368,53 @@ impl ExecutionPlan for CoalescePartitionsExec {
347368
}
348369
}
349370

371+
/// Stream that coalesces small batches into larger ones using
372+
/// [`LimitedBatchCoalescer`].
373+
struct CoalescedStream {
374+
input: SendableRecordBatchStream,
375+
coalescer: LimitedBatchCoalescer,
376+
completed: bool,
377+
}
378+
379+
impl Stream for CoalescedStream {
380+
type Item = Result<RecordBatch>;
381+
382+
fn poll_next(
383+
mut self: Pin<&mut Self>,
384+
cx: &mut Context<'_>,
385+
) -> Poll<Option<Self::Item>> {
386+
loop {
387+
if let Some(batch) = self.coalescer.next_completed_batch() {
388+
return Poll::Ready(Some(Ok(batch)));
389+
}
390+
if self.completed {
391+
return Poll::Ready(None);
392+
}
393+
let input_batch = ready!(self.input.poll_next_unpin(cx));
394+
match input_batch {
395+
None => {
396+
self.completed = true;
397+
self.coalescer.finish()?;
398+
}
399+
Some(Ok(batch)) => match self.coalescer.push_batch(batch)? {
400+
PushBatchStatus::Continue => {}
401+
PushBatchStatus::LimitReached => {
402+
self.completed = true;
403+
self.coalescer.finish()?;
404+
}
405+
},
406+
other => return Poll::Ready(other),
407+
}
408+
}
409+
}
410+
}
411+
412+
impl RecordBatchStream for CoalescedStream {
413+
fn schema(&self) -> SchemaRef {
414+
self.coalescer.schema()
415+
}
416+
}
417+
350418
#[cfg(test)]
351419
mod tests {
352420
use super::*;
@@ -378,10 +446,9 @@ mod tests {
378446
1
379447
);
380448

381-
// the result should contain 4 batches (one per input partition)
449+
// the result should contain all rows (coalesced into fewer batches)
382450
let iter = merge.execute(0, task_ctx)?;
383451
let batches = common::collect(iter).await?;
384-
assert_eq!(batches.len(), num_partitions);
385452

386453
// there should be a total of 400 rows (100 per each partition)
387454
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();

datafusion/physical-plan/src/joins/array_map.rs

Lines changed: 52 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,17 @@ impl ArrayMap {
157157
max_val.wrapping_sub(min_val)
158158
}
159159

160-
/// Creates a new [`ArrayMap`] from the given array of join keys.
160+
/// Creates a new [`ArrayMap`] from per-batch arrays of join keys.
161161
///
162-
/// Note: This function processes only the non-null values in the input `array`,
162+
/// Note: This function processes only the non-null values in the input arrays,
163163
/// ignoring any rows where the key is `NULL`.
164164
///
165-
pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result<Self> {
165+
pub(crate) fn try_new(
166+
arrays: &[&ArrayRef],
167+
total_num_rows: usize,
168+
min_val: u64,
169+
max_val: u64,
170+
) -> Result<Self> {
166171
let range = max_val.wrapping_sub(min_val);
167172
if range >= usize::MAX as u64 {
168173
return internal_err!("ArrayMap key range is too large to be allocated.");
@@ -173,10 +178,16 @@ impl ArrayMap {
173178
let mut next: Vec<u32> = vec![];
174179
let mut num_of_distinct_key = 0;
175180

181+
let data_type = arrays
182+
.first()
183+
.map(|a| a.data_type().clone())
184+
.unwrap_or(DataType::Int32);
185+
176186
downcast_supported_integer!(
177-
array.data_type() => (
178-
fill_data,
179-
array,
187+
&data_type => (
188+
fill_data_batched,
189+
arrays,
190+
total_num_rows,
180191
min_val,
181192
&mut data,
182193
&mut next,
@@ -192,8 +203,9 @@ impl ArrayMap {
192203
})
193204
}
194205

195-
fn fill_data<T: ArrowNumericType>(
196-
array: &ArrayRef,
206+
fn fill_data_batched<T: ArrowNumericType>(
207+
arrays: &[&ArrayRef],
208+
total_num_rows: usize,
197209
offset_val: u64,
198210
data: &mut [u32],
199211
next: &mut Vec<u32>,
@@ -202,25 +214,32 @@ impl ArrayMap {
202214
where
203215
T::Native: AsPrimitive<u64>,
204216
{
205-
let arr = array.as_primitive::<T>();
206217
// Iterate in reverse to maintain FIFO order when there are duplicate keys.
207-
for (i, val) in arr.iter().enumerate().rev() {
208-
if let Some(val) = val {
209-
let key: u64 = val.as_();
210-
let idx = key.wrapping_sub(offset_val) as usize;
211-
if idx >= data.len() {
212-
return internal_err!("failed build Array idx >= data.len()");
213-
}
214-
215-
if data[idx] != 0 {
216-
if next.is_empty() {
217-
*next = vec![0; array.len()]
218+
// We iterate batches in reverse, and within each batch iterate rows in reverse,
219+
// using a flat index that spans all batches.
220+
let mut flat_offset = total_num_rows;
221+
for array in arrays.iter().rev() {
222+
let arr = array.as_primitive::<T>();
223+
flat_offset -= arr.len();
224+
for (row_idx, val) in arr.iter().enumerate().rev() {
225+
if let Some(val) = val {
226+
let key: u64 = val.as_();
227+
let idx = key.wrapping_sub(offset_val) as usize;
228+
if idx >= data.len() {
229+
return internal_err!("failed build Array idx >= data.len()");
218230
}
219-
next[i] = data[idx]
220-
} else {
221-
*num_of_distinct_key += 1;
231+
let flat_idx = flat_offset + row_idx;
232+
233+
if data[idx] != 0 {
234+
if next.is_empty() {
235+
*next = vec![0; total_num_rows]
236+
}
237+
next[flat_idx] = data[idx]
238+
} else {
239+
*num_of_distinct_key += 1;
240+
}
241+
data[idx] = flat_idx as u32 + 1;
222242
}
223-
data[idx] = (i) as u32 + 1;
224243
}
225244
}
226245
Ok(())
@@ -419,7 +438,7 @@ mod tests {
419438
#[test]
420439
fn test_array_map_limit_offset_duplicate_elements() -> Result<()> {
421440
let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2]));
422-
let map = ArrayMap::try_new(&build, 1, 2)?;
441+
let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?;
423442
let probe = [Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef];
424443

425444
let mut prob_idx = Vec::new();
@@ -450,7 +469,7 @@ mod tests {
450469
#[test]
451470
fn test_array_map_with_limit_and_misses() -> Result<()> {
452471
let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
453-
let map = ArrayMap::try_new(&build, 1, 2)?;
472+
let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?;
454473
let probe = [Arc::new(Int32Array::from(vec![10, 1, 2])) as ArrayRef];
455474

456475
let (mut p_idx, mut b_idx) = (vec![], vec![]);
@@ -483,7 +502,7 @@ mod tests {
483502
#[test]
484503
fn test_array_map_with_build_duplicates_and_misses() -> Result<()> {
485504
let build_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 1]));
486-
let array_map = ArrayMap::try_new(&build_array, 1, 1)?;
505+
let array_map = ArrayMap::try_new(&[&build_array], build_array.len(), 1, 1)?;
487506
// prob: 10(m), 1(h1, h2), 20(m), 1(h1, h2)
488507
let probe_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 1, 20, 1]));
489508
let prob_side_keys = [probe_array];
@@ -513,7 +532,12 @@ mod tests {
513532
let min_val = -5_i128;
514533
let max_val = 10_i128;
515534

516-
let array_map = ArrayMap::try_new(&build_array, min_val as u64, max_val as u64)?;
535+
let array_map = ArrayMap::try_new(
536+
&[&build_array],
537+
build_array.len(),
538+
min_val as u64,
539+
max_val as u64,
540+
)?;
517541

518542
// Probe array
519543
let probe_array: ArrayRef = Arc::new(Int64Array::from(vec![0, -5, 10, -1]));

0 commit comments

Comments (0)