
[da-vinci][server] Add bounded hot transient record cache for large records during ingestion #2700

Open
sixpluszero wants to merge 2 commits into linkedin:main from sixpluszero:jlliu/thinking-cache

Conversation

@sixpluszero
Contributor

Problem Statement

During Active-Active replication and Write Compute ingestion, the transient record map in PartitionConsumptionState holds records only until the drainer processes them. For large/chunked values, once evicted from the transient map, the ingestion pipeline must perform expensive RocksDB lookups to re-fetch the record. This is especially costly when the same key is accessed repeatedly across consumer poll boundaries.

Solution

Introduce a shared, bounded Caffeine cache per StoreIngestionTask that retains large transient records across consumer poll boundaries, acting as a second-level lookup after the per-partition transient record map.
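A minimal sketch of that two-level lookup, using illustrative names rather than the actual Venice classes (the real second level is a bounded Caffeine cache, simplified here to a plain map):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: per-partition transient map consulted first, with a
// shared "hot" cache as the fallback before any RocksDB read. Names are
// hypothetical, not the real PartitionConsumptionState API.
class TwoLevelLookup {
  // Entries live here only until the drainer persists them.
  private final Map<String, byte[]> transientRecordMap = new ConcurrentHashMap<>();
  // Shared across partitions; unbounded here for brevity (the PR bounds it via Caffeine).
  private final Map<String, byte[]> sharedHotCache = new ConcurrentHashMap<>();

  void put(String key, byte[] value) {
    transientRecordMap.put(key, value);
  }

  // When the drainer persists the record it leaves the transient map, but a
  // copy stays in the hot cache so later polls can skip the RocksDB lookup.
  void onDrained(String key) {
    byte[] value = transientRecordMap.remove(key);
    if (value != null) {
      sharedHotCache.put(key, value);
    }
  }

  byte[] getTransientRecord(String key) {
    byte[] value = transientRecordMap.get(key);
    return value != null ? value : sharedHotCache.get(key); // second-level lookup
  }
}
```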

Key design:

  • Version-level flag (transientRecordCacheEnabled) controls whether the cache is active for a given store version
  • Host-level kill switch (server.ingestion.transient.record.cache.enabled, default true) allows operators to disable the cache on a node immediately without requiring a new push
  • Weight-based eviction via Caffeine (server.ingestion.transient.record.cache.max.weight, default 32MB)
  • Admission gating — only records with value size >= threshold are admitted (server.ingestion.transient.record.cache.min.value.size, default 100KB)
  • Partition-safe — cache keys are prefixed with partition ID to avoid cross-partition collisions in the shared cache
  • Staleness protection — when a key is overwritten with a smaller value or null-value update, the stale hot cache entry is invalidated
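The admission, eviction, and staleness rules above can be illustrated with a simplified single-threaded stand-in; the PR itself relies on Caffeine's maximumWeight/weigher APIs, and the class and method names below are hypothetical:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the Caffeine-backed hot cache semantics:
// weight-based LRU eviction, a minimum-size admission threshold, and
// invalidation when a key is overwritten with a small or null value.
class HotRecordCache {
  private final long maxWeightBytes;   // PR default: 32MB
  private final int minValueSizeBytes; // PR default: 100KB
  private long currentWeight = 0;
  // Access-order LinkedHashMap gives a simple LRU eviction order.
  private final LinkedHashMap<String, byte[]> cache = new LinkedHashMap<>(16, 0.75f, true);

  HotRecordCache(long maxWeightBytes, int minValueSizeBytes) {
    this.maxWeightBytes = maxWeightBytes;
    this.minValueSizeBytes = minValueSizeBytes;
  }

  void put(String key, byte[] value) {
    // Staleness protection: a null or below-threshold overwrite must not
    // leave a stale large value behind, so it invalidates instead of caching.
    if (value == null || value.length < minValueSizeBytes) {
      invalidate(key);
      return;
    }
    invalidate(key); // drop any previous entry's weight before replacing it
    cache.put(key, value);
    currentWeight += value.length;
    evictIfNeeded();
  }

  void invalidate(String key) {
    byte[] old = cache.remove(key);
    if (old != null) {
      currentWeight -= old.length;
    }
  }

  byte[] get(String key) {
    return cache.get(key);
  }

  private void evictIfNeeded() {
    // Evict least-recently-used entries until back under the weight bound.
    Iterator<Map.Entry<String, byte[]>> it = cache.entrySet().iterator();
    while (currentWeight > maxWeightBytes && it.hasNext()) {
      currentWeight -= it.next().getValue().length;
      it.remove();
    }
  }
}
```

With the PR's defaults the constructor arguments would be 33554432 and 102400; the tiny numbers in a test exist only to exercise the same rules cheaply.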

Code changes

  • Added new code behind a config:
    • transientRecordCacheEnabled (version-level, default false)
    • server.ingestion.transient.record.cache.enabled (server-level, default true)
    • server.ingestion.transient.record.cache.max.weight (server-level, default 33554432 / 32MB)
    • server.ingestion.transient.record.cache.min.value.size (server-level, default 102400 / 100KB)
  • Introduced new log lines.
    • Confirmed whether the new logs need to be rate-limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues. The Caffeine cache is thread-safe; hit counter uses AtomicLong.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

  • New unit tests added.
    • Hot cache retention after transient map removal
    • Admission threshold filtering (small values excluded)
    • Null hot cache backward compatibility
    • Two-partition shared cache key isolation
    • Cache invalidation on small-value overwrite
    • Cache invalidation on null-value overwrite
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable). New Avro schema v42 with default values.

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

sixpluszero and others added 2 commits April 3, 2026 23:08
…ecords during ingestion

Introduce a shared Caffeine cache per StoreIngestionTask that retains large transient records across consumer poll boundaries, reducing expensive DB lookups for chunked values in AA-replication and write-compute stores. The cache is gated by a per-store flag (transientRecordCacheEnabled) and server-level configs for max weight (default 32MB) and minimum value size (default 100KB).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…and add host-level kill switch

- Move transientRecordCacheEnabled from the Store interface to the Version interface so the cache can be toggled per version without affecting other versions.
- Add SERVER_INGESTION_TRANSIENT_RECORD_CACHE_ENABLED server config (default true) as a host-level kill switch to disable the cache on a node without a new push.
- Fix hot cache staleness: invalidate the hot cache entry when a key is overwritten with a value below the size threshold or a null-value update.
- Add tests for cache invalidation on small-value and null-value overwrites.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 6, 2026 22:25

Copilot AI left a comment


Pull request overview

This PR introduces a bounded “hot” transient record cache (Caffeine) to retain frequently-accessed large records across consumer poll boundaries during ingestion (primarily for A/A replication and write-compute), reducing expensive RocksDB lookups after the per-partition transient map entry is drained.

Changes:

  • Add a version-level flag (transientRecordCacheEnabled) to the store metadata schema and bump the meta system store protocol to v42.
  • Add server-side configs (enable switch, max weight, min admission size) and instantiate a per-StoreIngestionTask shared Caffeine cache passed into PartitionConsumptionState.
  • Add unit tests covering cache retention, admission gating, invalidation behavior, and cross-partition key isolation; add a host-level metric hook for cache hits.

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 3 comments.

Summary per file:

  • internal/venice-common/src/main/resources/avro/StoreMetaValue/v42/StoreMetaValue.avsc: Adds transientRecordCacheEnabled to StoreVersion with default false.
  • internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java: Bumps meta system store value schema protocol version to 42.
  • internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java: Implements getter/setter for the new version-level flag.
  • internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java: Adds new Version interface methods for transient record cache enablement.
  • internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java: Introduces server config keys for cache enablement, max weight, and min value size.
  • clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java: Adds unit tests validating hot-cache behavior and invalidation rules.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java: Adds a sensor and recording method for hot-cache hit counts.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java: Creates/configures the shared Caffeine cache and passes it into PCS instances.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java: Adds hot-cache admission/invalidation and fallback lookup logic; tracks hit count.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java: Wires server properties into new config getters.
  • build.gradle: Updates schema compatibility override pin from StoreMetaValue v40 to v42.


Comment on lines +332 to +334
boolean isTransientRecordCacheEnabled();

void setTransientRecordCacheEnabled(boolean transientRecordCacheEnabled);

Copilot AI Apr 6, 2026


Adding new abstract methods to the Version interface requires updating all Version implementers. ReadOnlyStore.ReadOnlyVersion (internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java) still implements Version and does not implement isTransientRecordCacheEnabled/setTransientRecordCacheEnabled, which will break compilation. Please either add default implementations in Version (e.g., return false / throw UnsupportedOperationException for setters) or update ReadOnlyVersion to delegate/throw accordingly.

Suggested change
boolean isTransientRecordCacheEnabled();
void setTransientRecordCacheEnabled(boolean transientRecordCacheEnabled);
default boolean isTransientRecordCacheEnabled() {
return false;
}
default void setTransientRecordCacheEnabled(boolean transientRecordCacheEnabled) {
throw new UnsupportedOperationException("setTransientRecordCacheEnabled is not supported");
}

Comment on lines +534 to +538
this.hotRecordCacheHitCountSensor = registerPerStoreAndTotalSensor(
"hot_record_cache_hit_count",
totalStats,
() -> totalStats.hotRecordCacheHitCountSensor,
new OccurrenceRate());

Copilot AI Apr 6, 2026


hotRecordCacheHitCountSensor/recordHotRecordCacheHitCount() are added but there are currently no call sites recording this metric (search shows only this file). As a result the metric will stay at 0 and the code is effectively dead. Please wire it into the ingestion path (e.g., when a transient record lookup falls back to the hot cache) or remove the sensor/method until it’s recorded.

Suggested change
this.hotRecordCacheHitCountSensor = registerPerStoreAndTotalSensor(
"hot_record_cache_hit_count",
totalStats,
() -> totalStats.hotRecordCacheHitCountSensor,
new OccurrenceRate());
/**
* This metric is intentionally not registered until there is a real ingestion-path call site that records it.
* Registering an {@link OccurrenceRate} sensor without any recording path would expose a permanently-zero,
* effectively dead metric.
*/
this.hotRecordCacheHitCountSensor = null;

Comment on lines +817 to +825
private ByteArrayKey buildHotCacheKey(byte[] key) {
  int partition = getPartition();
  // Prefix the key with the 4-byte big-endian partition ID to avoid
  // cross-partition collisions in the shared cache.
  byte[] compositeKey = new byte[4 + key.length];
  compositeKey[0] = (byte) (partition >>> 24);
  compositeKey[1] = (byte) (partition >>> 16);
  compositeKey[2] = (byte) (partition >>> 8);
  compositeKey[3] = (byte) partition;
  System.arraycopy(key, 0, compositeKey, 4, key.length);
  return ByteArrayKey.wrap(compositeKey);
}

Copilot AI Apr 6, 2026


buildHotCacheKey allocates a new byte[] and ByteArrayKey on every hot-cache access. Since getTransientRecord() is on the ingestion hot path, this per-lookup allocation can add measurable GC pressure when the cache is enabled. Consider using a dedicated key object that combines partition + key without copying (similar to the existing ByteArrayKey assumption), or otherwise reducing allocations (e.g., caching a partition prefix and hashing partition into the key wrapper).
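One hedged sketch of the allocation-reducing direction this comment suggests, assuming a hypothetical wrapper class that is not part of the PR: fold the partition ID into hashCode/equals instead of copying it into a new byte array on every lookup.

```java
import java.util.Arrays;

// Hypothetical composite key that wraps the partition ID and the original key
// array without copying. The caller must not mutate the wrapped array.
final class PartitionedKey {
  private final int partition;
  private final byte[] key;

  PartitionedKey(int partition, byte[] key) {
    this.partition = partition;
    this.key = key;
  }

  @Override
  public int hashCode() {
    // Mix the partition into the hash so identical keys from different
    // partitions land in different buckets.
    return 31 * partition + Arrays.hashCode(key);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof PartitionedKey)) {
      return false;
    }
    PartitionedKey other = (PartitionedKey) o;
    return partition == other.partition && Arrays.equals(key, other.key);
  }
}
```

This trades one small wrapper allocation for the 4 + key.length byte copy on every hot-path lookup; whether that is worth the extra class is a judgment call for the PR author.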

sixpluszero added a commit that referenced this pull request Apr 7, 2026
Two upcoming features need new version-level configs to be added to the Avro protocol schemas:

1. Transient Record Cache (PR #2700): A bounded hot transient record cache for large records during A/A ingestion.
2. Merged Value-RMD Column Family (PR #2528): Merging value and RMD into a single column family to reduce read amplification during A/A ingestion.
These features require protocol schema changes to be landed first, before the implementation PRs can use them.