Skip to content

Commit eaf0a41

Browse files
authored
perf: Optimize NULL handling in array_has (#21471)
## Which issue does this PR close? - Closes #21470. ## Rationale for this change `array_has` uses `BooleanArray::builder` to construct its results. This updates the NULL buffer in an incremental fashion (row-by-row). It is more efficient to use `BooleanBufferBuilder` to construct the results and separately construct the output NULL buffer via `NullBuffer::union` or similar. Benchmarks (AR64): ``` array_has_i64 (scalar needle path) - found/10: 51.7 µs -> 49.2 µs, -4.7% - not_found/10: 43.2 µs -> 40.0 µs, -7.2% - found/100: 162.0 µs -> 152.3 µs, -6.3% - not_found/100: 143.1 µs -> 135.8 µs, -5.2% - found/500: 623.4 µs -> 572.5 µs, -8.4% - not_found/500: 593.4 µs -> 560.5 µs, -5.5% array_has_all (row-converter path) - all_found_small_needle/10: 699.4 µs -> 641.1 µs, -8.1% - not_all_found/10: 533.3 µs -> 470.7 µs, -9.5% - all_found_small_needle/100: 5.94 ms -> 4.49 ms, -24.4% - not_all_found/100: 4.60 ms -> 3.83 ms, -16.7% - all_found_small_needle/500: 34.6 ms -> 31.7 ms, -8.3% - not_all_found/500: 30.4 ms -> 28.4 ms, -6.7% array_has_any (row-converter path) - some_match/10: 619.7 µs -> 563.2 µs, -9.5% - no_match/10: 1.11 ms -> 1.04 ms, -0.1% - scalar_some_match/10: 467.4 µs -> 444.6 µs, -4.2% - scalar_no_match/10: 1.15 ms -> 994.5 µs, -14.5% - some_match/100: 4.98 ms -> 4.35 ms, -12.6% - no_match/100: 10.47 ms -> 8.90 ms, -15.0% - scalar_some_match/100: 4.63 ms -> 4.19 ms, -9.4% - scalar_no_match/100: 10.40 ms -> 9.98 ms, -4.1% - some_match/500: 31.8 ms -> 28.9 ms, -9.1% - no_match/500: 66.1 ms -> 58.3 ms, -11.8% - scalar_some_match/500: 25.6 ms -> 23.5 ms, -8.3% - scalar_no_match/500: 58.8 ms -> 54.1 ms, -8.0% array_has_strings (scalar needle path) - found/10: 396.3 µs -> 364.7 µs, -8.5% - not_found/10: 69.9 µs -> 67.5 µs, -3.6% - found/100: 1.34 ms -> 1.25 ms, -7.0% - not_found/100: 2.53 ms -> 2.37 ms, -6.1% - found/500: 3.36 ms -> 3.11 ms, -7.5% - not_found/500: 8.59 ms -> 8.12 ms, -5.5% array_has_all_strings (string fast path) - all_found/10: 1.08 ms -> 1.01 ms, -7.7% - not_all_found/10: 659.0 µs -> 632.5 µs, -3.9% - all_found/100: 5.50 ms -> 5.24 ms, -4.6% - not_all_found/100: 4.77 ms -> 4.58 ms, -4.0% - all_found/500: 27.4 ms -> 26.2 ms, -4.6% - not_all_found/500: 30.0 ms -> 28.8 ms, -3.9% array_has_any_strings (string fast path) - some_match/10: 946.5 µs -> 872.9 µs, -6.7% - no_match/10: 1.20 ms -> 1.12 ms, -6.0% - scalar_some_match/10: 420.4 µs -> 375.7 µs, -8.7% - scalar_no_match/10: 344.0 µs -> 309.5 µs, -10.1% - some_match/100: 4.93 ms -> 4.76 ms, -3.4% - no_match/100: 8.76 ms -> 8.50 ms, -2.9% - scalar_some_match/100: 1.49 ms -> 1.40 ms, -6.0% - scalar_no_match/100: 2.94 ms -> 2.72 ms, -7.8% - some_match/500: 24.0 ms -> 23.4 ms, -2.7% - no_match/500: 57.0 ms -> 55.8 ms, -2.1% - scalar_some_match/500: 5.45 ms -> 5.07 ms, -7.0% - scalar_no_match/500: 34.5 ms -> 32.9 ms, -4.5% array_has_any_scalar (varying scalar size) - i64_no_match/1: 173.7 µs -> 162.6 µs, -4.3% - i64_no_match/10: 139.8 µs -> 125.7 µs, -9.5% - i64_no_match/100: 264.8 µs -> 253.7 µs, -4.6% - i64_no_match/1000: 155.4 µs -> 142.4 µs, -8.0% - string_no_match/1: 125.1 µs -> 107.5 µs, -14.3% - string_no_match/10: 164.8 µs -> 147.0 µs, -10.1% - string_no_match/100: 257.8 µs -> 243.4 µs, -5.8% - string_no_match/1000: 180.9 µs -> 164.5 µs, -8.9% ``` ## What changes are included in this PR? * Implement optimization, minor code cleanup ## Are these changes tested? Yes. ## Are there any user-facing changes? No.
1 parent 1929d71 commit eaf0a41

File tree

1 file changed

+34
-38
lines changed

1 file changed

+34
-38
lines changed

datafusion/functions-nested/src/array_has.rs

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use arrow::array::{
2121
Array, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, Datum, Scalar,
2222
StringArrayType,
2323
};
24-
use arrow::buffer::BooleanBuffer;
24+
use arrow::buffer::{BooleanBuffer, NullBuffer};
2525
use arrow::datatypes::DataType;
2626
use arrow::row::{RowConverter, Rows, SortField};
2727
use datafusion_common::cast::{as_fixed_size_list_array, as_generic_list_array};
@@ -314,7 +314,7 @@ impl<'a> ArrayWrapper<'a> {
314314
}
315315
}
316316

