[da-vinci][server] Fall back to latestProcessedRtPosition when LCRP is absent in Global RT DIV#2712
KaiSernLim wants to merge 3 commits into linkedin:main
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…s absent in Global RT DIV

When a leader transition occurs with Global RT DIV enabled, getLeaderPosition() previously returned EARLIEST if no LCRP (divRtCheckpointPosition) had been checkpointed yet, for example on the first leader transition after the feature is enabled on a store. This caused the leader to re-consume the RT topic from the beginning instead of from the position it would have used before Global RT DIV.

The fix adds a fallback in getLeaderPosition(): when useCheckpointedDivRtPosition is true but no LCRP is present, the method now falls through to getLatestProcessedRtPosition(), which is the position that leader transitions used before Global RT DIV was introduced.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
Fixes an incorrect leader RT subscribe position when Global RT DIV is enabled but the DIV RT checkpoint (LCRP) has not yet been established, preventing unnecessary re-consumption of the RT topic from EARLIEST after a leader transition.
Changes:
- Update `PartitionConsumptionState#getLeaderPosition()` to fall back to `getLatestProcessedRtPosition()` when `useCheckpointedDivRtPosition=true` but the LCRP is `EARLIEST` (absent).
- Add a unit test covering the three key `getLeaderPosition()` cases (DIV disabled, DIV enabled with LCRP, DIV enabled without LCRP).
- Ignore `.worktrees/` in `.gitignore`.
Reviewed changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java | Implements the LCRP-absent fallback to latest processed RT position to avoid replaying the RT topic from the beginning. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java | Adds unit coverage validating the new fallback behavior and existing paths. |
| .gitignore | Adds .worktrees/ ignore entry. |
          // LCRP not yet established (e.g. first leader transition after Global RT DIV is enabled);
          // fall back to the position that would have been used before Global RT DIV.
        }
        return getLatestProcessedRtPosition(pubSubBrokerAddress);
The method-level Javadoc for getLeaderPosition still says that when useCheckpointedDivRtPosition is true the method returns the global DIV checkpoint (LCRP). With this change, if LCRP is absent/EARLIEST the method now falls back to getLatestProcessedRtPosition. Please update the Javadoc to reflect this behavior so callers don’t rely on outdated semantics.
…ores

isGlobalRtDivEnabled() now returns false when hybridStoreConfig is absent, even if the version-level flag is set. Global RT DIV consumes from an RT topic, so enabling it on a batch-only store has no meaning and could cause unexpected behavior in code paths gated by the flag.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Problem Statement
Two related gaps in Global RT DIV:
- LCRP fallback: When Global RT DIV is enabled and a leader transition occurs, `getLeaderPosition()` takes the LCRP path. If no LCRP has been persisted yet (e.g. the first follower-to-leader (F→L) transition after the feature is enabled on a store), it returns `EARLIEST`, so the leader re-consumes the entire RT topic. The correct fallback is `getLatestProcessedRtPosition()`, the position the leader would have subscribed at before Global RT DIV existed.
- Hybrid guard: `isGlobalRtDivEnabled()` returned `true` for batch-only stores if the version flag happened to be set. Global RT DIV only makes sense for hybrid stores (which consume from an RT topic); enabling it on a batch-only store could trigger unexpected behavior in gated code paths.
Solution
Commit 1 — `PartitionConsumptionState.getLeaderPosition()`: when `useCheckpointedDivRtPosition=true` but LCRP is absent, fall through to `getLatestProcessedRtPosition()` instead of returning `EARLIEST`:
```java
if (useCheckpointedDivRtPosition) {
  PubSubPosition lcrp = getDivRtCheckpointPosition(pubSubBrokerAddress);
  if (!PubSubSymbolicPosition.EARLIEST.equals(lcrp)) {
    return lcrp;
  }
  // LCRP not yet established; fall back to the pre-Global-RT-DIV subscribe position
}
return getLatestProcessedRtPosition(pubSubBrokerAddress);
```
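For readers without the Venice codebase at hand, the three cases this fallback has to handle (DIV disabled, DIV enabled with an LCRP, DIV enabled without an LCRP) can be modeled in isolation. The following is only a sketch: `LeaderPositionSketch`, its nested `Position` class, and `chooseLeaderPosition` are hypothetical stand-ins invented for illustration and are not the real `PartitionConsumptionState` or `PubSubPosition` types.

```java
// Hypothetical, self-contained model of the fallback; not the Venice classes.
class LeaderPositionSketch {
  /** Stand-in for a PubSub position: a numeric offset plus a symbolic EARLIEST marker. */
  static final class Position {
    static final Position EARLIEST = new Position(-1L);
    final long offset;

    Position(long offset) {
      this.offset = offset;
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof Position && ((Position) other).offset == offset;
    }

    @Override
    public int hashCode() {
      return Long.hashCode(offset);
    }
  }

  /**
   * Returns the checkpointed DIV RT position (LCRP) only when the flag is set
   * and an LCRP actually exists; otherwise returns the latest processed RT position.
   */
  static Position chooseLeaderPosition(
      boolean useCheckpointedDivRtPosition,
      Position lcrp,
      Position latestProcessedRt) {
    if (useCheckpointedDivRtPosition && !Position.EARLIEST.equals(lcrp)) {
      return lcrp;
    }
    // Either the feature is off or no LCRP has been checkpointed yet.
    return latestProcessedRt;
  }
}
```

In this model, an absent LCRP is represented by `Position.EARLIEST`, mirroring the `EARLIEST` comparison in the snippet above; the key property is that `EARLIEST` is never returned merely because the checkpoint is missing.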
Commit 2 — `StoreIngestionTask.isGlobalRtDivEnabled()`: add `&& isHybridMode()` guard:
```java
boolean isGlobalRtDivEnabled() {
  return isGlobalRtDivEnabled && isHybridMode();
}
```
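The effect of the guard can be illustrated with a small truth-table style sketch. `GlobalRtDivGuardSketch` and its constructor parameters are hypothetical names used only for this example; the real `StoreIngestionTask` derives these values from store and version configuration, which is not modeled here.

```java
// Hypothetical, self-contained model of the hybrid guard; not the Venice class.
class GlobalRtDivGuardSketch {
  private final boolean globalRtDivFlag; // assumed to mirror the version-level flag
  private final boolean hybrid;          // assumed to mirror "hybrid store config is present"

  GlobalRtDivGuardSketch(boolean globalRtDivFlag, boolean hybrid) {
    this.globalRtDivFlag = globalRtDivFlag;
    this.hybrid = hybrid;
  }

  boolean isHybridMode() {
    return hybrid;
  }

  /** The flag alone is not enough: the store must also be hybrid. */
  boolean isGlobalRtDivEnabled() {
    return globalRtDivFlag && isHybridMode();
  }
}
```

Under these assumptions, only the combination "flag set and hybrid" reports the feature as enabled; a batch-only store with the flag set reports it as disabled.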
Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
How was this PR tested?
Does this PR introduce any user-facing or breaking changes?