[WIP][server][protocol] Add accurate unique key count tracking for store versions #2676
m-nagarajan wants to merge 6 commits into `linkedin:main`
Conversation
`# Conflicts: # clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java`
Pull request overview
Adds exact per-partition unique key count tracking for store versions (persisted with offsets and exported via OTel), plus optional batch RMD sentinel behavior for hybrid A/A batch pushes.
Changes:
- Introduces `uniqueKeyCount` persistence via `PartitionState` schema v21 + `OffsetRecord`, and in-memory tracking via `PartitionConsumptionState`.
- Propagates key-count deltas from leaders to followers via a VT `"kcs"` header, and adds `VeniceWriter` overloads to pass custom PubSub headers.
- Exposes the count as an OTel `ASYNC_GAUGE` metric dimensioned by `VERSION_ROLE` and `REPLICA_TYPE`, and updates TTL filtering to treat ts=0 RMD as a non-filterable sentinel.
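The leader→follower `"kcs"` propagation above can be sketched end to end. The wire encoding is not spelled out in this summary, so the single-byte scheme and the names `KCS_HEADER_KEY`, `encodeDelta`, and `decodeDelta` below are illustrative assumptions, not the PR's actual API:

```java
// Hypothetical sketch of a "kcs" key-count-signal header: a leader attaches a
// +1/-1 delta to each VT record and followers apply it to their local count.
final class KcsHeaderSketch {
  static final String KCS_HEADER_KEY = "kcs"; // header name from the PR description

  // Encode the delta as a single byte to keep per-record overhead minimal.
  static byte[] encodeDelta(int delta) {
    if (delta != 1 && delta != -1) {
      throw new IllegalArgumentException("kcs delta must be +1 or -1, got: " + delta);
    }
    return new byte[] { (byte) delta };
  }

  static int decodeDelta(byte[] value) {
    return value[0]; // sign-extends, so (byte) -1 decodes back to -1
  }

  public static void main(String[] args) {
    long followerCount = 100; // count restored from the follower's OffsetRecord
    followerCount += decodeDelta(encodeDelta(1));  // key created on the leader
    followerCount += decodeDelta(encodeDelta(-1)); // key deleted on the leader
    System.out.println(followerCount); // 100
  }
}
```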
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Adds unit tests asserting custom "kcs" headers are emitted on put/delete records. |
| internal/venice-common/src/main/resources/avro/PartitionState/v21/PartitionState.avsc | Adds schema v21 including persisted uniqueKeyCount field (default -1). |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Adds header-passing overloads for delete/put-large manifest emission to propagate "kcs" signals. |
| internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java | Bumps PartitionState protocol version to 21. |
| internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java | Initializes and exposes persisted uniqueKeyCount through OffsetRecord. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Adds config keys for batch RMD sentinel + batch counting + hybrid RT key count tracking. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.java | Adds tests ensuring ts=0 sentinel RMD is not TTL-filtered. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/common/VeniceRmdTTLFilter.java | Implements ts=0 sentinel exemption for value-level RMD timestamp TTL filtering. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java | Updates expected server metric entity count for the new OTel gauge. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java | Adds tests validating unique key count gauge callback behavior and emission. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java | Adds expectation for the new ingestion.unique_key_count metric entity. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/UniqueKeyCountTest.java | Adds PCS/OffsetRecord/unit-level tests for unique key count semantics and header encoding. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/UniqueKeyCountMockTest.java | Adds mock-level tests covering schema/message-type filtering and hybrid/batch guards. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/UniqueKeyCountIntegrationTest.java | Adds lifecycle/persistence + OTel emission “integration-style” tests for unique key count. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java | Updates method signatures for producing to VT to include optional headers. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java | Registers new async gauge metric with VERSION_ROLE + REPLICA_TYPE dimensions. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java | Defines the new UNIQUE_KEY_COUNT OTel metric entity. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Persists uniqueKeyCount on syncOffset; adds batch counting + follower "kcs" application logic. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java | Adds AtomicLong uniqueKeyCount with batch finalize + increment/decrement helpers and restore from checkpoint. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java | Adds batch RMD sentinel writes for hybrid stores and leader-side "kcs" signal computation + header propagation. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java | Wires new feature flags into server config. |
```java
emptyPartitionState.upstreamVersionTopicPubSubPosition = PubSubSymbolicPosition.EARLIEST.toWireFormatBuffer();
emptyPartitionState.lastConsumedVersionTopicPubSubPosition = PubSubSymbolicPosition.EARLIEST.toWireFormatBuffer();
```

`getEmptyPartitionState()` sets `lastConsumedVersionTopicPubSubPosition` twice (lines 111 and 113) and also assigns `upstreamVersionTopicPubSubPosition` twice (lines 105 and 112). This duplication is redundant and makes it harder to reason about the intended defaults; remove the extra assignments so each field is initialized once.
```java
PubSubMessageHeaders headers =
    (pubSubMessageHeaders != null) ? pubSubMessageHeaders : EmptyPubSubMessageHeaders.SINGLETON;
CompletableFuture<PubSubProduceResult> produceResultFuture;
synchronized (this.partitionLocks[partition]) {
  produceResultFuture = sendMessage(
```
This new overload forwards the caller-provided `PubSubMessageHeaders` instance into `sendMessage()`, which can append additional headers (e.g., the view partition header) by mutating the same object. If callers reuse a headers instance across records, this can leak headers between messages. Consider making a defensive copy of the provided headers (or explicitly documenting that the writer may mutate the provided instance).
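A minimal sketch of the defensive-copy pattern the comment suggests. `Headers` here is a stand-in for `PubSubMessageHeaders` (the real class and any copy API it offers are not shown in this diff), so treat all names as hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the defensive-copy pattern the review suggests. "Headers" is a
// minimal stand-in for PubSubMessageHeaders; the real Venice API may differ.
final class DefensiveHeadersSketch {
  static final class Headers {
    final List<String> entries = new ArrayList<>();
    Headers add(String entry) { entries.add(entry); return this; }
    Headers copy() { Headers c = new Headers(); c.entries.addAll(entries); return c; }
  }

  // Writer-side: copy before internal code mutates the instance, so a caller
  // reusing one Headers object across records does not leak headers.
  static Headers prepareForSend(Headers callerProvided) {
    Headers headers = (callerProvided != null) ? callerProvided.copy() : new Headers();
    headers.add("view-partition"); // internal header appended per record
    return headers;
  }

  public static void main(String[] args) {
    Headers reused = new Headers().add("kcs");
    Headers first = prepareForSend(reused);
    Headers second = prepareForSend(reused);
    // The caller's instance stays untouched; each record got its own copy.
    System.out.println(reused.entries.size() + " " + first.entries.size() + " " + second.entries.size());
  }
}
```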
```java
@Test
public void testChunkFragmentBatchPutReturnsValueType() {
  // Chunk fragment PUT → also returns VALUE, but our code filters by schemaId != CHUNK
  doReturn(false).when(ingestionTask).isDaVinciClient();
  doReturn(false).when(pcs).isEndOfPushReceived();
```
`testChunkFragmentBatchPutReturnsValueType` doesn't execute any production code path or contain assertions, so it will always pass and doesn't validate the intended behavior. Add an assertion (e.g., call `getStorageOperationTypeForPut(...)` and verify the returned type / expected exception) or remove the test.
```java
assert !key.contains("_") : "Config key should use dots, not underscores: " + key;
assert key.startsWith("server.") : "Config key should start with 'server.': " + key;
```

This test uses Java `assert` statements for validation. Since JVM assertions are typically disabled in test runs unless `-ea` is set, these checks may be skipped silently. Use TestNG assertions (`Assert.assertTrue`/`Assert.assertFalse`) so the test always enforces the expectation.

Suggested change:

```java
Assert.assertFalse(key.contains("_"), "Config key should use dots, not underscores: " + key);
Assert.assertTrue(key.startsWith("server."), "Config key should start with 'server.': " + key);
```
```java
synchronized (this.partitionLocks[partition]) {
  produceResultFuture = sendMessage(
      producerMetadata -> kafkaKey,
      MessageType.DELETE,
      delete,
      false,
      partition,
      callback,
      true,
      leaderMetadataWrapper,
      logicalTs,
      headers);
}
```

`sendMessage(...)` already synchronizes on `partitionLocks[partition]`, so synchronizing again around the call here is redundant (the lock is re-entrant, but the extra block adds noise and can mislead readers into thinking additional ordering is required). Consider removing the outer `synchronized (this.partitionLocks[partition])` block and relying on `sendMessage`'s internal per-partition locking.

Suggested change:

```java
produceResultFuture = sendMessage(
    producerMetadata -> kafkaKey,
    MessageType.DELETE,
    delete,
    false,
    partition,
    callback,
    true,
    leaderMetadataWrapper,
    logicalTs,
    headers);
```
Pull request overview
Copilot reviewed 23 out of 23 changed files in this pull request and generated 4 comments.
```java
// wasAlive: did a live value exist before this write? (old value != null)
// isAlive: does a live value exist after resolution? (merge result != null)
PubSubMessageHeaders vtHeaders = EmptyPubSubMessageHeaders.SINGLETON;
if (uniqueKeyCountForHybridStoreEnabled && partitionConsumptionState.getUniqueKeyCount() >= 0) {
  boolean wasAlive = (mergeConflictResultWrapper.getOldValueByteBufferProvider().get() != null);
  boolean isAlive = (mergeConflictResult.getNewValue() != null);
```
Unique key count signal computation forces evaluation of `mergeConflictResultWrapper.getOldValueByteBufferProvider().get()` to derive `wasAlive`. Since this provider is backed by a lazy value lookup, this can introduce an extra storage read on RT writes where conflict resolution could have been decided from RMD alone. Consider deriving `wasAlive` from already-available metadata (e.g., tombstone state in RMD) or carrying an explicit "old value existed" flag from the conflict resolution path, so the signal computation doesn't defeat the laziness and add per-record I/O.
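Combined with the PR's own `wasAlive` rules (ts=0 sentinel → alive; missing RMD with sentinel RMD enabled → new key; ts>0 → resolve the old value), the reviewer's suggestion could look roughly like this sketch. Method and constant names are illustrative, not the PR's actual code:

```java
import java.util.function.Supplier;

// Sketch of deriving wasAlive without always forcing the lazy old-value read.
// Follows the PR's stated rules: ts=0 sentinel -> batch key is alive (no read);
// no RMD while sentinel RMD is enabled -> brand-new key (no read);
// ts>0 -> only then resolve the old value. All names are illustrative.
final class WasAliveSketch {
  static final long BATCH_RMD_SENTINEL_TIMESTAMP = 0L;

  static boolean wasAlive(Long oldRmdTimestamp, boolean sentinelRmdEnabled, Supplier<byte[]> lazyOldValue) {
    if (oldRmdTimestamp == null) {
      // No RMD: with sentinel RMD enabled every existing key has RMD, so this is a new key.
      return sentinelRmdEnabled ? false : (lazyOldValue.get() != null);
    }
    if (oldRmdTimestamp == BATCH_RMD_SENTINEL_TIMESTAMP) {
      return true; // batch-key sentinel: alive, no storage read needed
    }
    return lazyOldValue.get() != null; // ts>0: fall back to resolving the old value
  }

  public static void main(String[] args) {
    Supplier<byte[]> mustNotRead = () -> { throw new IllegalStateException("unexpected storage read"); };
    System.out.println(wasAlive(0L, true, mustNotRead));            // true, no read
    System.out.println(wasAlive(null, true, mustNotRead));          // false, no read
    System.out.println(wasAlive(5L, true, () -> new byte[] { 1 })); // true, one read
  }
}
```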
```java
synchronized (this.partitionLocks[partition]) {
  return sendMessage(
      manifestKeyProvider,
      MessageType.PUT,
      manifestPayload,
      false,
      partition,
      callback,
      true,
      leaderMetadataWrapper,
      logicalTs,
      manifestHeaders);
}
```

Redundant synchronization: this method synchronizes on `partitionLocks[partition]` and then calls the private `sendMessage(...)` overload, which synchronizes on `partitionLocks[partition]` again. Since the inner `sendMessage` already provides the ordering/DIV atomicity guarantees, the outer lock can be removed to reduce overhead and contention.

Suggested change:

```java
return sendMessage(
    manifestKeyProvider,
    MessageType.PUT,
    manifestPayload,
    false,
    partition,
    callback,
    true,
    leaderMetadataWrapper,
    logicalTs,
    manifestHeaders);
```
```java
Assert.assertTrue(
    offsetRecord.getUniqueKeyCount() >= 0,
    "Expected unique key count to be non-negative after batch push, but got: "
        + offsetRecord.getUniqueKeyCount());
Assert.assertTrue(
    offsetRecord.getUniqueKeyCount() > 0,
    "Expected unique key count to be positive after 100-record batch push, but got: "
        + offsetRecord.getUniqueKeyCount());
```

This test's Javadoc says the unique key count should equal 100 for a single-partition push, but the assertions only check `>= 0` / `> 0`. Since this feature is intended to be accurate, asserting the exact expected count (100) would catch drift/regressions (e.g., chunk-fragment miscounting or duplicate counting).

Suggested change:

```java
long uniqueKeyCount = offsetRecord.getUniqueKeyCount();
Assert.assertEquals(
    uniqueKeyCount,
    100L,
    "Expected unique key count to be exactly 100 after 100-record batch push, but got: " + uniqueKeyCount);
```
```java
// Validate the unique key count increased
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
  OffsetRecord offsetRecord = getOffsetRecord(topicName, 0);
  Assert.assertTrue(
      offsetRecord.getUniqueKeyCount() > batchCount[0],
      "Expected unique key count (" + offsetRecord.getUniqueKeyCount() + ") to be greater than batch count ("
          + batchCount[0] + ") after inserting 10 new keys");
});

long[] countAfterInserts = new long[1];
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
  countAfterInserts[0] = getOffsetRecord(topicName, 0).getUniqueKeyCount();
  Assert.assertTrue(countAfterInserts[0] > batchCount[0]);
```

After inserting 10 new RT keys, the test only asserts the count is greater than the batch baseline. To validate the correctness of the +1/-1 signal pipeline (and not just that "some change happened"), this should assert the exact expected delta (baseline + 10) once the RT keys are confirmed readable.

Suggested change:

```java
// Validate the unique key count increased by exactly 10 after inserting 10 new unique keys
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
  OffsetRecord offsetRecord = getOffsetRecord(topicName, 0);
  long actualCount = offsetRecord.getUniqueKeyCount();
  long expectedCount = batchCount[0] + 10;
  Assert.assertEquals(
      actualCount,
      expectedCount,
      "Expected unique key count to be batch count + 10 after inserting 10 new keys, but got: " + actualCount
          + " (batch count: " + batchCount[0] + ")");
});

long[] countAfterInserts = new long[1];
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
  countAfterInserts[0] = getOffsetRecord(topicName, 0).getUniqueKeyCount();
  long expectedCount = batchCount[0] + 10;
  Assert.assertEquals(
      countAfterInserts[0],
      expectedCount,
      "Expected post-insert unique key count snapshot to equal batch count + 10 before deletes");
```
Pull request overview
Copilot reviewed 22 out of 22 changed files in this pull request and generated no new comments.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 1 comment.
```java
// Chunk fragments: schemaId == CHUNK -> filtered out by processKafkaDataMessage, NOT counted
// Manifest: schemaId == CHUNKED_VALUE_MANIFEST -> passes filter, counted
```

`doBatchChunked(..., int chunksPerKey)` never uses `chunksPerKey`, and the implementation doesn't actually model chunk fragments vs. manifest (it just increments once per logical key). This is misleading given the method name/Javadoc. Either remove the unused parameter and simplify the Javadoc, or loop over simulated chunk fragments (without calling the increment) to make the intent explicit.

Suggested change:

```java
for (int chunk = 0; chunk < chunksPerKey; chunk++) {
  // Chunk fragments: schemaId == CHUNK -> filtered out by processKafkaDataMessage, NOT counted.
  // Intentionally no incrementUniqueKeyCountForBatchRecord call here.
}
// Manifest: schemaId == CHUNKED_VALUE_MANIFEST -> passes filter, counted once per logical key.
```
Problem Statement
Venice currently lacks an accurate unique key count for store versions. The orthogonal PR #2671 provides only approximate counts of all key activity, not the actual number of unique keys present in the store.
Solution
Adds an exact unique key count that is maintained per-partition in `PartitionConsumptionState` via an `AtomicLong` field, persisted atomically with consumption offsets through `OffsetRecord`/`PartitionState` (schema v21), and exposed as an OTel `ASYNC_GAUGE` metric with `VERSION_ROLE` and `REPLICA_TYPE` dimensions.

End-to-End Flow
Phase 1: Batch Push (SOP → EOP) — all replicas
Phase 2a: Default RMD for Hybrid Batch — leader only (A/A hybrid stores)
Phase 2b: RT Signal Computation — leader only (A/A hybrid stores, post-EOP)
Phase 2c: Follower Signal Application (post-EOP)
Persistence & Crash Recovery
OTel Metric Emission
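No emission code is shown in this summary; the following dependency-free sketch only illustrates the stated gauge behavior (the async callback reads the live count at collection time and emits nothing while the partition reports the -1 "not tracked" sentinel). Class and method names are hypothetical:

```java
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the ASYNC_GAUGE callback behavior: read the live per-partition
// count at collection time; skip emission while the -1 sentinel is set.
final class UniqueKeyGaugeSketch {
  static final long NOT_TRACKED = -1L;
  final AtomicLong uniqueKeyCount = new AtomicLong(NOT_TRACKED);

  // Stands in for the async-gauge callback: emit only when the count is usable.
  OptionalLong collect() {
    long value = uniqueKeyCount.get();
    return value >= 0 ? OptionalLong.of(value) : OptionalLong.empty();
  }

  public static void main(String[] args) {
    UniqueKeyGaugeSketch stats = new UniqueKeyGaugeSketch();
    System.out.println(stats.collect().isPresent()); // false: not tracking yet
    stats.uniqueKeyCount.set(100);
    System.out.println(stats.collect().getAsLong()); // 100 once tracking starts
  }
}
```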
Three independent config flags
- **Batch key counting** (`server.unique.key.count.for.all.batch.push.enabled`, default `false`): Fallback for PR #2642 ([common][vpj][da-vinci] Add record count verification for batch push and HLL-based repush validation; VPJ per-partition record count via EOP headers). All replicas count logical PUTs during batch push (SOP→EOP) with key-order dedup for speculative execution. Chunk fragments are filtered out. Mid-batch checkpoints capture partial counts. Becomes unnecessary once #2642 ships.
- **Default ts=0 RMD for hybrid batch** (`server.add.rmd.to.batch.push.for.hybrid.stores`, default `false`): During batch push for hybrid A/A stores, writes a default timestamp-zero RMD alongside every batch PUT/DELETE. Ensures every batch key has RMD so DCR avoids the `putWithoutRmd()` fallback, enabling zero-additional-I/O for field-level/UPDATE stores. The TTL filter treats ts=0 as a sentinel (`BATCH_RMD_SENTINEL_TIMESTAMP`).
- **Hybrid RT signal** (`server.unique.key.count.for.hybrid.store.enabled`, default `false`): The leader computes a +1/-1 signal from before/after key existence during DCR. `wasAlive` is determined from RMD state (ts=0 → batch key alive; RMD null + feature 2a → new key; ts>0 → resolve the old value). The signal is propagated to followers via the `"kcs"` VT header.

Key design decisions
- `AtomicLong` with -1 sentinel: -1 = not tracked, >= 0 = active count. The `>= 0` guard is the single source of truth for "is the count usable?"
- `wasOldValueAlive` captured eagerly in `MergeConflictResultWrapper` BEFORE `setTransientRecord()` overwrites the transient cache, avoiding stale Lazy resolution.
- `wasAlive` optimization: ts=0 → alive (batch key, no value read); RMD null + feature 2a → dead (no read); ts>0 → resolve the old value (transient cache hit or RocksDB).
- `ArrayUtils.compareUnsigned` key-order comparison matching the VPJ/RocksDB sort order.
- `BATCH_RMD_SENTINEL_TIMESTAMP = 0L` constant in `RmdConstants`, used in the producer (AASTI), the consumer (TTL filter), and signal computation.
- Shared `PubSubMessageHeader` instances (`KEY_CREATED_SIGNAL`/`KEY_DELETED_SIGNAL`) avoid per-record byte array allocation.
- `VeniceWriter.delete()` and `putLargeValue()` gain header-passing overloads; manifest-only headers for chunked PUTs.
- `EnumMap<ReplicaType, AsyncMetricEntityStateOneEnum<VersionRole>>` workaround (pending `AsyncMetricEntityStateTwoEnums` in PR #2673, [da-vinci][server] Add OTel metrics to AggVersionedStorageEngineStats).

Code changes
- `server.add.rmd.to.batch.push.for.hybrid.stores` (default `false`)
- `server.unique.key.count.for.all.batch.push.enabled` (default `false`)
- `server.unique.key.count.for.hybrid.store.enabled` (default `false`)
- `RedundantExceptionFilter` for unexpected signal values
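The -1 sentinel plus increment/decrement behavior described in this PR can be sketched as follows (a hypothetical reduction of the `PartitionConsumptionState` helpers; method names are illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of PartitionConsumptionState-style helpers around the -1 sentinel:
// -1 = not tracked, >= 0 = active count. Method names are illustrative.
final class UniqueKeyCountSketch {
  private final AtomicLong uniqueKeyCount = new AtomicLong(-1);

  // Restore from a checkpointed OffsetRecord (may itself be -1 for v20 data).
  void restoreFromCheckpoint(long persisted) {
    uniqueKeyCount.set(persisted);
  }

  // Apply a +1/-1 signal atomically; ignored while the count is not tracked.
  void applySignal(int delta) {
    uniqueKeyCount.updateAndGet(current -> current >= 0 ? current + delta : current);
  }

  long get() {
    return uniqueKeyCount.get();
  }

  public static void main(String[] args) {
    UniqueKeyCountSketch pcs = new UniqueKeyCountSketch();
    pcs.applySignal(1);             // ignored: count not tracked yet
    System.out.println(pcs.get());  // -1
    pcs.restoreFromCheckpoint(100); // e.g. batch count finalized at EOP
    pcs.applySignal(1);
    pcs.applySignal(-1);
    System.out.println(pcs.get());  // 100
  }
}
```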
Both reviewer and PR author to verify
- Atomic operations: `AtomicLong` for the unique key count; single-threaded drainer for batch counting; key-level lock serializes same-key DCR; `wasOldValueAlive` captured before cache mutation.
- Locks (`synchronized`, `RWLock`) are used where needed.
- Thread-safe collections (`ConcurrentHashMap`, `CopyOnWriteArrayList`): `VeniceConcurrentHashMap` for OTel stats maps.
- Unit tests: `UniqueKeyCountTest` (PCS operations, batch dedup, production code paths via `doCallRealMethod`/reflection for `putInStorageEngine`, `removeFromStorageEngine`, `trackUniqueKeyCount`, `processMessage`, `processEndOfPush`); `IngestionOtelStatsTest` (ASYNC_GAUGE callback with leader/follower filtering).
- Integration tests: `UniqueKeyCountScenarioTest` (config-driven lifecycle with OTel gauge validation); `TestUniqueKeyCount` (E2E with a real Venice cluster: batch push, hybrid RT, incremental push, chunked store, version swap).
- Updated tests: `ActiveActiveStoreIngestionTaskTest`, `MergeConflictResultWrapperTest`, `VeniceWriterUnitTest`, `TestVeniceKafkaInputTTLFilter`, `IngestionOtelMetricEntityTest`, `ServerMetricEntityTest`.
- Backward compatibility: schema v21 adds `uniqueKeyCount` with default -1; v20 data deserializes correctly; the TTL filter's ts=0 sentinel is backward-compatible.

161 unit tests + 6 E2E integration tests, all passing.
Does this PR introduce any user-facing or breaking changes?
All three features are behind disabled-by-default config flags. No behavior changes until explicitly enabled.