Skip to content

Commit 07a9513

Browse files
committed
Support dynamic task count assignation
1 parent 20ed9aa commit 07a9513

41 files changed

Lines changed: 1523 additions & 59 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,33 @@ jobs:
4141
- uses: ./.github/actions/setup
4242
- run: cargo test --features integration
4343

44-
tpch-test:
44+
tpch-correctness-test:
45+
runs-on: ubuntu-latest
46+
strategy:
47+
fail-fast: false
48+
matrix:
49+
adaptive: [ "true", "false" ]
50+
steps:
51+
- uses: actions/checkout@v4
52+
- uses: ./.github/actions/setup
53+
- run: cargo test --features tpch --test tpch_correctness_test
54+
env:
55+
ADAPTIVE: ${{ matrix.adaptive }}
56+
57+
tpch-plans-test:
4558
runs-on: ubuntu-latest
4659
steps:
4760
- uses: actions/checkout@v4
4861
- uses: ./.github/actions/setup
49-
- run: cargo test --features tpch --test 'tpch_*'
62+
- run: cargo test --features tpch --test tpch_plans_test
5063

5164
tpcds-correctness-test:
5265
runs-on: ubuntu-latest
5366
strategy:
5467
fail-fast: false
5568
matrix:
56-
shard: ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10"]
69+
shard: [ "01", "02", "03", "04", "05", "06", "07", "08", "09", "10" ]
70+
adaptive: [ "true", "false" ]
5771
steps:
5872
- uses: actions/checkout@v4
5973
- uses: ./.github/actions/setup
@@ -62,6 +76,8 @@ jobs:
6276
path: testdata/tpcds/main.zip
6377
key: "main.zip"
6478
- run: cargo test --features tpcds --test tpcds_correctness_test shard${{ matrix.shard }}
79+
env:
80+
ADAPTIVE: ${{ matrix.adaptive }}
6581

6682
tpcds-plans-test:
6783
runs-on: ubuntu-latest
@@ -74,7 +90,24 @@ jobs:
7490
key: "main.zip"
7591
- run: cargo test --features tpcds --test tpcds_plans_test
7692

77-
clickbench-test:
93+
clickbench-correctness-test:
94+
runs-on: ubuntu-latest
95+
strategy:
96+
fail-fast: false
97+
matrix:
98+
adaptive: [ "true", "false" ]
99+
steps:
100+
- uses: actions/checkout@v4
101+
- uses: ./.github/actions/setup
102+
- uses: actions/cache@v4
103+
with:
104+
path: testdata/clickbench/
105+
key: "data"
106+
- run: cargo test --features clickbench --test clickbench_correctness_test
107+
env:
108+
ADAPTIVE: ${{ matrix.adaptive }}
109+
110+
clickbench-plans-test:
78111
runs-on: ubuntu-latest
79112
steps:
80113
- uses: actions/checkout@v4
@@ -83,7 +116,7 @@ jobs:
83116
with:
84117
path: testdata/clickbench/
85118
key: "data"
86-
- run: cargo test --features clickbench --test 'clickbench_*'
119+
- run: cargo test --features clickbench --test clickbench_plans_test
87120

88121
format-check:
89122
runs-on: ubuntu-latest

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ moka = { version = "0.12", features = ["sync", "future"] }
4848
crossbeam-queue = "0.3"
4949
sysinfo = { version = "0.30", optional = true }
5050
sketches-ddsketch = { version = "0.3", features = ["use_serde"] }
51+
num-traits = "0.2"
5152
bincode = "1"
5253
tonic-prost = "0.14.2"
5354

