Global RT DIV: Fix SST key-ordering error on OFFLINE→STANDBY retry#2711
Global RT DIV: Fix SST key-ordering error on OFFLINE→STANDBY retry#2711KaiSernLim wants to merge 4 commits intolinkedin:mainfrom
Conversation
When Global RT DIV is enabled, a follower persists its latest consumed
VT position (LCVP) to the OffsetRecord via SyncVtDivNode so that on
the next OFFLINE→STANDBY transition it can re-subscribe from where it
left off instead of re-reading from EARLIEST.
Two related bugs caused LCVP to be persisted as EARLIEST even when real
VT progress had been made, causing the consumer to re-subscribe from
VT offset 0 on retry. With RocksDB deferred-write (SST) mode active
during batch push, this meant old records from a prior pass were written
into the same open SST file as new records, violating the requirement
that keys be strictly ascending within an SST file — surfacing as:
org.rocksdb.RocksDBException: Keys must be added in strict ascending order
Bug 1 — PartitionTracker.updateOffsetRecord wrote LCVP inside the
per-segment loop (PartitionTracker.java, private overload):
for (Map.Entry<GUID, Segment> entry : getSegments(type).entrySet()) {
// setLatestConsumedVtPosition only called when a segment exists
updateOffsetRecord(type, entry.getKey(), entry.getValue(), offsetRecord);
}
At SOP time, the VT producer has not yet sent any data, so no segment
exists and the loop body never executes. LCVP was therefore never
written to the OffsetRecord for the SOP SyncVtDivNode sync.
Fix: move setLatestConsumedVtPosition unconditionally to the public
updateOffsetRecord(TopicType, OffsetRecord) method, before the loop,
so LCVP is always persisted regardless of segment count.
Bug 2 — syncOffsetFromSnapshotIfNeeded did not guard against syncing
when LCVP is still EARLIEST or when vtDiv has no segments. Even with
Bug 1 fixed, cloneVtProducerStates carries LCVP=EARLIEST from the
consumerDiv when no VT records have been consumed yet. Enqueuing a
SyncVtDivNode in that state writes a useless (or harmful) snapshot.
Fix: after cloning, return early if the snapshot's LCVP equals EARLIEST
or if the VT segment map is empty. There is nothing meaningful to
persist in either case.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Fixes a batch-push failure when Global RT DIV is enabled by ensuring the follower’s latest-consumed VT position (LCVP) is persisted correctly so retries don’t re-consume from an inconsistent position that can lead to RocksDB SST key-ordering violations.
Changes:
- Persist LCVP to
OffsetRecordfor VT unconditionally (even when no VT segments exist yet). - Add defensive early-return guards in
syncOffsetFromSnapshotIfNeededto avoid syncing snapshots that represent “no meaningful VT progress”.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java | Moves LCVP persistence to the public updateOffsetRecord path so it’s written even with an empty segment map. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java | Adds guards to skip snapshot-based offset syncing when LCVP is EARLIEST or when no VT segments are tracked yet. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Skip sync if no VT producer segments have been tracked yet. There is nothing meaningful | ||
| // to persist — the segment state is empty and writing it would be a no-op at best. | ||
| if (vtDiv.getPartitionStates(PartitionTracker.VERSION_TOPIC).isEmpty()) { | ||
| return; | ||
| } | ||
|
|
There was a problem hiding this comment.
The vtDiv.getPartitionStates(PartitionTracker.VERSION_TOPIC).isEmpty() early-return prevents execSyncOffsetFromSnapshotAsync from running when there are no VT segments yet (e.g., around SOP). With the accompanying PartitionTracker.updateOffsetRecord() change, syncing with an empty segment map is now still meaningful because it will persist the latest consumed VT position (LCVP) to the OffsetRecord. Skipping the sync here can leave the on-disk LCVP at EARLIEST until a segment appears, which seems to undermine the stated goal of resubscribing from the latest consumed VT position on retry. Consider removing this guard, or narrowing it to only skip when LCVP is EARLIEST (which is already handled above).
| // Skip sync if no VT producer segments have been tracked yet. There is nothing meaningful | |
| // to persist — the segment state is empty and writing it would be a no-op at best. | |
| if (vtDiv.getPartitionStates(PartitionTracker.VERSION_TOPIC).isEmpty()) { | |
| return; | |
| } |
| // Skip sync if no VT producer segments have been tracked yet. There is nothing meaningful | ||
| // to persist — the segment state is empty and writing it would be a no-op at best. | ||
| if (vtDiv.getPartitionStates(PartitionTracker.VERSION_TOPIC).isEmpty()) { | ||
| return; |
There was a problem hiding this comment.
vtDiv.getPartitionStates(...) materializes a new map (and ProducerPartitionState objects) via streams. In this method it’s used for an emptiness check and again for logging .size(), which can add avoidable allocations on a hot path. Consider computing it once and reusing it, or exposing/using a lighter-weight way to check whether any VT segments are tracked (e.g., a segment count) so you don’t have to build the full state map just to test emptiness/size.
Two new unit tests covering the bugs fixed in the previous commit: 1. TestPartitionTracker.testUpdateOffsetRecordPersistsLcvpWithNoSegments Verifies that updateOffsetRecord(VERSION_TOPIC, offsetRecord) writes LCVP to the OffsetRecord even when no VT producer segments have been tracked yet (e.g. at SOP time). Before the fix, the per-segment loop never executed with an empty segment map, so LCVP was never written and EARLIEST was left in the OffsetRecord. 2. LeaderFollowerStoreIngestionTaskTest.testSyncOffsetFromSnapshotIfNeededSkipsWh enLcvpIsEarliestOrNoSegments Verifies the two early-exit guards added to syncOffsetFromSnapshotIfNeeded: - LCVP = EARLIEST → execSyncOffsetFromSnapshotAsync is NOT called - LCVP non-EARLIEST but empty segment map → NOT called - LCVP non-EARLIEST and non-empty segment map → called (positive case) Also changes syncOffsetFromSnapshotIfNeeded to access consumerDiv through getConsumerDiv() instead of the field directly, enabling the spy-based test to stub the accessor. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Global RT DIV tracks RT producer segment/position state so that followers can pick up real-time ingestion where they left off on retry. This concept is inherently tied to hybrid stores — a batch-only store never receives real-time writes, so enabling Global RT DIV there is meaningless and could cause unexpected behavior (e.g. consumerDiv being used instead of drainerDiv, shouldSyncOffset returning false, getLocalVtSubscribePosition returning the consumed VT position instead of the processed one). Change: gate isGlobalRtDivEnabled() on isHybridMode() in addition to the store-level version flag. Also adds testIsGlobalRtDivEnabledRequiresHybridMode to verify: - flag=true + non-hybrid → returns false - flag=true + hybrid → returns true Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 6 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| * At that point no VT producer segment has been opened yet, so the snapshot has LCVP=EARLIEST and an empty segment | ||
| * map. Allowing the sync to proceed in this state would stamp EARLIEST into the OffsetRecord, causing the follower | ||
| * to re-subscribe from VT offset 0 on the next Helix OFFLINE→STANDBY retry — leading to out-of-order SST key | ||
| * writes and a RocksDBException during batch push. |
There was a problem hiding this comment.
This test comment says the START_OF_PUSH snapshot has LCVP=EARLIEST, but in the production path updateLatestConsumedVtPosition(..., record.getPosition()) is invoked for each consumed VT record before syncOffsetFromSnapshotIfNeeded runs, so LCVP at SOP is typically an actual offset (often 0) rather than the symbolic EARLIEST. Consider rewording the comment to reflect when LCVP can legitimately still be EARLIEST (e.g., before any VT record is processed / partition tracker not yet updated).
| * At that point no VT producer segment has been opened yet, so the snapshot has LCVP=EARLIEST and an empty segment | |
| * map. Allowing the sync to proceed in this state would stamp EARLIEST into the OffsetRecord, causing the follower | |
| * to re-subscribe from VT offset 0 on the next Helix OFFLINE→STANDBY retry — leading to out-of-order SST key | |
| * writes and a RocksDBException during batch push. | |
| * If no VT record has been processed yet, or the partition tracker has not recorded VT progress yet, the snapshot | |
| * can still have LCVP=EARLIEST. Separately, at START_OF_PUSH no VT producer segment may have been opened yet, so | |
| * the snapshot's segment map can still be empty. Allowing the sync to proceed in either of these not-yet-meaningful | |
| * states would persist incomplete VT progress into the OffsetRecord, causing the follower to re-subscribe from an | |
| * incorrect VT offset on the next Helix OFFLINE→STANDBY retry — leading to out-of-order SST key writes and a | |
| * RocksDBException during batch push. |
Summary
Fixes a bug where enabling Global RT DIV causes batch push to fail with:
Background
When Global RT DIV is enabled, a follower tracks its latest consumed VT
position (LCVP) and periodically persists it to the OffsetRecord via
SyncVtDivNode. On each OFFLINE→STANDBY transition (including Helixretries with
RETRY_COUNT=3),getLocalVtSubscribePosition()reads LCVPfrom the OffsetRecord and uses it as the starting consume position for the
VT. This lets the follower resume from where it left off rather than
re-reading from the beginning of the topic.
With Global RT DIV off, the analogous field is
latestProcessedVtPosition,which is checkpointed via
updateOffsetMetadataInOffsetRecordinside thenormal
syncOffsetdrainer path. Critically,latestProcessedVtPositionand the RocksDB SST checkpoint are always written together in the same
syncOffsetcall — so they are always consistent on disk. A crash orfailed push just rolls both back to the last sync point, and re-subscribing
from that point writes new SST data cleanly in order.
With Global RT DIV on,
shouldSyncOffsetalways returnsfalse— thedrainer sync path is completely disabled. LCVP is only persisted through
SyncVtDivNode, which callsupdateAndSyncOffsetFromSnapshot→PartitionTracker.updateOffsetRecord→syncOffset. This separationintroduced two bugs that caused LCVP to be stamped as
EARLIESTeven afterreal VT progress had been made.
Bug 1:
PartitionTracker.updateOffsetRecord— LCVP not written when segment count is zeroSyncVtDivNodefires for every non-segment control message, includingSTART_OF_PUSH. At SOP time, the VT producer has not yet sent any user data,so no VT producer segment has been opened yet. The call chain was:
Inside
updateOffsetRecord,setLatestConsumedVtPositionwas called onlyinside the per-segment loop (private overload):
With zero segments the loop body never executes, so LCVP is never written to
the OffsetRecord.
cloneVtProducerStatesdoes copy the LCVP into thesnapshot (
destProducerTracker.updateLatestConsumedVtPosition(...)), butthat LCVP value never makes it into the OffsetRecord when segments are absent.
syncOffsetthen persists the OffsetRecord withLCVP = EARLIESTto disk.Fix: move
setLatestConsumedVtPositionto the publicupdateOffsetRecord(TopicType, OffsetRecord)method, unconditionally beforethe loop:
Bug 2:
syncOffsetFromSnapshotIfNeeded— syncing when LCVP is EARLIEST or segments are emptyEven with Bug 1 fixed,
consumerDivstarts withlatestConsumedVtPosition = EARLIESTand no VT segments. If
shouldSyncOffsetFromSnapshotreturnstruebefore theconsumer has made any meaningful VT progress (e.g. SOP triggers
isNonSegmentControlMessage),cloneVtProducerStatesproduces a snapshotwhose LCVP is EARLIEST and whose segment map is empty. Enqueueing that
snapshot as a
SyncVtDivNodeis either harmful (persisting EARLIEST to disk)or a no-op (empty segment map).
Fix: return early from
syncOffsetFromSnapshotIfNeededafter cloning ifeither condition holds:
How the SST ordering error manifests
With both bugs in place, the failure chain during batch push is:
from a prior failed attempt).
shouldSyncOffsetFromSnapshotreturnstrue(SOP is anon-segment control message).
SyncVtDivNodeis enqueued.SyncVtDivNode.execute()runs. vtDiv has 0 segments → Bug 1 means LCVP isnever written → OffsetRecord is persisted with
LCVP = EARLIEST.drainer fails (or one from a prior retry's data is re-consumed), the
lastRecordPersistedFuturecompletes exceptionally.SyncVtDivNodeinstances are skipped(
isCompletedExceptionally()check inSyncVtDivNode.execute()), so theSST file is never closed/synced.
getLocalVtSubscribePosition()reads LCVP = EARLIEST from disk.The consumer re-subscribes from VT offset 0.
same open SST file that already contains data from further into the
push (because the SST was never closed after step 5).
SST →
SstFileWriter.put()throwsRocksDBException: Keys must be added in strict ascending order.The two fixes together eliminate the root cause (Bug 1) and add a defensive
early-exit guard (Bug 2) to prevent the useless/harmful sync in the first place.
Why the same issue doesn't occur with Global RT DIV off
With Global RT DIV off:
getLocalVtSubscribePosition()useslatestProcessedVtPosition(not LCVP).latestProcessedVtPositionis checkpointed viaupdateOffsetMetadataInOffsetRecordinside
syncOffset.latestProcessedVtPositionare always written in thesame
syncOffsetcall, so they are always consistent on disk.checkDatabaseIntegrityuses the SST checkpoint to remove any uncommittedSST data, then re-subscription from the matching VT position writes new SST
data cleanly in order.
With Global RT DIV on (pre-fix), LCVP and the SST checkpoint were written via
separate code paths that could disagree: the SOP
SyncVtDivNodewrote anempty SST checkpoint (correct — no user data yet) but stamped EARLIEST as LCVP
(wrong — should reflect the SOP offset). This inconsistency is what allowed the
consumer to re-subscribe from the beginning of the topic while the SST was in a
mid-push state.
Testing Done
PartitionTrackerTest— existing tests pass; the LCVP-is-always-writteninvariant is exercised by tests that call
updateOffsetRecordwith an emptysegment map.
LeaderFollowerStoreIngestionTaskTest— all 56 tests pass after adding theearly-exit guards.
triggered this investigation (
cert-chunking_v1366-5).