[BugFix] Honor per-snapshot schema for Iceberg time travel reads#74711
[BugFix] Honor per-snapshot schema for Iceberg time travel reads#74711GavinMar wants to merge 1 commit into
Conversation
Time travel queries (VERSION/TIMESTAMP AS OF) resolved the target snapshot only to select data files, while column resolution, the descriptor sent to BE, and scan planning all used the latest table schema. After a schema change (e.g. a column rename), such queries returned the current column names instead of the schema bound to the targeted snapshot, diverging from Iceberg semantics. Resolve the query period once during analysis, pin the resolved version range on the table relation, and rebind the IcebergTable to the snapshot schema via an immutable per-query copy. Convert pushdown predicates against the same schema and rebind partition specs to the scan schema inside StarRocksIcebergTableScan so partition, metrics, residual, and manifest evaluators bind consistently. Signed-off-by: GavinMar <yangguansuo@starrocks.com>
[Java-Extensions Incremental Coverage Report]✅ pass : 0 / 0 (0%) |
[FE Incremental Coverage Report]❌ fail : 4 / 7 (57.14%) file detail
|
[BE Incremental Coverage Report]✅ pass : 0 / 0 (0%) |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3e8c5c5528
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| for (PartitionField field : getNativeTable().spec().fields()) { | ||
| if (currentSchema.findColumnName(field.sourceId()) != null | ||
| && schema.findColumnName(field.sourceId()) == null) { | ||
| throw new StarRocksConnectorException( |
There was a problem hiding this comment.
Don't reject older snapshots after partition evolution
For a table that was readable at an older snapshot before a later ALTER TABLE ... ADD COLUMN p/partition-spec replacement on p, this check rejects VERSION AS OF that older snapshot solely because the current partition spec references a field absent from the snapshot schema. Iceberg snapshots are planned with the specs attached to their manifests, and this change already carries per-scan specs elsewhere, so the current spec should not make historical snapshots unusable; users time-travelling to pre-evolution snapshots will now get Time travel is not supported instead of the old rows.
Useful? React with 👍 / 👎.
Why I'm doing:
Iceberg time travel queries (
VERSION AS OF/TIMESTAMP AS OF) resolved the target snapshot only to select data files, while column resolution, the result-set metadata, the descriptor sent to BE, and scan planning all used the latest table schema. After a schema change (e.g. a column rename), a time-travel read returned the current column names instead of the schema bound to the targeted snapshot:This diverges from Iceberg semantics. The snapshot's
schema-idis defined by the Iceberg spec as "ID of the table's current schema when the snapshot was created", and the Iceberg reference implementation (SnapshotUtil.schemaFor,DataScan#useSnapshotSchema), other query engines read snapshot-id/timestamp/tag time travel with the snapshot's schema.What I'm doing:
Resolve the query period once during analysis and bind the snapshot schema through the whole read path:
QueryAnalyzer: after table resolution, resolve the query period to a version range (via the newQueryPeriodResolver, extracted fromRelationTransformerso both layers share one implementation), pin the resolved range on theTableRelationso the transformer does not resolve it again, and, when the targeted snapshot's schema differs from the current one, replace the table with an immutable per-query copy created byIcebergTable#withReadSchema.IcebergTable:withReadSchemarebuilds the StarRocks-side full schema from the snapshot schema;getEffectiveIcebergSchemafeeds partition/bucket column resolution andtoThrift(columns,iceberg_schema, partition expressions and source column names) so FE and BE see one consistent schema. Queries whose current partition spec references a column missing from the snapshot schema are rejected with a clear error.IcebergMetadata: convert pushdown predicates against the effective (snapshot) schema, matching the schema the scan binds expressions against.StarRocksIcebergTableScan: the scan schema is the snapshot schema for time travel (DataScan#useSnapshotSchema), while table partition specs stay bound to the current schema. Rebind specs to the scan schema (scanSpecsById) for the residual/partition/metrics evaluators,DeleteFileIndex,ManifestGroup, and manifest filtering, so filter expressions bind consistently. A spec that cannot be rebound degrades to no pruning with the full filter kept as residual.Branch semantics: a branch reference is resolved to its head snapshot and read with that snapshot's schema, same as Trino. Spark instead reads branches with the current table schema (
SnapshotUtil.schemaFor(table, ref)); snapshot-id, timestamp, and tag reads behave identically across all three engines.Verification:
test_timetravel_snapshot_schema) cover tag/branch reads, old/new column name resolution, predicates on renamed columns, and a partitioned table whose partition source column was renamed.What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: