Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,52 @@ def benchmark(self) -> MeasurementSource:
""")


class ProjectionPushdown(Scenario):
"""Hydrate an MV that reads one column from a 32-column persist table.

Exercises parquet ProjectionMask pushdown: persist's decoder skips the 31
columns the MV does not demand. The wide table is created once in
`init()`; each iteration measures fresh hydration of a narrow MV by
parking the cluster at rf=0, creating the MV, starting the timer,
bringing the cluster up to rf=1, and waiting for the MV to be readable.
"""

SCALE = 6

def init(self) -> list[Action]:
n = self.n()
column_defs = ", ".join(f"f{i} INTEGER" for i in range(32))
select_cols = ", ".join(
f"((generate_series * {17 + i * 7}) % 1000003)::int AS f{i}"
for i in range(32)
)
return [
TdAction(f"""
> CREATE CLUSTER pp_cluster SIZE 'scale=1,workers=16', REPLICATION FACTOR 1
> CREATE TABLE pp_wide ({column_defs})
> INSERT INTO pp_wide SELECT {select_cols} FROM generate_series(1, {n})
> SELECT COUNT(*) = {n} FROM pp_wide
true
"""),
]

def benchmark(self) -> MeasurementSource:
return Td(f"""
> DROP MATERIALIZED VIEW IF EXISTS pp_narrow
> ALTER CLUSTER pp_cluster SET (REPLICATION FACTOR 0)
> CREATE MATERIALIZED VIEW pp_narrow IN CLUSTER pp_cluster AS SELECT f0 FROM pp_wide
> SET CLUSTER = pp_cluster
> SELECT 1
/* A */
1
> ALTER CLUSTER pp_cluster SET (REPLICATION FACTOR 1)
> SELECT COUNT(*) FROM pp_narrow
/* B */
{self.n()}
> SET CLUSTER = default
""")


class FastPathFilterIndex(FastPath):
"""Measure the time it takes for the fast path to filter our all rows from a materialized view using an index and return"""

Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ def get_variable_system_parameters(
"100000",
["10000", "100000", "1000000"],
),
VariableSystemParameter(
"storage_source_enable_column_projection", "true", ["true", "false"]
),
VariableSystemParameter(
"storage_statistics_collection_interval",
"1000",
Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1595,6 +1595,9 @@ def __init__(
self.flags_with_values["enable_cast_elimination"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_upsert_v2"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_coalesce_case_transform"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["storage_source_enable_column_projection"] = (
BOOLEAN_FLAG_VALUES
)

# If you are adding a new config flag in Materialize, consider using it
# here instead of just marking it as uninteresting to silence the
Expand Down
7 changes: 6 additions & 1 deletion src/adapter/src/explain/mir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ impl<'a> Explainable<'a, DataflowDescription<OptimizedMirRelationExpr>> {
.iter_mut()
.map(|(id, import)| {
let op = import.desc.arguments.operators.as_ref();
ExplainSource::new(*id, op, context.config.filter_pushdown)
ExplainSource::new(
*id,
op,
context.config.filter_pushdown,
context.config.projection_pushdown,
)
})
.collect::<Vec<_>>();

Expand Down
14 changes: 12 additions & 2 deletions src/compute-types/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ impl<'a> DataflowDescription<Plan> {
.iter_mut()
.map(|(id, import)| {
let op = import.desc.arguments.operators.as_ref();
ExplainSource::new(*id, op, context.config.filter_pushdown)
ExplainSource::new(
*id,
op,
context.config.filter_pushdown,
context.config.projection_pushdown,
)
})
.collect::<Vec<_>>();

Expand Down Expand Up @@ -139,7 +144,12 @@ impl<'a> DataflowDescription<OptimizedMirRelationExpr> {
.iter_mut()
.map(|(id, import)| {
let op = import.desc.arguments.operators.as_ref();
ExplainSource::new(*id, op, context.config.filter_pushdown)
ExplainSource::new(
*id,
op,
context.config.filter_pushdown,
context.config.projection_pushdown,
)
})
.collect::<Vec<_>>();

Expand Down
53 changes: 51 additions & 2 deletions src/expr/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

//! `EXPLAIN` support for structures defined in this crate.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Formatter;
use std::time::Duration;

Expand All @@ -18,7 +18,7 @@ use mz_repr::GlobalId;
use mz_repr::explain::ExplainError::LinearChainsPlusRecursive;
use mz_repr::explain::text::DisplayText;
use mz_repr::explain::{
AnnotatedPlan, Explain, ExplainConfig, ExplainError, ExprHumanizer, ScalarOps,
AnnotatedPlan, Explain, ExplainConfig, ExplainError, ExprHumanizer, Indices, ScalarOps,
UnsupportedFormat, UsedIndexes,
};
use mz_repr::optimize::OptimizerFeatures;
Expand Down Expand Up @@ -93,18 +93,51 @@ where
}
}

/// Carries metadata about the columns demanded from a source decoder.
/// (Only emitted when a context flag is enabled.)
#[derive(Debug)]
pub struct ProjectionInfo {
/// Input columns demanded by the source MFP (filter + map + project).
pub demand: Vec<usize>,
/// Total input arity, for deciding whether the demand is the identity.
pub input_arity: usize,
}

impl<'a, C, M> DisplayText<C> for HumanizedExpr<'a, ProjectionInfo, M>
where
C: AsMut<Indent>,
M: HumanizerMode,
{
fn fmt_text(&self, f: &mut Formatter<'_>, ctx: &mut C) -> std::fmt::Result {
let ProjectionInfo {
demand,
input_arity,
} = self.expr;

// Suppress the annotation when every column is demanded — that's the
// case where projection pushdown is a no-op.
if demand.len() == *input_arity {
return Ok(());
}

writeln!(f, "{}demand=({})", ctx.as_mut(), Indices(demand))
}
}

#[allow(missing_debug_implementations)]
pub struct ExplainSource<'a> {
pub id: GlobalId,
pub op: Option<&'a MapFilterProject>,
pub pushdown_info: Option<PushdownInfo<'a>>,
pub projection_info: Option<ProjectionInfo>,
}

impl<'a> ExplainSource<'a> {
pub fn new(
id: GlobalId,
op: Option<&'a MapFilterProject>,
filter_pushdown: bool,
projection_pushdown: bool,
) -> ExplainSource<'a> {
let pushdown_info = if filter_pushdown {
op.map(|op| {
Expand All @@ -121,10 +154,23 @@ impl<'a> ExplainSource<'a> {
None
};

let projection_info = if projection_pushdown {
op.filter(|op| !op.is_identity()).map(|op| {
let demand: BTreeSet<usize> = op.demand();
ProjectionInfo {
demand: demand.into_iter().collect(),
input_arity: op.input_arity,
}
})
} else {
None
};

ExplainSource {
id,
op,
pushdown_info,
projection_info,
}
}

Expand Down Expand Up @@ -155,6 +201,9 @@ where
if let Some(pushdown_info) = &self.expr.pushdown_info {
self.child(pushdown_info).fmt_text(f, ctx)?;
}
if let Some(projection_info) = &self.expr.projection_info {
self.child(projection_info).fmt_text(f, ctx)?;
}
Ok(())
})
}
Expand Down
14 changes: 13 additions & 1 deletion src/expr/src/explain/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

use mz_repr::explain::json::DisplayJson;

use crate::explain::{ExplainMultiPlan, ExplainSinglePlan, ExplainSource, PushdownInfo};
use crate::explain::{
ExplainMultiPlan, ExplainSinglePlan, ExplainSource, ProjectionInfo, PushdownInfo,
};

impl<'a, T: 'a> DisplayJson for ExplainSinglePlan<'a, T>
where
Expand Down Expand Up @@ -47,6 +49,7 @@ where
id,
op,
pushdown_info,
projection_info,
}| {
let mut json = serde_json::json!({
"id": id,
Expand All @@ -58,6 +61,15 @@ where
object.insert("pushdown".to_owned(), serde_json::json!(pushdown));
}

if let Some(ProjectionInfo {
demand,
input_arity: _,
}) = projection_info
{
let object = json.as_object_mut().unwrap();
object.insert("demand".to_owned(), serde_json::json!(demand));
}

json
},
)
Expand Down
9 changes: 8 additions & 1 deletion src/persist-client/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::internal::machine::{
NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER,
};
use crate::internal::state::ROLLUP_THRESHOLD;
use crate::operators::STORAGE_SOURCE_DECODE_FUEL;
use crate::operators::{STORAGE_SOURCE_DECODE_FUEL, STORAGE_SOURCE_ENABLE_COLUMN_PROJECTION};
use crate::read::READER_LEASE_DURATION;

// Ignores the patch version
Expand Down Expand Up @@ -231,6 +231,12 @@ impl PersistConfig {
STORAGE_SOURCE_DECODE_FUEL.get(self)
}

/// Whether the persist source pushes the MFP's column demand into the
/// row decoder, skipping decode of un-demanded columns.
pub fn storage_source_enable_column_projection(&self) -> bool {
STORAGE_SOURCE_ENABLE_COLUMN_PROJECTION.get(self)
}

/// Overrides the value for "persist_reader_lease_duration".
pub fn set_reader_lease_duration(&self, val: Duration) {
self.set_config(&READER_LEASE_DURATION, val);
Expand Down Expand Up @@ -350,6 +356,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&crate::internal::state::ROLLUP_FALLBACK_THRESHOLD_MS)
.add(&crate::internal::state::ENABLE_INCREMENTAL_COMPACTION)
.add(&crate::operators::STORAGE_SOURCE_DECODE_FUEL)
.add(&crate::operators::STORAGE_SOURCE_ENABLE_COLUMN_PROJECTION)
.add(&crate::read::READER_LEASE_DURATION)
.add(&crate::rpc::PUBSUB_CLIENT_ENABLED)
.add(&crate::rpc::PUBSUB_PUSH_DIFF_ENABLED)
Expand Down
Loading
Loading