feat: add initial support for array_exists with lambda expression support #3611
andygrove wants to merge 8 commits into apache:main

Conversation
Add native support for `array_exists(arr, x -> predicate(x))` in SQL and DataFrame API. This is the first general-purpose lambda expression infrastructure, which can later be extended to support `array_filter`, `array_transform`, and `array_forall`.

The lambda body is serialized as an ordinary expression tree, where `NamedLambdaVariable` leaf nodes are serialized as `LambdaVariable` proto messages. On the Rust side, `ArrayExistsExpr` evaluates the lambda body vectorized over all elements in a single pass: it flattens list values, expands the batch with repeat indices, appends the elements as a `__comet_lambda_var` column, evaluates once, and reduces per row with SQL three-valued logic semantics. Unsupported lambda bodies (e.g. those containing UDFs) fall back to Spark.

Closes apache#3149
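The per-row reduction described above follows SQL three-valued logic: a row yields true if any element satisfies the predicate, false only if every element evaluates to false (including the empty array), and null if no element is true but at least one predicate result is null. A minimal sketch of that reduction over plain Rust options, independent of the actual Arrow-based `ArrayExistsExpr` implementation:

```rust
/// Reduce per-element predicate results for one row using SQL
/// three-valued logic, mirroring the semantics described above:
/// - Some(true)  if any element evaluates to true
/// - None (null) if no element is true but at least one result is null
/// - Some(false) otherwise (including the empty array)
fn exists_reduce(results: &[Option<bool>]) -> Option<bool> {
    let mut saw_null = false;
    for r in results {
        match r {
            Some(true) => return Some(true), // short-circuit on first match
            None => saw_null = true,
            Some(false) => {}
        }
    }
    if saw_null { None } else { Some(false) }
}

fn main() {
    assert_eq!(exists_reduce(&[Some(false), Some(true)]), Some(true));
    assert_eq!(exists_reduce(&[Some(false), None]), None);
    assert_eq!(exists_reduce(&[Some(false), Some(false)]), Some(false));
    assert_eq!(exists_reduce(&[]), Some(false));
}
```

This is only a sketch of the semantics; the PR performs the same reduction over Arrow arrays after a single vectorized evaluation of the lambda body.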
- Remove unused element_type proto field from ArrayExists
- Add LargeListArray support via decompose_list helper
- Use column index instead of name for lambda variable lookup
- Add TimestampNTZType to supported element types
- Restore CometNamedLambdaVariable as standalone serde object
- Remove SQL-based Scala tests (covered by SQL file tests)
- Add DataFrame tests for decimal and date element types
- Add negative test for unsupported element type fallback
- Add multi-column batch Rust unit test
```rust
for (i, col) in batch.columns().iter().enumerate() {
    let expanded = take(col.as_ref(), &repeat_indices_array, None)?;
    expanded_columns.push(expanded);
    expanded_fields.push(Arc::new(batch.schema().field(i).clone()));
}
```
non-blocking: I believe this will also expand uncaptured columns (those not referenced in the lambda body).
To avoid that costly expansion, it is possible to either:
- use a NullArray, since its creation is O(1) regardless of length, or
- include only the captured columns and the lambda variable in the batch, and rewrite the lambda body to adjust column indices, as done in https://github.com/apache/datafusion/pull/18329/changes#diff-ac23ff0fe78acd71875341026dd5907736e3e3f49e2c398a69e6b33cb6394ae8R92-R139
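The second suggestion can be sketched as an index-remapping step: collect the original column indices the lambda body captures, build a reduced batch containing only those columns plus the lambda variable, and rewrite each column reference in the body to its new position. The helper below is purely illustrative; its name and shape are assumptions, not Comet's or DataFusion's actual API:

```rust
// Hypothetical sketch: remap captured column indices into a reduced
// batch layout of [captured columns..., lambda variable]. `captured`
// holds the original column indices referenced by the lambda body.
use std::collections::HashMap;

fn remap_captured(captured: &[usize]) -> (HashMap<usize, usize>, usize) {
    let mut mapping = HashMap::new();
    for (new_idx, &old_idx) in captured.iter().enumerate() {
        // Each old index maps to its position among the captured columns.
        mapping.insert(old_idx, new_idx);
    }
    // The lambda variable column is appended after the captured columns.
    let lambda_var_index = captured.len();
    (mapping, lambda_var_index)
}

fn main() {
    // Lambda body references original columns 3 and 7 of a wide batch.
    let (map, var_idx) = remap_captured(&[3, 7]);
    assert_eq!(map[&3], 0);
    assert_eq!(map[&7], 1);
    assert_eq!(var_idx, 2);
}
```

With this mapping, only the captured columns need the costly `take` expansion; everything else is simply dropped from the intermediate batch.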
Thanks @gstvg. Excellent advice. I have implemented this now.
comphead left a comment:
Thanks @andygrove, this is actually great. I'm going through this PR today to see how DataFusion can learn from it.
```scala
expr.function match {
  case LambdaFunction(body, Seq(_: NamedLambdaVariable), _) =>
```
Do we need a fallback for cases where we cannot support a particular lambda?
For instance, nested lambdas such as `exists(arr1, x -> exists(arr2, y -> y > x))` will probably not work correctly.
- Use NullArray for uncaptured columns to avoid costly take() expansion
- Add tests for literal false, true, and null lambdas
- Fallback when legacy non-three-valued logic mode is configured
- Detect and fallback for nested lambda expressions
# Conflicts:
#   native/spark-expr/src/array_funcs/mod.rs
```scala
object CometArrayExists extends CometExpressionSerde[ArrayExists] {

  /** Check if a lambda body contains nested lambda expressions (e.g., nested exists calls). */
  private def containsNestedLambda(expr: Expression, currentVar: NamedLambdaVariable): Boolean = {
```
Does `currentVar` get used?
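The kind of check `containsNestedLambda` performs is a recursive walk that reports whether a lambda appears anywhere inside a lambda body. The toy Rust sketch below illustrates the idea over a made-up expression enum; it is not the Scala code in the PR:

```rust
// Toy expression tree for illustration only; this is not Comet's
// Expression type.
enum Expr {
    Lambda(Box<Expr>), // a lambda with its body
    Call(Vec<Expr>),   // any function call over child expressions
    Var,               // a lambda variable reference
}

/// Returns true if `expr` contains a lambda anywhere inside it.
/// Applied to an outer lambda *body*, this detects nested lambdas,
/// which the PR falls back to Spark for.
fn contains_lambda(expr: &Expr) -> bool {
    match expr {
        Expr::Lambda(_) => true,
        Expr::Call(children) => children.iter().any(contains_lambda),
        Expr::Var => false,
    }
}

fn main() {
    // e.g. exists(arr2, y -> ...) nested inside an outer lambda body
    let nested_body = Expr::Call(vec![Expr::Lambda(Box::new(Expr::Var))]);
    assert!(contains_lambda(&nested_body));

    let flat_body = Expr::Call(vec![Expr::Var]);
    assert!(!contains_lambda(&flat_body));
}
```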
```
statement
INSERT INTO test_array_exists VALUES
  (array(1, 2, 3), array('a', 'bb', 'ccc'), array(1.5, 2.5, 3.5), array(false, false, true), array(100, 200, 300), 2),
  (array(1, 2), array('a', 'b'), array(0.5, 1.5), array(false, false), array(10, 20), 5),
  (array(), array(), array(), array(), array(), 0),
  (NULL, NULL, NULL, NULL, NULL, 1),
  (array(1, NULL, 3), array('a', NULL, 'ccc'), array(1.0, NULL, 3.0), array(true, NULL, false), array(100, NULL, 300), 2)
```
Might be worth adding a test for `exists(array(0, null, 2, 3, null), x -> x IS NULL)`, i.e. null elements where the predicate returns a non-null value.
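In the SQL file test convention this PR already uses, the suggested case could look something like the following (hypothetical, not part of the PR; the expected `true` assumes standard Spark `exists` semantics, where `x IS NULL` evaluates to true for a null element):

```
-- hypothetical test: null elements, non-null predicate results
query
SELECT exists(array(0, null, 2, 3, null), x -> x IS NULL)
----
true
```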
```
-- long type
query
SELECT exists(arr_long, x -> x > 250) FROM test_array_exists
```
Should we add a case for Timestamp/TimestampNTZ?
I believe apache/datafusion#21231 will affect this. A CaseWhen/If within a lambda that uses its variable will remove it from the batch. I think a check can be added in https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/serde/conditional.scala until this gets fixed.
Closes #3149

Summary
- Adds `array_exists(arr, x -> predicate(x))` in SQL and DataFrame API
- Lambda infrastructure can later be extended to `array_filter`, `array_transform`, `array_forall`