Phase 5 introduced high-level optimizations to reduce network latency and enable complex multi-shard query patterns.
Optimized query routing based on partitioning keys.
- Predicate Analysis: Detects filters on sharding keys (e.g., `WHERE id = 100`).
- Targeted Dispatch: Routes fragments only to the specific node owning the shard, avoiding cluster-wide broadcasts.
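
The routing decision can be sketched as follows. This is a minimal illustration, not the project's actual code: the `EqualityPredicate` struct, the `shard_for_key` and `route_fragment` helpers, and the key-modulo-shard-count placement rule are all assumptions made for the example.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified predicate of the form `column = value`.
struct EqualityPredicate {
    std::string column;
    std::int64_t value;
};

// Assumption: a key lives on shard (key mod num_shards).
// The double modulo keeps negative keys inside [0, num_shards).
inline std::size_t shard_for_key(std::int64_t key, std::size_t num_shards) {
    const auto n = static_cast<std::int64_t>(num_shards);
    return static_cast<std::size_t>(((key % n) + n) % n);
}

// Returns the shards a query fragment must be sent to.
inline std::vector<std::size_t> route_fragment(
    const std::optional<EqualityPredicate>& pred,
    const std::string& sharding_key,
    std::size_t num_shards) {
    // Targeted dispatch: an equality filter on the sharding key pins one shard.
    if (pred && pred->column == sharding_key) {
        return {shard_for_key(pred->value, num_shards)};
    }
    // Otherwise fall back to a cluster-wide broadcast.
    std::vector<std::size_t> all(num_shards);
    for (std::size_t i = 0; i < num_shards; ++i) all[i] = i;
    return all;
}
```

With four shards, a filter such as `WHERE id = 100` would resolve to a single target under this placement rule, while a query without a sharding-key filter would still reach all four.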
Implemented coordination for distributed analytics.
- Partial Aggregation: Data nodes compute local counts and sums.
- Global Merge: The coordinator identifies aggregate functions in the SELECT list and merges partial results from all shards into a final result set.
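
A rough sketch of the two-stage aggregation, assuming integer columns and only `COUNT` and `SUM`. The `PartialAgg` struct and both function names are hypothetical and chosen for illustration.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical partial state a data node returns for COUNT(*) and SUM(col).
struct PartialAgg {
    std::int64_t count = 0;
    std::int64_t sum = 0;
};

// Runs on each data node: aggregate only the rows of the local shard.
inline PartialAgg local_aggregate(const std::vector<std::int64_t>& shard_values) {
    PartialAgg p;
    for (std::int64_t v : shard_values) {
        ++p.count;
        p.sum += v;
    }
    return p;
}

// Runs on the coordinator: COUNT and SUM merge by simple addition.
inline PartialAgg global_merge(const std::vector<PartialAgg>& partials) {
    PartialAgg total;
    for (const PartialAgg& p : partials) {
        total.count += p.count;
        total.sum += p.sum;
    }
    return total;
}
```

Because both functions are additive, the coordinator only receives one small record per shard rather than the underlying rows.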
Developed a prototype for cross-shard JOINs.
- Table Fetching: The coordinator retrieves the full contents of the smaller table from all shards.
- Broadcasting: Pushes the gathered data to the `ShuffleBuffer` of every node in the cluster.
- Local Execution: Rewrites the query so each node joins its local shard with the broadcasted buffer data.
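
The three steps can be illustrated with an in-process model. Everything here is an assumption for the sake of the sketch: the `Row` and `Node` types, the plain vector standing in for the shuffle buffer, and the use of a hash table for the local join. The real prototype moves data over RPC.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

struct Row {
    std::int64_t key;
    std::string payload;
};

// Hypothetical stand-in for a data node.
struct Node {
    std::vector<Row> small_shard;     // local slice of the smaller table
    std::vector<Row> large_shard;     // local slice of the larger table
    std::vector<Row> shuffle_buffer;  // broadcast copy of the smaller table
};

// Coordinator side: fetch the smaller table from every shard, then push
// the gathered rows into each node's shuffle buffer.
inline void broadcast_small_table(std::vector<Node>& cluster) {
    std::vector<Row> gathered;
    for (const Node& n : cluster) {
        gathered.insert(gathered.end(), n.small_shard.begin(), n.small_shard.end());
    }
    for (Node& n : cluster) n.shuffle_buffer = gathered;
}

// Node side: join the local large-table shard against the broadcast buffer.
// Each result pair is (large-table row, small-table row).
inline std::vector<std::pair<Row, Row>> local_join(const Node& node) {
    std::unordered_multimap<std::int64_t, const Row*> build;
    for (const Row& r : node.shuffle_buffer) build.emplace(r.key, &r);

    std::vector<std::pair<Row, Row>> out;
    for (const Row& l : node.large_shard) {
        auto [first, last] = build.equal_range(l.key);
        for (auto it = first; it != last; ++it) out.emplace_back(l, *it->second);
    }
    return out;
}
```

Note that `gathered` holds the whole smaller table on the coordinator at once, which is where the coordinator memory limit comes from.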
Enabled inter-node data movement.
- BufferScanOperator: A physical operator that reads from in-memory shuffle buffers instead of heap files.
- ClusterManager Buffering: Thread-safe staging area for data received via `PushData` RPCs.
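
One plausible shape for such a staging area is a mutex-guarded map from buffer name to rows. This is a sketch only: the class name `ShuffleStaging`, its methods, and the `Tuple` alias are hypothetical and do not reflect the actual `ClusterManager` interface.

```cpp
#include <iterator>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

using Tuple = std::vector<std::string>;  // assumption: rows as string fields

// Hypothetical staging area: RPC handler threads append, scans read.
class ShuffleStaging {
public:
    // Called from an RPC handler thread when a batch of rows arrives.
    void push(const std::string& buffer_id, std::vector<Tuple> rows) {
        std::lock_guard<std::mutex> lock(mu_);
        auto& dst = buffers_[buffer_id];
        dst.insert(dst.end(),
                   std::make_move_iterator(rows.begin()),
                   std::make_move_iterator(rows.end()));
    }

    // Called by a scan operator; returns a copy so the reader never holds
    // the lock while iterating.
    std::vector<Tuple> snapshot(const std::string& buffer_id) const {
        std::lock_guard<std::mutex> lock(mu_);
        auto it = buffers_.find(buffer_id);
        return it == buffers_.end() ? std::vector<Tuple>{} : it->second;
    }

private:
    mutable std::mutex mu_;
    std::unordered_map<std::string, std::vector<Tuple>> buffers_;
};
```

Returning a copy trades memory for simplicity; a production version might hand out shared ownership of an immutable batch instead.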
- Broadcast joins are highly effective for small-to-large table joins but require careful consideration of coordinator memory limits.
- Merging aggregates at the coordinator is a bottleneck for very large clusters; future work could explore tree-based merging.
All scenarios, including distributed transactions (2PC) and join orchestration, have been verified with automated integration tests.
The vectorized execution engine was wired into QueryExecutor via set_parallel(true) mode, enabling SELECT queries to optionally use the vectorized batch path:
- `QueryExecutor::set_parallel(true)`: enables vectorized batch execution
- `QueryExecutor::set_storage_manager()`: provides StorageManager for ColumnarTable lookups
- `build_vectorized_plan()`: constructs operator tree (Scan → Filter → HashJoin → GroupBy → Project)
- `execute_select()`: branches on `use_vectorized` flag between Volcano (tuple) and vectorized (batch) paths
- Join type support: `VectorizedHashJoinOperator` supports INNER, LEFT, RIGHT, and FULL outer joins via `JoinType` enum. RIGHT and FULL outer joins use `right_matched_` bitmap and `emit_unmatched_right_rows()` to emit unmatched right rows at end of probe.
- Constraint: Sort/Limit queries fall back to Volcano path since SortOperator/LimitOperator don't inherit from VectorizedOperator
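
The matched-bitmap technique for RIGHT and FULL outer joins can be sketched on bare key columns. This is not the `VectorizedHashJoinOperator` itself: the `hash_join` function, the `Match` alias, and the choice of the right input as the build side are assumptions made for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>

enum class JoinType { Inner, Left, Right, Full };

// (left row index, right row index); an empty side stands for NULL padding.
using Match = std::pair<std::optional<std::size_t>, std::optional<std::size_t>>;

inline std::vector<Match> hash_join(const std::vector<std::int64_t>& left_keys,
                                    const std::vector<std::int64_t>& right_keys,
                                    JoinType type) {
    // Build phase: hash the right input (assumed build side).
    std::unordered_multimap<std::int64_t, std::size_t> build;
    for (std::size_t r = 0; r < right_keys.size(); ++r) build.emplace(right_keys[r], r);

    // One bit per right row, set when any left row matches it.
    std::vector<bool> right_matched(right_keys.size(), false);
    std::vector<Match> out;

    // Probe phase: stream the left input through the hash table.
    for (std::size_t l = 0; l < left_keys.size(); ++l) {
        auto [first, last] = build.equal_range(left_keys[l]);
        if (first == last) {
            if (type == JoinType::Left || type == JoinType::Full) {
                out.emplace_back(l, std::nullopt);
            }
            continue;
        }
        for (auto it = first; it != last; ++it) {
            right_matched[it->second] = true;
            out.emplace_back(l, it->second);
        }
    }

    // End of probe: emit right rows that never matched.
    if (type == JoinType::Right || type == JoinType::Full) {
        for (std::size_t r = 0; r < right_keys.size(); ++r) {
            if (!right_matched[r]) out.emplace_back(std::nullopt, r);
        }
    }
    return out;
}
```

The unmatched right rows can only be emitted after the whole probe input has been consumed, since any later left row might still match them.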
The flag-based chooser (parallel_ && storage_manager_ && !has_sort_or_limit) was replaced with a cost-based heuristic using per-table statistics:
- `ANALYZE TABLE`: single-pass scan collects min/max/NDV/null_count stats stored in `ColumnInfo` (catalog)
- `RowEstimator` (optimizer/row_estimator.cpp): row count estimation from column statistics
- `kVectorizedRowThreshold = 10000`: heuristic threshold; vectorized batch execution outperforms Volcano above ~10k rows
- Chooser guard: checks `ExprType::Column` to skip estimation for JOINs, subqueries, and aliased tables (Volcano fallback handles these correctly)
- Text NDV: values are truncated to a 64-char prefix to limit memory; note that NDV is underestimated when distinct values share a prefix of 64 characters or more
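
A sketch of how the threshold and the prefix-truncated NDV might look, under stated assumptions: `choose_path`, `ExecPath`, and `text_ndv` are hypothetical names, the comparison is assumed to be "at or above the threshold", and an exact hash set stands in for whatever distinct-count structure `ANALYZE TABLE` really uses.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>

constexpr std::uint64_t kVectorizedRowThreshold = 10000;

enum class ExecPath { Volcano, Vectorized };

// `estimated_rows` is empty when the chooser guard skipped estimation
// (JOINs, subqueries, aliased tables); those queries stay on Volcano.
inline ExecPath choose_path(std::optional<std::uint64_t> estimated_rows) {
    if (!estimated_rows) return ExecPath::Volcano;
    return *estimated_rows >= kVectorizedRowThreshold ? ExecPath::Vectorized
                                                      : ExecPath::Volcano;
}

// Counts distinct text values after truncating each to `prefix_len` chars.
// Values that differ only beyond the prefix collapse into one entry.
inline std::size_t text_ndv(const std::vector<std::string>& values,
                            std::size_t prefix_len = 64) {
    std::unordered_set<std::string> seen;
    for (const std::string& v : values) seen.insert(v.substr(0, prefix_len));
    return seen.size();
}
```

For example, two URLs that share their first 70 characters and differ only in a trailing query parameter would count as a single distinct value under this scheme.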