benchmarks/cdk/bin/datafusion-bench.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ async function main() {
2424
.option('--max-tasks-per-stage <number>', 'Max tasks per stage', '0')
2525
.option('--repartition-file-min-size <number>', 'repartition_file_min_size DF option', '10485760' /* upstream default */)
2626
.option('--target-partitions <number>', 'target_partitions DF option', '8')
27+
.option('--dynamic <boolean>', 'Use the dynamic task count assigner', 'false')
28+
.option('--bytes-per-partition-per-second <number>', 'Target throughput in bytes per partition per second for the dynamic task count allocator', `${16 * 1024 * 1024}`)
2729
.option('--queries <string>', 'Specific queries to run', undefined)
2830
.option('--debug <boolean>', 'Print the generated plans to stdout')
2931
.option('--warmup <boolean>', 'Perform a warmup query before the benchmarks', 'true')
@@ -46,6 +48,8 @@ async function main() {
4648
const childrenIsolatorUnions = options.childrenIsolatorUnions === 'true' || options.childrenIsolatorUnions === 1
4749
const broadcastJoins = options.broadcastJoins === 'true' || options.broadcastJoins === 1
4850
const partialReduce = options.partialReduce === 'true' || options.partialReduce === 1
51+
const dynamicTaskCount = options.dynamic === 'true' || options.dynamic === 1
52+
const bytesPerPartitionPerSecond = parseInt(options.bytesPerPartitionPerSecond)
4953
const debug = options.debug === true || options.debug === 'true' || options.debug === 1
5054
const warmup = options.warmup === true || options.warmup === 'true' || options.warmup === 1
5155

@@ -59,6 +63,8 @@ async function main() {
5963
compression,
6064
broadcastJoins,
6165
partialReduce,
66+
dynamicTaskCount,
67+
bytesPerPartitionPerSecond,
6268
maxTasksPerStage,
6369
repartitionFileMinSize,
6470
targetPartitions
@@ -98,6 +104,8 @@ class DataFusionRunner implements BenchmarkRunner {
98104
childrenIsolatorUnions: boolean;
99105
broadcastJoins: boolean;
100106
partialReduce: boolean;
107+
dynamicTaskCount: boolean;
108+
bytesPerPartitionPerSecond: number;
101109
maxTasksPerStage: number;
102110
repartitionFileMinSize: number;
103111
targetPartitions: number;
@@ -177,6 +185,8 @@ class DataFusionRunner implements BenchmarkRunner {
177185
SET distributed.children_isolator_unions=${this.options.childrenIsolatorUnions};
178186
SET distributed.broadcast_joins=${this.options.broadcastJoins};
179187
SET distributed.partial_reduce=${this.options.partialReduce};
188+
SET distributed.dynamic_task_count=${this.options.dynamicTaskCount};
189+
SET distributed.bytes_per_partition_per_second=${this.options.bytesPerPartitionPerSecond};
180190
SET distributed.max_tasks_per_stage=${this.options.maxTasksPerStage};
181191
SET datafusion.optimizer.repartition_file_min_size=${this.options.repartitionFileMinSize};
182192
SET datafusion.execution.target_partitions=${this.options.targetPartitions};

benchmarks/src/run.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ pub struct RunOpt {
105105
#[structopt(long, default_value = "0")]
106106
max_tasks_per_stage: usize,
107107

108+
/// Activate dynamic task count
109+
#[structopt(long)]
110+
dynamic: bool,
111+
108112
/// Number of iterations of each test run
109113
#[structopt(short = "i", long = "iterations", default_value = "5")]
110114
iterations: usize,
@@ -205,6 +209,7 @@ impl RunOpt {
205209
.with_distributed_cardinality_effect_task_scale_factor(
206210
self.cardinality_task_sf.unwrap_or(1.0),
207211
)?
212+
.with_distributed_dynamic_task_count(self.dynamic)?
208213
.with_distributed_compression(match self.compression.as_str() {
209214
"zstd" => Some(CompressionType::ZSTD),
210215
"lz4" => Some(CompressionType::LZ4_FRAME),

src/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod recursion;
55
mod task_context_helpers;
66
mod time;
77
mod uuid;
8+
mod vec;
89

910
pub(crate) use children_helpers::require_one_child;
1011
pub(crate) use on_drop_stream::on_drop_stream;
@@ -13,3 +14,4 @@ pub(crate) use recursion::TreeNodeExt;
1314
pub(crate) use task_context_helpers::task_ctx_with_extension;
1415
pub(crate) use time::now_ns;
1516
pub(crate) use uuid::{deserialize_uuid, serialize_uuid};
17+
pub(crate) use vec::{element_wise_sum, vec_avg_reduce, vec_cast, vec_div, vec_mul};

src/common/recursion.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,7 @@ mod tests {
589589
query_id: uuid::Uuid::nil(),
590590
num: 0,
591591
workers: vec![],
592+
runtime_stats: None,
592593
}))
593594
.unwrap()
594595
}

src/common/vec.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use datafusion::common::internal_err;
2+
use datafusion::error::Result;
3+
use num_traits::AsPrimitive;
4+
use std::ops::{AddAssign, DivAssign, MulAssign};
5+
6+
/// Converts a slice of type `I` to a `Vec<O>` using `as`-style primitive casting.
7+
pub(crate) fn vec_cast<I, O>(input: &[I]) -> Vec<O>
8+
where
9+
I: AsPrimitive<O>,
10+
O: Copy + 'static,
11+
{
12+
input.iter().map(|v| v.as_()).collect()
13+
}
14+
15+
/// Adds each element of `other` into the corresponding element of `one`, converting types via `AsPrimitive`.
16+
pub(crate) fn element_wise_sum<I, O>(mut one: Vec<I>, other: &[O]) -> Result<Vec<I>>
17+
where
18+
I: AddAssign + Copy + 'static,
19+
O: AsPrimitive<I> + 'static,
20+
{
21+
if one.len() != other.len() {
22+
return internal_err!("Cannot do an element wise sum of two vectors of different lengths");
23+
}
24+
for i in 0..one.len() {
25+
one[i] += other[i].as_();
26+
}
27+
Ok(one)
28+
}
29+
30+
/// Multiplies every element of `one` by the scalar `other`, converting types via `AsPrimitive`.
31+
pub(crate) fn vec_mul<I, O>(mut one: Vec<I>, other: O) -> Vec<I>
32+
where
33+
I: MulAssign + Copy + 'static,
34+
O: AsPrimitive<I> + 'static,
35+
{
36+
for el in one.iter_mut() {
37+
*el *= other.as_();
38+
}
39+
one
40+
}
41+
42+
/// Divides every element of `one` by the scalar `other`, converting types via `AsPrimitive`.
43+
pub(crate) fn vec_div<I, O>(mut one: Vec<I>, other: O) -> Vec<I>
44+
where
45+
I: DivAssign + Copy + 'static,
46+
O: AsPrimitive<I> + 'static,
47+
{
48+
for el in one.iter_mut() {
49+
*el /= other.as_();
50+
}
51+
one
52+
}
53+
54+
/// Reduces a collection of same-length `f32` vectors into a single vector by averaging element-wise.
55+
/// Empty inner vecs are skipped; returns an empty vec if all inputs are empty.
56+
pub(crate) fn vec_avg_reduce(vecs: Vec<Vec<f32>>) -> Result<Vec<f32>> {
57+
let sample_count = vecs.len();
58+
let mut iter = vecs.into_iter();
59+
let mut acc = loop {
60+
let Some(v) = iter.next() else {
61+
return Ok(vec![]);
62+
};
63+
if !v.is_empty() {
64+
break v;
65+
}
66+
};
67+
for v in iter {
68+
if v.is_empty() {
69+
continue;
70+
} else if acc.len() != v.len() {
71+
return internal_err!(
72+
"vec_avg_reduce: length mismatch — first vec has {} elements, got {}",
73+
acc.len(),
74+
v.len()
75+
);
76+
}
77+
acc = element_wise_sum(acc, &v)?;
78+
}
79+
Ok(vec_div(acc, sample_count as f32))
80+
}

src/coordinator/distributed.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
use crate::DistributedConfig;
12
use crate::common::{require_one_child, serialize_uuid};
23
use crate::coordinator::metrics_store::MetricsStore;
4+
use crate::coordinator::prepare_dynamic_plan::prepare_dynamic_plan;
35
use crate::coordinator::prepare_static_plan::prepare_static_plan;
46
use crate::coordinator::query_coordinator::QueryCoordinator;
57
use crate::distributed_planner::NetworkBoundaryExt;
@@ -198,7 +200,11 @@ impl ExecutionPlan for DistributedExec {
198200
builder.spawn(async move {
199201
let _guard = query_coordinator.end_query_guard();
200202

201-
let result = prepare_static_plan(&query_coordinator, &base_plan)?;
203+
let d_cfg = DistributedConfig::from_config_options(context.session_config().options())?;
204+
let result = match d_cfg.dynamic_task_count {
205+
true => prepare_dynamic_plan(&query_coordinator, &base_plan).await?,
206+
false => prepare_static_plan(&query_coordinator, &base_plan)?,
207+
};
202208

203209
plan_for_viz
204210
.lock()

src/coordinator/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod distributed;
22
mod latency_metric;
33
mod metrics_store;
4+
mod prepare_dynamic_plan;
45
mod prepare_static_plan;
56
mod query_coordinator;
67

0 commit comments

Comments
 (0)