317-
fn nulls(&self) -> Option<&arrow::buffer::NullBuffer> {
317+
fn nulls(&self) -> Option<&NullBuffer> {
318318
match self {
319319
ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
320320
ArrayWrapper::List(arr) => arr.nulls(),
@@ -327,20 +327,21 @@ fn array_has_dispatch_for_array<'a>(
327327
haystack: ArrayWrapper<'a>,
328328
needle: &ArrayRef,
329329
) -> Result<ArrayRef> {
330-
let mut boolean_builder = BooleanArray::builder(haystack.len());
330+
let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
331+
let mut result = BooleanBufferBuilder::new(haystack.len());
331332
for (i, arr) in haystack.iter().enumerate() {
332-
if arr.is_none() || needle.is_null(i) {
333-
boolean_builder.append_null();
333+
if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
334+
result.append(false);
334335
continue;
335336
}
336337
let arr = arr.unwrap();
337338
let is_nested = arr.data_type().is_nested();
338339
let needle_row = Scalar::new(needle.slice(i, 1));
339340
let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?;
340-
boolean_builder.append_value(eq_array.true_count() > 0);
341+
result.append(eq_array.true_count() > 0);
341342
}
342343

343-
Ok(Arc::new(boolean_builder.finish()))
344+
Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
344345
}
345346

346347
fn array_has_dispatch_for_scalar(
@@ -435,9 +436,8 @@ fn general_array_has_for_all_and_any<'a>(
435436
let h_offsets: Vec<usize> = haystack.offsets().collect();
436437
let n_offsets: Vec<usize> = needle.offsets().collect();
437438

438-
let h_nulls = haystack.nulls();
439-
let n_nulls = needle.nulls();
440-
let mut builder = BooleanArray::builder(num_rows);
439+
let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
440+
let mut result = BooleanBufferBuilder::new(num_rows);
441441

442442
for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
443443
let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
@@ -460,13 +460,11 @@ fn general_array_has_for_all_and_any<'a>(
460460
let chunk_n_rows = converter.convert_columns(&[n_vals])?;
461461

462462
for i in chunk_start..chunk_end {
463-
if h_nulls.is_some_and(|n| n.is_null(i))
464-
|| n_nulls.is_some_and(|n| n.is_null(i))
465-
{
466-
builder.append_null();
463+
if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
464+
result.append(false);
467465
continue;
468466
}
469-
builder.append_value(general_array_has_all_and_any_kernel(
467+
result.append(general_array_has_all_and_any_kernel(
470468
&chunk_h_rows,
471469
(h_offsets[i] - h_elem_start)..(h_offsets[i + 1] - h_elem_start),
472470
&chunk_n_rows,
@@ -476,7 +474,7 @@ fn general_array_has_for_all_and_any<'a>(
476474
}
477475
}
478476

479-
Ok(Arc::new(builder.finish()))
477+
Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
480478
}
481479

482480
// String comparison for array_has_all and array_has_any
@@ -490,9 +488,8 @@ fn array_has_all_and_any_string_internal<'a>(
490488
let h_offsets: Vec<usize> = haystack.offsets().collect();
491489
let n_offsets: Vec<usize> = needle.offsets().collect();
492490

493-
let h_nulls = haystack.nulls();
494-
let n_nulls = needle.nulls();
495-
let mut builder = BooleanArray::builder(num_rows);
491+
let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
492+
let mut result = BooleanBufferBuilder::new(num_rows);
496493

497494
for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
498495
let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
@@ -513,25 +510,23 @@ fn array_has_all_and_any_string_internal<'a>(
513510
let chunk_n_strings = string_array_to_vec(n_vals.as_ref());
514511

515512
for i in chunk_start..chunk_end {
516-
if h_nulls.is_some_and(|n| n.is_null(i))
517-
|| n_nulls.is_some_and(|n| n.is_null(i))
518-
{
519-
builder.append_null();
513+
if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
514+
result.append(false);
520515
continue;
521516
}
522517
let h_start = h_offsets[i] - h_elem_start;
523518
let h_end = h_offsets[i + 1] - h_elem_start;
524519
let n_start = n_offsets[i] - n_elem_start;
525520
let n_end = n_offsets[i + 1] - n_elem_start;
526-
builder.append_value(array_has_string_kernel(
521+
result.append(array_has_string_kernel(
527522
&chunk_h_strings[h_start..h_end],
528523
&chunk_n_strings[n_start..n_end],
529524
comparison_type,
530525
));
531526
}
532527
}
533528

534-
Ok(Arc::new(builder.finish()))
529+
Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
535530
}
536531

537532
fn array_has_all_and_any_dispatch<'a>(
@@ -687,16 +682,16 @@ impl<'a> ScalarStringLookup<'a> {
687682
fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>(
688683
col_strings: C,
689684
col_offsets: &[usize],
690-
col_nulls: Option<&arrow::buffer::NullBuffer>,
685+
col_nulls: Option<&NullBuffer>,
691686
has_null_scalar: bool,
692687
scalar_lookup: &ScalarStringLookup<'_>,
693688
) -> ArrayRef {
694689
let num_rows = col_offsets.len() - 1;
695-
let mut builder = BooleanArray::builder(num_rows);
690+
let mut result = BooleanBufferBuilder::new(num_rows);
696691

697692
for i in 0..num_rows {
698693
if col_nulls.is_some_and(|v| v.is_null(i)) {
699-
builder.append_null();
694+
result.append(false);
700695
continue;
701696
}
702697
let start = col_offsets[i];
@@ -708,10 +703,10 @@ fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>(
708703
scalar_lookup.contains(col_strings.value(j))
709704
}
710705
});
711-
builder.append_value(found);
706+
result.append(found);
712707
}
713708

714-
Arc::new(builder.finish())
709+
Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned()))
715710
}
716711

717712
/// General scalar fast path for `array_has_any`, using RowConverter for
@@ -734,7 +729,7 @@ fn array_has_any_with_scalar_general(
734729
let col_offsets: Vec<usize> = col_list.offsets().collect();
735730
let col_nulls = col_list.nulls();
736731

737-
let mut builder = BooleanArray::builder(col_list.len());
732+
let mut result = BooleanBufferBuilder::new(col_list.len());
738733
let num_scalar = scalar_rows.num_rows();
739734

740735
if num_scalar > SCALAR_SMALL_THRESHOLD {
@@ -745,38 +740,39 @@ fn array_has_any_with_scalar_general(
745740

746741
for i in 0..col_list.len() {
747742
if col_nulls.is_some_and(|v| v.is_null(i)) {
748-
builder.append_null();
743+
result.append(false);
749744
continue;
750745
}
751746
let start = col_offsets[i];
752747
let end = col_offsets[i + 1];
753748
let found =
754749
(start..end).any(|j| scalar_set.contains(col_rows.row(j).as_ref()));
755-
builder.append_value(found);
750+
result.append(found);
756751
}
757752
} else {
758753
// Small scalar: linear scan avoids HashSet hashing overhead
759754
for i in 0..col_list.len() {
760755
if col_nulls.is_some_and(|v| v.is_null(i)) {
761-
builder.append_null();
756+
result.append(false);
762757
continue;
763758
}
764759
let start = col_offsets[i];
765760
let end = col_offsets[i + 1];
766761
let found = (start..end)
767762
.any(|j| (0..num_scalar).any(|k| col_rows.row(j) == scalar_rows.row(k)));
768-
builder.append_value(found);
763+
result.append(found);
769764
}
770765
}
771766

772-
let result: ArrayRef = Arc::new(builder.finish());
767+
let output: ArrayRef =
768+
Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned()));
773769

774770
if is_scalar_output {
775771
Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
776-
&result, 0,
772+
&output, 0,
777773
)?))
778774
} else {
779-
Ok(ColumnarValue::Array(result))
775+
Ok(ColumnarValue::Array(output))
780776
}
781777
}
782778

0 commit comments

Comments
 (0)