Skip to content

Commit 48fd178

Browse files
Subham Singhal
authored and committed
Resolve comment
1 parent 52147dd commit 48fd178

File tree

5 files changed

+20
-3
lines changed

5 files changed

+20
-3
lines changed

datafusion/common/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,7 @@ config_namespace! {
10911091
/// Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a
10921092
/// PartitionedTopKExec that maintains per-partition heaps, avoiding
10931093
/// a full sort of the input.
1094-
pub enable_window_topn: bool, default = true
1094+
pub enable_window_topn: bool, default = false
10951095

10961096
/// When set to true, the optimizer will push TopK (Sort with fetch)
10971097
/// below hash repartition when the partition key is a prefix of the

datafusion/physical-plan/src/sorts/partitioned_topk.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ impl PartitionedTopKExec {
232232
/// The output is sorted by `sort_exprs` (partition keys then order keys),
233233
/// uses the same partitioning as the input, emits all output at once
234234
/// (`EmissionType::Final`), and is bounded.
235+
235236
fn compute_properties(
236237
input: &Arc<dyn ExecutionPlan>,
237238
sort_exprs: LexOrdering,
@@ -269,8 +270,17 @@ impl DisplayAs for PartitionedTopKExec {
269270
)
270271
}
271272
DisplayFormatType::TreeRender => {
273+
let partition_exprs: Vec<String> = self.expr[..self.partition_prefix_len]
274+
.iter()
275+
.map(|e| format!("{}", e.expr))
276+
.collect();
277+
let order_exprs: Vec<String> = self.expr[self.partition_prefix_len..]
278+
.iter()
279+
.map(|e| format!("{e}"))
280+
.collect();
272281
writeln!(f, "fetch={}", self.fetch)?;
273-
writeln!(f, "{}", self.expr)
282+
writeln!(f, "partition=[{}]", partition_exprs.join(", "))?;
283+
writeln!(f, "order=[{}]", order_exprs.join(", "))
274284
}
275285
}
276286
}

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ datafusion.optimizer.enable_topk_aggregation true
309309
datafusion.optimizer.enable_topk_dynamic_filter_pushdown true
310310
datafusion.optimizer.enable_topk_repartition true
311311
datafusion.optimizer.enable_window_limits true
312+
datafusion.optimizer.enable_window_topn false
312313
datafusion.optimizer.expand_views_at_output false
313314
datafusion.optimizer.filter_null_join_keys false
314315
datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150
@@ -453,6 +454,7 @@ datafusion.optimizer.enable_topk_aggregation true When set to true, the optimize
453454
datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase.
454455
datafusion.optimizer.enable_topk_repartition true When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle.
455456
datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible
457+
datafusion.optimizer.enable_window_topn false When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input.
456458
datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
457459
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
458460
datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. The default is 150 values per partition. This is inspired by Trino's `max-filter-keys-per-column` setting. See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>

datafusion/sqllogictest/test_files/window_topn.slt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ CREATE TABLE window_topn_t (id INT, pk INT, val INT) AS VALUES
3030
(9, 3, 50),
3131
(10, 3, 75);
3232

33+
# Enable the optimization for all tests
34+
statement ok
35+
SET datafusion.optimizer.enable_window_topn = true;
36+
3337
# Test 1: Correct results for top-2 per partition
3438
query III rowsort
3539
SELECT id, pk, val FROM (
@@ -109,4 +113,4 @@ statement ok
109113
SET datafusion.optimizer.enable_window_topn = true;
110114

111115
statement ok
112-
DROP TABLE window_topn_t;
116+
DROP TABLE window_topn_t;

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ The following configuration settings are available:
140140
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
141141
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
142142
| datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible |
143+
| datafusion.optimizer.enable_window_topn | false | When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. |
143144
| datafusion.optimizer.enable_topk_repartition | true | When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. |
144145
| datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. |
145146
| datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. |

0 commit comments

Comments
 (0)