Doc: Design doc for unified per-object arrangement sizes #35884
leedqin wants to merge 2 commits into MaterializeInc:main
Conversation
> The `cluster_id` is denormalized into the history table because replicas can be dropped
> while their historical rows are still within the retention window. Without it, those
> rows become unattributable to a cluster.
How would the `cluster_id` be used? Is it useful to have it without additional cluster information, like the name?
Also, we have `mz_cluster_replica_history`, which has both the `cluster_id` and the `cluster_name` for each replica that ever existed. Wouldn't that satisfy the need for attribution?
Yes, that would! Updated my design doc to use `mz_cluster_replica_history`.
> The history table follows the `mz_storage_usage_by_shard` collection and pruning pattern:
Note that the current storage usage collection code is known to scale badly and there are plans to change it: #35436. Just pointing this out to make sure we follow the fixed implementation, not the old implementation.
```sql
SELECT
  ce.export_id AS object_id,
  GREATEST(10485760, (SUM(raw.size) / 10485760 * 10485760))::int8 AS size
FROM mz_introspection.mz_compute_exports AS ce
JOIN mz_introspection.mz_dataflow_operator_dataflows AS dod
  ON dod.dataflow_id = ce.dataflow_id
JOIN (
  SELECT operator_id, COUNT(*) AS size
  FROM (
    SELECT operator_id FROM mz_introspection.mz_arrangement_heap_size_raw
    UNION ALL
    SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
  ) combined
  GROUP BY operator_id
) AS raw ON raw.operator_id = dod.id
GROUP BY ce.export_id
```
Given that this will run on each replica, we should spend some time optimizing the plan as much as we can. Some things that come to mind:
- `mz_dataflow_operator_dataflows` is a view with a join. We might be able to avoid some work by inlining it and removing parts we don't need.
- Consider converting the subquery into a CTE, to give the optimizer an easier time.
- I don't think we should need two `GROUP BY`s; one on the `export_id` should be enough.
- Probably want to use query hints to limit memory usage.
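The single-`GROUP BY` suggestion can be sanity-checked with a toy model (Python purely for illustration; the `e*`/`op*` IDs are made up). Assuming each operator belongs to exactly one export, counting per operator and then summing per export gives the same result as counting per export directly:

```python
from collections import defaultdict

# Toy (export_id, operator_id) rows standing in for the joined raw
# introspection records; each operator belongs to exactly one export.
rows = [("e1", "op1"), ("e1", "op1"), ("e1", "op2"), ("e2", "op3")]

# Two-level aggregation: COUNT(*) per operator, then SUM of counts per export.
per_op, op_export = defaultdict(int), {}
for export, op in rows:
    per_op[op] += 1
    op_export[op] = export
two_level = defaultdict(int)
for op, n in per_op.items():
    two_level[op_export[op]] += n

# Single aggregation: COUNT(*) per export directly.
single = defaultdict(int)
for export, _ in rows:
    single[export] += 1

assert dict(two_level) == dict(single) == {"e1": 3, "e2": 1}
```

The equivalence relies on the operator-to-export mapping being many-to-one; if an operator could feed multiple exports, the pre-aggregation would matter.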
Should merge #35889 to fix some pathological optimizer behavior!
Added these changes! Thank you for the suggestions. The optimized plan goes from 5 index reads to 4 (eliminated `mz_dataflow_operators_per_worker`), from a 6-way join to a 3-way join, and from 2 reduces to 1. Added `OPTIONS (AGGREGATE INPUT GROUP SIZE = 1000)` for memory bounds.
> The `GREATEST(10MB, rounded)` expression serves two purposes: the integer division
> (`/ 10485760 * 10485760`) rounds to 10MB boundaries, suppressing byte-level differential
> churn that would otherwise propagate on every minor allocation. The `GREATEST` sets a
> floor so that objects under 10MB still appear as 10MB rather than rounding to zero and
> disappearing from results. (The POC uses the quantization without the `GREATEST` floor;
> adding the floor is part of the remaining work.)
Avoiding churn on tiny changes seems great to me! How was the 10MB chosen?
Just used a rough heuristic:
- Too small (e.g., 1KB) → almost no churn reduction, defeats the purpose
- Too large (e.g., 100MB) → hides real changes, objects under 100MB become invisible
- 10MB felt like a reasonable middle ground — most meaningful memory consumers are well above 10MB, and typical allocation noise is well below it
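For concreteness, the quantization arithmetic sketched in Python (illustration only; the SQL expression is the source of truth):

```python
MB10 = 10 * 1024 * 1024  # 10485760, the 10MB quantization step

def quantize(size_bytes: int) -> int:
    # Mirrors GREATEST(10485760, size / 10485760 * 10485760):
    # round down to a 10MB boundary, with a 10MB floor.
    return max(MB10, size_bytes // MB10 * MB10)

assert quantize(5 * 1024 * 1024) == MB10           # under 10MB floors to 10MB
assert quantize(25 * 1024 * 1024) == 2 * MB10      # rounds down to 20MB
assert quantize(25 * 1024 * 1024 + 1) == 2 * MB10  # +1 byte: no differential churn
```

Any allocation change smaller than 10MB that doesn't cross a boundary produces no retraction/insertion pair, which is exactly the churn suppression described above.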
> - **Dataflow memory overhead.** Subscribe adds a dataflow to every replica. Same joins
>   as `mz_dataflow_arrangement_sizes`, so overhead is known and observable via existing
>   per-dataflow introspection.
I don't follow the "overhead is known and observable" part. `mz_dataflow_arrangement_sizes` is a view without an index, so it doesn't have overhead except when it's queried.
Right, that view has no persistent overhead. Reworded to describe the subscribe's overhead directly instead.
> - **Stale replica cleanup.** Replicas dropped while environmentd is down leave orphaned
>   rows. Startup task compares against `mz_cluster_replicas` and retracts mismatches.
How can replicas be dropped while envd is down?
Right, I misunderstood that! Removed the live-table startup cleanup; it only prunes expired history rows now.
> - **History table growth.** ~1.7M rows at scale (1000 objects × 10 replicas × 168
>   snapshots). Bounded by 7-day retention with startup pruning.
> - **Silent feature disable.** No data if `ENABLE_INTROSPECTION_SUBSCRIBES = false`.
>   Repopulates automatically when re-enabled.
A thing missing from the list: the hourly collection only sees snapshots, so (a) it misses short memory spikes and (b) it may miss entire replicas if they were created and dropped between two collections.
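The growth estimate in the quoted bullet can be reproduced directly (illustrative arithmetic only; the scale parameters are the doc's own assumptions):

```python
objects, replicas = 1000, 10
snapshots = 7 * 24  # hourly snapshots retained over the 7-day window

rows = objects * replicas * snapshots
assert snapshots == 168
assert rows == 1_680_000  # the "~1.7M rows at scale" figure
```

Shortening the collection interval (e.g., to 15 minutes) multiplies this row count by 4, which is why interval and retention need to be tuned together.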
> **Reasons not chosen:** Arrangement sizes are not in compute's Prometheus registry. Adding
> them would require compute-side changes to register per-object metrics. The introspection
> subscribe derives the same data from existing log sources with no compute changes.
Note that as part of @SangJunBak's metrics work we might want to add arrangement sizes to Prometheus metrics. At least arrangement sizes are part of the suggested customer-facing metrics and I wouldn't want us to run SQL queries on every scrape (especially not ones to compute introspection).
Added as a note for a future solution that would be more efficient! Will keep an eye out to possibly migrate to using that for the maintained objects when that lands.
> 1. **Naming.** The unified source is named `mz_object_arrangement_sizes` in `mz_internal`
>    schema. The existing view with the same name is in `mz_introspection` schema. Different
>    schemas, so no conflict, but could cause confusion. Should we use a different name?
I don't think there is an existing `mz_object_arrangement_sizes` view!
This is left over from an earlier draft of this design doc that I worked on with Claude, based on my POC.
> 2. **Collection interval tuning.** The 1-hour default balances history granularity against
>    table growth. Should this be shorter (e.g., 15 minutes) for environments that need
>    finer-grained trending?
Should definitely be a dyncfg. We'll need that for testing anyway.
Introduces a design for two new system catalog objects in `mz_internal`: `mz_object_arrangement_sizes` (live, differential) and `mz_object_arrangement_size_history` (append-only, 7-day retention). The live table uses the introspection subscribe pattern to aggregate per-object arrangement memory from all replicas without session variables. The history table follows the `mz_storage_usage_by_shard` collection and pruning pattern for time-range queries. Part of CNS-42.
- Remove `cluster_id` from history schema; use `mz_cluster_replica_history` join instead
- Optimize subscribe query: inline `mz_dataflow_addresses_per_worker` directly, single `GROUP BY`, CTE structure, `AGGREGATE INPUT GROUP SIZE` hint
- Clarify startup cleanup: only prune expired history rows; live table handled by deferred_write
- Rewrite pitfalls: dataflow overhead, staleness detection, hourly snapshot gaps, replica lifecycle
- Use dyncfgs for collection interval and retention period instead of system variables
- Fix stale `cluster_id` references in usage examples and validation plan