@@ -46,7 +46,7 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
4646use datafusion_physical_optimizer:: projection_pushdown:: ProjectionPushdown ;
4747use datafusion_physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
4848use datafusion_physical_plan:: coop:: CooperativeExec ;
49- use datafusion_physical_plan:: filter:: FilterExec ;
49+ use datafusion_physical_plan:: filter:: { FilterExec , FilterExecBuilder } ;
5050use datafusion_physical_plan:: joins:: utils:: { ColumnIndex , JoinFilter } ;
5151use datafusion_physical_plan:: joins:: {
5252 HashJoinExec , NestedLoopJoinExec , PartitionMode , StreamJoinPartitionMode ,
@@ -1754,3 +1754,121 @@ fn test_hash_join_empty_projection_embeds() -> Result<()> {
17541754
17551755 Ok ( ( ) )
17561756}
1757+
1758+ /// Regression test for <https://github.com/apache/datafusion/issues/21459>
1759+ ///
1760+ /// When a `ProjectionExec` sits on top of a `FilterExec` that already carries
1761+ /// an embedded projection, the `ProjectionPushdown` optimizer must not panic.
1762+ ///
1763+ /// Before the fix, `FilterExecBuilder::from(self)` copied stale projection
1764+ /// indices (e.g. `[0, 1, 2]`). After swapping, the new input was narrower
1765+ /// (2 columns), so `.build()` panicked with "project index out of bounds".
1766+ #[ test]
1767+ fn test_filter_with_embedded_projection_after_projection ( ) -> Result < ( ) > {
1768+ // DataSourceExec: [a, b, c, d, e]
1769+ let csv = create_simple_csv_exec ( ) ;
1770+
1771+ // FilterExec: a > 0, projection=[0, 1, 2] → output: [a, b, c]
1772+ let predicate = Arc :: new ( BinaryExpr :: new (
1773+ Arc :: new ( Column :: new ( "a" , 0 ) ) ,
1774+ Operator :: Gt ,
1775+ Arc :: new ( Literal :: new ( ScalarValue :: Int32 ( Some ( 0 ) ) ) ) ,
1776+ ) ) ;
1777+ let filter: Arc < dyn ExecutionPlan > = Arc :: new (
1778+ FilterExecBuilder :: new ( predicate, csv)
1779+ . apply_projection ( Some ( vec ! [ 0 , 1 , 2 ] ) ) ?
1780+ . build ( ) ?,
1781+ ) ;
1782+
1783+ // ProjectionExec: narrows [a, b, c] → [a, b]
1784+ let projection: Arc < dyn ExecutionPlan > = Arc :: new ( ProjectionExec :: try_new (
1785+ vec ! [
1786+ ProjectionExpr :: new( Arc :: new( Column :: new( "a" , 0 ) ) , "a" ) ,
1787+ ProjectionExpr :: new( Arc :: new( Column :: new( "b" , 1 ) ) , "b" ) ,
1788+ ] ,
1789+ filter,
1790+ ) ?) ;
1791+
1792+ let initial = displayable ( projection. as_ref ( ) ) . indent ( true ) . to_string ( ) ;
1793+ let actual = initial. trim ( ) ;
1794+ assert_snapshot ! (
1795+ actual,
1796+ @r"
1797+ ProjectionExec: expr=[a@0 as a, b@1 as b]
1798+ FilterExec: a@0 > 0, projection=[a@0, b@1, c@2]
1799+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1800+ "
1801+ ) ;
1802+
1803+ // This must not panic
1804+ let after_optimize =
1805+ ProjectionPushdown :: new ( ) . optimize ( projection, & ConfigOptions :: new ( ) ) ?;
1806+ let after_optimize_string = displayable ( after_optimize. as_ref ( ) )
1807+ . indent ( true )
1808+ . to_string ( ) ;
1809+ let actual = after_optimize_string. trim ( ) ;
1810+ assert_snapshot ! (
1811+ actual,
1812+ @r"
1813+ FilterExec: a@0 > 0
1814+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b], file_type=csv, has_header=false
1815+ "
1816+ ) ;
1817+
1818+ Ok ( ( ) )
1819+ }
1820+
1821+ /// Same as above, but the outer ProjectionExec also renames columns.
1822+ /// Ensures the rename is preserved after the projection pushdown swap.
1823+ #[ test]
1824+ fn test_filter_with_embedded_projection_after_renaming_projection ( ) -> Result < ( ) > {
1825+ let csv = create_simple_csv_exec ( ) ;
1826+
1827+ // FilterExec: b > 10, projection=[0, 1, 2, 3] → output: [a, b, c, d]
1828+ let predicate = Arc :: new ( BinaryExpr :: new (
1829+ Arc :: new ( Column :: new ( "b" , 1 ) ) ,
1830+ Operator :: Gt ,
1831+ Arc :: new ( Literal :: new ( ScalarValue :: Int32 ( Some ( 10 ) ) ) ) ,
1832+ ) ) ;
1833+ let filter: Arc < dyn ExecutionPlan > = Arc :: new (
1834+ FilterExecBuilder :: new ( predicate, csv)
1835+ . apply_projection ( Some ( vec ! [ 0 , 1 , 2 , 3 ] ) ) ?
1836+ . build ( ) ?,
1837+ ) ;
1838+
1839+ // ProjectionExec: [a as x, b as y] — narrows and renames
1840+ let projection: Arc < dyn ExecutionPlan > = Arc :: new ( ProjectionExec :: try_new (
1841+ vec ! [
1842+ ProjectionExpr :: new( Arc :: new( Column :: new( "a" , 0 ) ) , "x" ) ,
1843+ ProjectionExpr :: new( Arc :: new( Column :: new( "b" , 1 ) ) , "y" ) ,
1844+ ] ,
1845+ filter,
1846+ ) ?) ;
1847+
1848+ let initial = displayable ( projection. as_ref ( ) ) . indent ( true ) . to_string ( ) ;
1849+ let actual = initial. trim ( ) ;
1850+ assert_snapshot ! (
1851+ actual,
1852+ @r"
1853+ ProjectionExec: expr=[a@0 as x, b@1 as y]
1854+ FilterExec: b@1 > 10, projection=[a@0, b@1, c@2, d@3]
1855+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1856+ "
1857+ ) ;
1858+
1859+ let after_optimize =
1860+ ProjectionPushdown :: new ( ) . optimize ( projection, & ConfigOptions :: new ( ) ) ?;
1861+ let after_optimize_string = displayable ( after_optimize. as_ref ( ) )
1862+ . indent ( true )
1863+ . to_string ( ) ;
1864+ let actual = after_optimize_string. trim ( ) ;
1865+ assert_snapshot ! (
1866+ actual,
1867+ @r"
1868+ FilterExec: y@1 > 10
1869+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a@0 as x, b@1 as y], file_type=csv, has_header=false
1870+ "
1871+ ) ;
1872+
1873+ Ok ( ( ) )
1874+ }
0 commit comments