[vpj] Add VT Consistency Checker Spark Job using Lily Pad algorithm #2691
Mohith22 wants to merge 2 commits into linkedin:main
Conversation
LilyPadUtils.Snapshot<ComparablePubSubPosition> dc0Snapshot;
try (PubSubSplitIterator dc0Iterator = new PubSubSplitIterator(
    createConsumer(dc0Props, topicRepository, "dc0-checker-p" + partition),
    dc0Split,
Suggestion: Replace TopicManager with PubSubConsumerAdapter on executor side
Each createTopicManager() call creates a full TopicManager stack: a TopicManagerRepository, a PubSubAdminAdapter (Kafka AdminClient), a TopicMetadataFetcher with its own consumer pool and thread pool. But on the executor side, the only usage is:
LilyPadSnapshotBuilder.buildSnapshot(iterator, topicManager, ...)
  → new ComparablePubSubPosition(position, topicManager, partition)
    → compareTo() → topicManager.comparePosition()
      → topicMetadataFetcher.comparePosition()
        → consumer.comparePositions()
          → offset1 - offset2 (pure arithmetic for Kafka)
The Kafka AdminClient is never used. The consumer pool and thread pool exist only to do offset subtraction.
The consumer already created for PubSubSplitIterator can do this directly via consumer.comparePositions(). The one wrinkle is that PubSubSplitIterator.close() closes the underlying consumer, and the try-with-resources on the iterator closes it before findInconsistencies() runs its comparisons. Fix: manage the consumer lifecycle at the outer scope instead.
This would require ComparablePubSubPosition and LilyPadSnapshotBuilder.buildSnapshot() to accept PubSubConsumerAdapter instead of TopicManager — a small change to the API introduced in PR #2628. Sketch:
PubSubConsumerAdapter dc0Consumer = createConsumer(dc0Props, topicRepository, "dc0-checker-p" + partition);
PubSubConsumerAdapter dc1Consumer = createConsumer(dc1Props, topicRepository, "dc1-checker-p" + partition);
try {
  LilyPadUtils.Snapshot<ComparablePubSubPosition> dc0Snapshot;
  try (PubSubSplitIterator dc0It = new PubSubSplitIterator(dc0Consumer, dc0Split, false)) {
    dc0Snapshot = LilyPadSnapshotBuilder.buildSnapshot(dc0It, dc0Consumer, positionDeserializer, numberOfRegions);
  }
  LilyPadUtils.Snapshot<ComparablePubSubPosition> dc1Snapshot;
  try (PubSubSplitIterator dc1It = new PubSubSplitIterator(dc1Consumer, dc1Split, false)) {
    dc1Snapshot = LilyPadSnapshotBuilder.buildSnapshot(dc1It, dc1Consumer, positionDeserializer, numberOfRegions);
  }
  List<LilyPadUtils.Inconsistency<ComparablePubSubPosition>> found =
      LilyPadUtils.findInconsistencies(dc0Snapshot, dc1Snapshot);
  // ...
} finally {
  Utils.closeQuietlyWithErrorLogged(dc0Consumer);
  Utils.closeQuietlyWithErrorLogged(dc1Consumer);
}

One caveat: PubSubSplitIterator.close() currently closes the consumer it wraps. With externally managed consumers, the iterator should skip that, either via a constructor flag or by not using try-with-resources on the iterator.
On a 20-partition job this eliminates 40 unused Kafka AdminClient connections and 40 thread pools across executors.
The driver-side TopicManager is fine to keep — it adds useful retry logic around beginningPositions/endPositions.
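The kind of retry wrapper being kept on the driver side can be illustrated generically. This is a minimal sketch; `RetryUtil.withRetries` and its policy are hypothetical names for illustration, not Venice's actual retry API:

```java
import java.util.function.Supplier;

final class RetryUtil {
  /**
   * Generic stand-in for the retry logic TopicManager adds around
   * position fetches such as beginningPositions/endPositions.
   * Retries the action on RuntimeException up to maxAttempts times,
   * then rethrows the last failure.
   */
  static <T> T withRetries(Supplier<T> action, int maxAttempts) {
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.get();
      } catch (RuntimeException e) {
        last = e; // treat as transient and retry
      }
    }
    throw last;
  }
}
```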
@haoxu07 What about the driver side? Should we keep TopicManager there?
  validateRequiredProps(jobProps);
  run(jobProps);
}
Nit: e.printStackTrace() writes to stderr instead of going through the logger. Rest of the class uses Log4j — consider LOGGER.error("Unable to read config file: {}", args[0], e) for consistency and to respect log routing.
"Planned {} splits for topic {} from DC0={} and DC1={}",
partitionCount,
versionTopic,
dc0BrokerUrl,
DC0's partition count is used to fetch DC1 splits. If DC1 has a different partition count (e.g., topic corruption or partial creation), dc1Splits.get(p) would return null, causing an opaque NPE inside the Spark task. Consider fetching DC1's partition count and asserting equality here — cheap guard for a confusing failure mode.
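A minimal sketch of such a guard, in plain Java with illustrative names (the method and message wording are not from the PR):

```java
final class PartitionCountGuard {
  private PartitionCountGuard() {
  }

  /**
   * Hypothetical guard: fail fast with a descriptive message instead of an
   * opaque NPE inside a Spark task when the two DCs disagree on partition count.
   */
  static int validatePartitionCounts(String versionTopic, int dc0PartitionCount, int dc1PartitionCount) {
    if (dc0PartitionCount != dc1PartitionCount) {
      throw new IllegalStateException(
          "Partition count mismatch for " + versionTopic + ": DC0=" + dc0PartitionCount + ", DC1="
              + dc1PartitionCount);
    }
    return dc0PartitionCount;
  }
}
```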
Good point! Added the check.
/** Converts one {@link LilyPadUtils.Inconsistency} to a Spark output row. */
static Row toRow(LilyPadUtils.Inconsistency<ComparablePubSubPosition> inc, String versionTopic, int partition) {
Minor: 0L is a valid hash value. If a real inconsistency happens at key_hash 0, it could be confused with error sentinel rows by downstream consumers filtering on key_hash. Consider using Long.MIN_VALUE as the sentinel — though the type=ERROR column does distinguish it.
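A small self-contained illustration of the collision, with the row shape simplified to the two relevant columns (this is not the job's actual Row type):

```java
import java.util.List;

final class SentinelDemo {
  static final class OutputRow {
    final long keyHash;
    final String type;

    OutputRow(long keyHash, String type) {
      this.keyHash = keyHash;
      this.type = type;
    }
  }

  /** Rows a downstream filter on key_hash == 0 would treat as error sentinels. */
  static long countByHashSentinel(List<OutputRow> rows) {
    return rows.stream().filter(r -> r.keyHash == 0L).count();
  }

  /** Rows the type == ERROR filter identifies; immune to hash collisions. */
  static long countByTypeColumn(List<OutputRow> rows) {
    return rows.stream().filter(r -> "ERROR".equals(r.type)).count();
  }
}
```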
Since the type column is set to ERROR, downstream consumers should be able to distinguish it.
The unit test covers the ERROR sentinel row, but the integration test only verifies the happy path (detecting an injected inconsistency). Consider adding a case where one DC's broker is unreachable to verify the ERROR row flows through the full Spark pipeline. Could be a follow-up.
import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_SOURCE_FABRIC;
import static com.linkedin.venice.ConfigKeys.PARENT_KAFKA_CLUSTER_FABRIC_LIST;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME;
Nit: wildcard import — the unit test file uses explicit imports. Venice OSS generally prefers explicit static imports over import static org.testng.Assert.*.
// The repository is a thin wrapper (ConcurrentHashMap) with no resources beyond the TopicManagers it holds.
// The caller closes the returned TopicManager directly. Same pattern as PubSubSplitPlanner.
return new TopicManagerRepository(context, brokerUrl).getLocalTopicManager();
}
Critical: TopicManagerRepository leaked.
createTopicManager() creates a new TopicManagerRepository(context, brokerUrl) but never stores or closes it. TopicManagerRepository implements Closeable and its close() shuts down internal thread pools (TopicMetadataFetcher). The caller closes the individual TopicManager via try-with-resources, but the repository object is abandoned.
Suggested fix — return the repository and let the caller manage it:
private static TopicManagerRepository createTopicManagerRepo(...) {
  // ... same setup ...
  return new TopicManagerRepository(context, brokerUrl);
}

// In run():
try (TopicManagerRepository dc0Repo = createTopicManagerRepo(dc0Props, ...)) {
  TopicManager dc0TopicManager = dc0Repo.getLocalTopicManager();
  // ...
}
Good catch! Now the method returns TopicManagerRepository which will be closed by try-with-resources.
LilyPadUtils.Snapshot<ComparablePubSubPosition> dc0Snapshot = LilyPadSnapshotBuilder.buildSnapshot(
    new PubSubSplitIterator(dc0Consumer, dc0PostEopSplit, false),
    dc0Consumer,
Suggestion: PubSubSplitIterators never closed — debug stats lost.
The PubSubSplitIterator objects created here are never closed. The comment above correctly explains why consumers must outlive the iterators (for comparePositions() calls during findInconsistencies()). However, PubSubSplitIterator.close() logs valuable read/delivered/skipped stats that are useful for debugging production issues — those stats are silently lost.
Consider storing the iterators as local variables and logging the stats manually after buildSnapshot() returns, or adding a closeWithoutConsumer()-style method to PubSubSplitIterator that logs stats without closing the underlying consumer.
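The ownership-flag idea can be illustrated generically. `SplitReader` and `ownsConsumer` below are hypothetical names for the sketch, not the PR's actual API:

```java
final class SplitReader implements AutoCloseable {
  private final AutoCloseable consumer;
  private final boolean ownsConsumer;
  private long recordsRead = 0;

  SplitReader(AutoCloseable consumer, boolean ownsConsumer) {
    this.consumer = consumer;
    this.ownsConsumer = ownsConsumer;
  }

  void recordRead() {
    recordsRead++;
  }

  @Override
  public void close() {
    // Stats can always be reported here, even when the consumer is externally managed.
    System.out.println("SplitReader closed after " + recordsRead + " records");
    if (ownsConsumer) {
      try {
        consumer.close();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }
}
```

With this shape, the executor-side code can use try-with-resources on the reader while keeping the consumer alive for the later comparePositions() calls.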
We already log some info after calling buildSnapshot, such as processed records and unique keys.
 * @throws IllegalStateException if the end of the partition is reached without finding EOP
 */
static PubSubPosition findEndOfPushPosition(PubSubConsumerAdapter consumer, PubSubTopicPartition tp) {
  consumer.batchUnsubscribe(consumer.getAssignment());
Critical: Single empty poll causes immediate failure — needs retry.
A single empty poll(5000) (transient broker hiccup, slow consumer, network blip) causes an immediate IllegalStateException. In production this would fail the entire partition unnecessarily.
PubSubSplitIterator already handles this with emptyRetryTimes + emptySleepMs. Suggest the same pattern here:
int emptyPolls = 0;
int maxEmptyPolls = 3;
while (true) {
  // The poll call was implicit in the original sketch; made explicit here.
  Map<PubSubTopicPartition, List<DefaultPubSubMessage>> polled = consumer.poll(5000);
  List<DefaultPubSubMessage> messages = polled.getOrDefault(tp, Collections.emptyList());
  if (messages.isEmpty()) {
    if (++emptyPolls >= maxEmptyPolls) {
      throw new IllegalStateException("No EOP found in " + tp + " after " + scanned + " records");
    }
    continue;
  }
  emptyPolls = 0;
  // ... check for EOP
}

Also consider adding progress logging every ~100k messages — for a VT with millions of batch records before EOP, this loop runs silently for a long time.
// TODO: optimize with binary search on offsets for large batch stores.
// Batch records have upstreamKafkaClusterId == -1, RT records have >= 0.
// Binary search on this boundary would reduce O(N) to O(log N).
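The TODO's boundary search can be sketched self-contained; the long[] below stands in for per-record upstreamKafkaClusterId values in offset order (the real implementation would poll records at probed offsets instead):

```java
final class EopBoundarySearch {
  /**
   * Batch records carry upstreamKafkaClusterId == -1 and precede all RT records,
   * so the sequence is partitioned: a (possibly empty) run of -1s, then values >= 0.
   * Returns the index of the first RT record, or the array length if all records
   * are batch. O(log N) probes instead of a linear scan.
   */
  static int firstRtIndex(long[] upstreamClusterIds) {
    int lo = 0;
    int hi = upstreamClusterIds.length;
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (upstreamClusterIds[mid] >= 0) {
        hi = mid; // mid is an RT record; the boundary is at or before mid
      } else {
        lo = mid + 1; // mid is a batch record; the boundary is after mid
      }
    }
    return lo;
  }
}
```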
Ok, we will allow 3 empty polls before exiting the loop.
package com.linkedin.venice.endToEnd;
Critical: Consider combining the two integration tests into one more realistic test.
Currently:
- Test 1 (testSparkJobDetectsInjectedInconsistency) — fully synthetic: empty push + direct VT injection. No real RT flow, no real leader metadata, findEndOfPushPosition barely exercised (EOP is at offset ~1).
- Test 2 (testSparkJobSkipsBatchPushAndFindsNoInconsistenciesOnConsistentAAWrites) — real batch push + real RT writes via Samza, but expects zero inconsistencies (positive test only).
A single combined test would cover both and be more realistic:
1. Batch push via VPJ (5 records) — exercises real EOP, real batch data
2. Real RT writes from both DCs via Samza — real leader metadata, real AA replication
3. Wait for replication to settle
4. Inject one corrupted record for a known key into one DC's VT with a meaningful upstream position (e.g., offset 1, which both DCs' HWs will cover after step 5)
5. Write more RT records for other keys to advance HW past the injected position
6. Run checker — expect exactly one VALUE_MISMATCH for the corrupted key, zero for all others
This exercises the full pipeline (real EOP scanning, real batch record skipping, real RT leader metadata, real AA replication) while still detecting the injected inconsistency. It also validates that consistent keys are not falsely flagged.
Combined both the tests.
SparkSession spark = sparkSessionBuilder.getOrCreate();
Suggestion: SparkSession may leak if code between creation and try block throws.
sparkSessionBuilder.getOrCreate() is at line 197, but the try { ... } finally { spark.stop(); } starts at line 199. If IntStream.range() or spark.sparkContext().longAccumulator() throws between those lines, the SparkSession leaks. Consider moving the session creation inside the try block.
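The leak window generalizes to any AutoCloseable. A hypothetical stand-in (the Session class below is not SparkSession) makes it concrete:

```java
final class SessionLeakDemo {
  static final class Session implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
      closed = true;
    }
  }

  static Session lastCreated;

  /** Shape from the review: setup runs between creation and the try, so a throwing setup leaks the session. */
  static void runLeaky(Runnable setup) {
    Session session = lastCreated = new Session();
    setup.run(); // an exception here skips the finally below
    try {
      // ... job body ...
    } finally {
      session.close();
    }
  }

  /** Fix: everything after creation sits inside the try, so finally always runs. */
  static void runSafe(Runnable setup) {
    Session session = lastCreated = new Session();
    try {
      setup.run();
      // ... job body ...
    } finally {
      session.close();
    }
  }
}
```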
implementation libraries.rocksdbjni
implementation libraries.samzaApi
implementation libraries.spark
implementation libraries.apacheSparkSql
Comment: These deps may be redundant.
venice-test-common already has implementation project(':clients:venice-push-job'), and venice-push-job already declares implementation on apacheSparkSql, antlr4, and antlr4Runtime. With Gradle implementation → implementation, the transitive deps are on the compile classpath. These three new lines may be unnecessary.
If they ARE needed due to a classpath resolution issue, they should carry the same exclusions that venice-push-job/build.gradle applies to apacheSparkSql (Avro, Hadoop, log4j).
Added the exclusions similar to VPJ.
 * Nullable fields are null when the key is absent in that DC (MISSING type) or on ERROR rows.
 */
static final StructType OUTPUT_SCHEMA = new StructType(
    new StructField[] { new StructField("version_topic", DataTypes.StringType, false, Metadata.empty()),
nit: toRow() relies on positional indices 0–13 matching this schema definition. Consider adding index comments in toRow() to make the mapping explicit:
return RowFactory.create(
    /* 0 version_topic */ versionTopic,
    /* 1 vt_partition */ partition,
    ...
Problem Statement
The Lily Pad algorithm utilities were added in the previous PR. Running the consistency check across VT partitions with millions of keys per partition requires distributed execution, for which we need a Spark job.
Solution
Added VTConsistencyCheckerJob — a Spark job that parallelizes the Lily Pad algorithm across VT partitions. Each Spark task reads one partition from both DCs, builds snapshots, and detects inconsistencies. Results are written as Parquet with forensic context (key hash, value hashes, position vectors, high watermarks, timestamps). Failed partitions emit ERROR rows and continue. Invoked via properties file following Venice's push job pattern.
Code changes
Concurrency-Specific Checks

Both reviewer and PR author to verify:
- Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
- Concurrent data structures are used where appropriate (e.g., ConcurrentHashMap, CopyOnWriteArrayList).

How was this PR tested?

Does this PR introduce any user-facing or breaking changes?