Skip to content

Commit 6e5fbe0

Browse files
committed
test: cover LargeList and scan/take path for #6580
- Refactor the sliced-list merge_with_schema test into a generic helper and add a LargeList case. - Add a scan().filter().project([list_struct_col]) regression in scanner.rs that forces the panicking TakeExec -> merge_with_schema path via MaterializationStyle::AllEarlyExcept.
1 parent 9ed06dc commit 6e5fbe0

2 files changed

Lines changed: 172 additions & 32 deletions

File tree

rust/lance-arrow/src/lib.rs

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2319,56 +2319,71 @@ mod tests {
23192319
assert_eq!(merged_array.len(), 2);
23202320
}
23212321

2322+
#[test]
2323+
fn test_merge_with_schema_sliced_list_struct() {
2324+
test_merge_with_schema_sliced_list_struct_generic::<i32>();
2325+
}
2326+
2327+
#[test]
2328+
fn test_merge_with_schema_sliced_large_list_struct() {
2329+
test_merge_with_schema_sliced_list_struct_generic::<i64>();
2330+
}
2331+
23222332
// Regression for #6580: merge_with_schema panicked when the left list was a
23232333
// sliced view whose offsets did not start at zero (common after a filtered
23242334
// scan). Cloning those offsets alongside `trimmed_values` produced offsets
2325-
// larger than the trimmed child, panicking in `ListArray::new`.
2326-
#[test]
2327-
fn test_merge_with_schema_sliced_list_struct() {
2335+
// larger than the trimmed child, panicking in `(Large)ListArray::new`.
2336+
fn test_merge_with_schema_sliced_list_struct_generic<O: OffsetSizeTrait>() {
2337+
let make_list_dtype = |item_field: Arc<Field>| {
2338+
if O::IS_LARGE {
2339+
DataType::LargeList(item_field)
2340+
} else {
2341+
DataType::List(item_field)
2342+
}
2343+
};
2344+
23282345
// Build a List<Struct> with two rows of 5 items each, then slice away
23292346
// the first row so the remaining list's offsets start at 5, not 0.
2330-
let struct_fields = Fields::from(vec![Field::new("a", DataType::Int32, true)]);
2331-
let values = Arc::new(StructArray::new(
2332-
struct_fields.clone(),
2347+
let struct_fields_a = Fields::from(vec![Field::new("a", DataType::Int32, true)]);
2348+
let left_values = Arc::new(StructArray::new(
2349+
struct_fields_a.clone(),
23332350
vec![Arc::new(Int32Array::from_iter_values(0..10)) as ArrayRef],
23342351
None,
23352352
));
2336-
let full_list = ListArray::new(
2337-
Arc::new(Field::new("item", DataType::Struct(struct_fields), true)),
2338-
OffsetBuffer::from_lengths([5, 5]),
2339-
values,
2353+
let full_list = GenericListArray::<O>::new(
2354+
Arc::new(Field::new("item", DataType::Struct(struct_fields_a), true)),
2355+
OffsetBuffer::<O>::from_lengths([5, 5]),
2356+
left_values,
23402357
None,
23412358
);
23422359
let sliced_left = full_list.slice(1, 1);
2343-
assert_eq!(sliced_left.offsets()[0], 5);
2344-
assert_eq!(sliced_left.offsets()[1], 10);
2360+
assert_eq!(sliced_left.offsets()[0].as_usize(), 5);
2361+
assert_eq!(sliced_left.offsets()[1].as_usize(), 10);
23452362

2346-
let right_struct = Arc::new(StructArray::new(
2347-
Fields::from(vec![Field::new("b", DataType::Int32, true)]),
2363+
let struct_fields_b = Fields::from(vec![Field::new("b", DataType::Int32, true)]);
2364+
let right_values = Arc::new(StructArray::new(
2365+
struct_fields_b.clone(),
23482366
vec![Arc::new(Int32Array::from_iter_values(100..105)) as ArrayRef],
23492367
None,
23502368
));
2351-
let right_list = ListArray::new(
2352-
Arc::new(Field::new(
2353-
"item",
2354-
DataType::Struct(right_struct.fields().clone()),
2355-
true,
2356-
)),
2357-
OffsetBuffer::from_lengths([5]),
2358-
right_struct,
2369+
let right_list = GenericListArray::<O>::new(
2370+
Arc::new(Field::new("item", DataType::Struct(struct_fields_b), true)),
2371+
OffsetBuffer::<O>::from_lengths([5]),
2372+
right_values,
23592373
None,
23602374
);
23612375

2376+
let target_item_field = Arc::new(Field::new(
2377+
"item",
2378+
DataType::Struct(Fields::from(vec![
2379+
Field::new("a", DataType::Int32, true),
2380+
Field::new("b", DataType::Int32, true),
2381+
])),
2382+
true,
2383+
));
23622384
let target_fields = Fields::from(vec![Field::new(
23632385
"items",
2364-
DataType::List(Arc::new(Field::new(
2365-
"item",
2366-
DataType::Struct(Fields::from(vec![
2367-
Field::new("a", DataType::Int32, true),
2368-
Field::new("b", DataType::Int32, true),
2369-
])),
2370-
true,
2371-
))),
2386+
make_list_dtype(target_item_field),
23722387
true,
23732388
)]);
23742389

@@ -2399,10 +2414,10 @@ mod tests {
23992414
.column_by_name("items")
24002415
.unwrap()
24012416
.as_any()
2402-
.downcast_ref::<ListArray>()
2417+
.downcast_ref::<GenericListArray<O>>()
24032418
.unwrap();
24042419
assert_eq!(merged_list.len(), 1);
2405-
assert_eq!(merged_list.value_length(0), 5);
2420+
assert_eq!(merged_list.value_length(0).as_usize(), 5);
24062421
let merged_struct = merged_list.values().as_struct();
24072422
assert_eq!(merged_struct.num_columns(), 2);
24082423
let a = merged_struct

rust/lance/src/dataset/scanner.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5373,6 +5373,131 @@ mod test {
53735373
Ok(())
53745374
}
53755375

5376+
// Regression for #6580: a scan with `filter` + `project` of a
5377+
// `List<Struct>` column used to panic in `merge_with_schema` (called from
5378+
// `TakeStream::map_batch`) because the filtered batch arrived as a sliced
5379+
// view of a larger batch and the cloned list offsets did not start at
5380+
// zero. The trigger requires (a) a `List<Struct>` projection where the
5381+
// struct is split across `filtered_read` and `TakeExec` and (b) a
5382+
// sparse-tail selectivity pattern so the trailing filter result lands deep
5383+
// inside the values buffer of its source batch.
5384+
#[rstest]
5385+
#[tokio::test]
5386+
async fn test_filter_project_list_struct_sparse_tail(
5387+
// The panic is specific to v2.x storage; the legacy reader takes a
5388+
// different code path. V2_0 and V2_2 are the versions called out in
5389+
// the original report.
5390+
#[values(
5391+
LanceFileVersion::V2_0,
5392+
LanceFileVersion::Stable,
5393+
LanceFileVersion::V2_2
5394+
)]
5395+
data_storage_version: LanceFileVersion,
5396+
) {
5397+
use arrow_array::{ListArray, UInt16Array};
5398+
use arrow_buffer::{OffsetBuffer, ScalarBuffer};
5399+
5400+
let struct_fields = Fields::from(vec![
5401+
Arc::new(ArrowField::new("a", DataType::Int32, true)),
5402+
Arc::new(ArrowField::new("b", DataType::Int32, true)),
5403+
]);
5404+
let item_field = Arc::new(ArrowField::new(
5405+
"item",
5406+
DataType::Struct(struct_fields.clone()),
5407+
true,
5408+
));
5409+
let schema = Arc::new(ArrowSchema::new(vec![
5410+
ArrowField::new("id", DataType::Int32, false),
5411+
ArrowField::new("grp", DataType::UInt16, false),
5412+
ArrowField::new("items", DataType::List(item_field.clone()), false),
5413+
]));
5414+
5415+
let make_batch = |start: i32, n: usize, group: u16| -> RecordBatch {
5416+
let ids = Int32Array::from_iter_values(start..start + n as i32);
5417+
let groups = UInt16Array::from(vec![group; n]);
5418+
5419+
let mut offsets = Vec::with_capacity(n + 1);
5420+
let mut a_vals: Vec<i32> = Vec::new();
5421+
let mut b_vals: Vec<i32> = Vec::new();
5422+
offsets.push(0i32);
5423+
for i in 0..n {
5424+
// Variable-length lists (1..=18) so offsets don't land on
5425+
// batch-row boundaries.
5426+
let len = 1 + (i % 18);
5427+
for j in 0..len {
5428+
a_vals.push(j as i32);
5429+
b_vals.push(-(j as i32));
5430+
}
5431+
offsets.push(a_vals.len() as i32);
5432+
}
5433+
let struct_arr = Arc::new(StructArray::new(
5434+
struct_fields.clone(),
5435+
vec![
5436+
Arc::new(Int32Array::from(a_vals)) as ArrayRef,
5437+
Arc::new(Int32Array::from(b_vals)) as ArrayRef,
5438+
],
5439+
None,
5440+
));
5441+
let items = ListArray::new(
5442+
item_field.clone(),
5443+
OffsetBuffer::new(ScalarBuffer::from(offsets)),
5444+
struct_arr,
5445+
None,
5446+
);
5447+
RecordBatch::try_new(
5448+
schema.clone(),
5449+
vec![
5450+
Arc::new(ids) as ArrayRef,
5451+
Arc::new(groups) as ArrayRef,
5452+
Arc::new(items) as ArrayRef,
5453+
],
5454+
)
5455+
.unwrap()
5456+
};
5457+
5458+
// Sparse-tail selectivity (matching the original report's shape at a
5459+
// smaller scale): a large leading block of matches, a large gap of
5460+
// non-matches, then a small trailing match. Single fragment.
5461+
let batches = vec![
5462+
make_batch(0, 100_000, 7),
5463+
make_batch(100_000, 400_000, 1),
5464+
make_batch(500_000, 7_300, 7),
5465+
];
5466+
5467+
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
5468+
let dataset = Dataset::write(
5469+
reader,
5470+
"memory://",
5471+
Some(WriteParams {
5472+
max_rows_per_file: 1_000_000,
5473+
data_storage_version: Some(data_storage_version),
5474+
..Default::default()
5475+
}),
5476+
)
5477+
.await
5478+
.unwrap();
5479+
5480+
// Force a column split inside the `items` struct by marking `items.b`
5481+
// as a late-materialized field: `filtered_read` returns the batch with
5482+
// `items.a`, and `TakeExec` adds `items.b`. `merge_with_schema` then
5483+
// takes its `List<Struct>` branch, which is where the panic was.
5484+
let items_b_field_id = dataset
5485+
.schema()
5486+
.field("items")
5487+
.unwrap()
5488+
.child("item")
5489+
.unwrap()
5490+
.child("b")
5491+
.unwrap()
5492+
.id as u32;
5493+
let mut scan = dataset.scan();
5494+
scan.filter("grp = 7").unwrap();
5495+
scan.project(&["id", "items"]).unwrap();
5496+
scan.materialization_style(MaterializationStyle::AllEarlyExcept(vec![items_b_field_id]));
5497+
let result = scan.try_into_batch().await.unwrap();
5498+
assert_eq!(result.num_rows(), 107_300);
5499+
}
5500+
53765501
#[tokio::test]
53775502
async fn test_scan_regexp_match_and_non_empty_captions() {
53785503
// Build a small dataset with three Utf8 columns and verify the full

0 commit comments

Comments
 (0)