[da-vinci][server] Add per-poll record count metric (poll_result_size)#2697
sixpluszero wants to merge 3 commits into linkedin:main
Add a poll_result_size metric recorded before mini-batch splitting in the leader AA/WC path to capture the true per-poll record count per partition. The existing batch_processing_request_size is recorded per mini-batch after splitting, making it impossible to distinguish small polls from tail batches of larger polls.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

…unit tests

The poll_result_size metric was only recorded in the AA/WC batch processing path (produceToStoreBufferServiceOrKafkaInBatch). This adds the same metric to the non-batch path (produceToStoreBufferServiceOrKafka) so that poll sizes are captured regardless of whether parallel processing is enabled. Also adds three unit tests covering both paths and the filtered-records edge case.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
This PR adds a new per-poll metric (poll_result_size) to improve visibility into the true number of records returned by each consumer poll (per partition), independent of mini-batch splitting and regardless of whether ingestion runs in the batch (AA/WC parallel) or non-batch path.
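To illustrate why per-mini-batch sizes alone cannot recover the poll size, here is a minimal sketch. The class name, the helper `miniBatchSizes`, and the batch limit of 100 are hypothetical and chosen only for illustration; they are not taken from the Venice code base.

```java
import java.util.ArrayList;
import java.util.List;

class PollSizeAmbiguity {
  // Sizes of the mini-batches produced by splitting one poll of pollSize records
  // into chunks of at most maxBatchSize (hypothetical splitting rule).
  static List<Integer> miniBatchSizes(int pollSize, int maxBatchSize) {
    List<Integer> sizes = new ArrayList<>();
    for (int remaining = pollSize; remaining > 0; remaining -= maxBatchSize) {
      sizes.add(Math.min(remaining, maxBatchSize));
    }
    return sizes;
  }

  public static void main(String[] args) {
    int maxBatchSize = 100; // assumed limit, for illustration only
    System.out.println(miniBatchSizes(250, maxBatchSize)); // [100, 100, 50]
    System.out.println(miniBatchSizes(50, maxBatchSize)); // [50]
  }
}
```

Both polls emit a mini-batch of size 50, so a per-mini-batch metric sees the same sample for a 50-record poll and for the tail of a 250-record poll. A per-poll metric would record 250 and 50 respectively.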
Changes:
- Record `poll_result_size` in `StoreIngestionTask` for both non-batch and batch ingestion flows.
- Add Tehuti plumbing in `IngestionStats` and aggregation/delegation in `AggVersionedIngestionStats`.
- Add an OTel metric entity + recording API for `poll_result_size`, including a no-op implementation, and unit tests validating behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Records per-poll record count in both non-batch and batch ingestion paths. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java | Adds Tehuti sensor and recording method for poll_result_size. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java | Delegates poll-size recording to Tehuti (versioned/total) and OTel. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java | Introduces OTel metric entity definition for poll-size. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java | Wires the new OTel metric into the stats implementation and exposes a record method. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java | Adds no-op override for poll-size recording. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java | Adds unit tests verifying metric emission (non-batch, batch) and suppression when all records are filtered. |
    int totalRecordsInPoll = 0;
    for (List<DefaultPubSubMessage> batch: batches) {
      totalRecordsInPoll += batch.size();
    }
    if (totalRecordsInPoll > 0) {
      versionedIngestionStats
          .recordPollResultSize(storeName, versionNumber, totalRecordsInPoll, beforeProcessingBatchRecordsTimestampMs);
    }
In the batch path, totalRecordsInPoll is recomputed by iterating over the already-split batches, which adds an extra pass and contradicts the intent of recording the poll size before mini-batch splitting. Consider tracking the count while building batches (or incrementing a counter alongside ongoingBatch.add(record)) so the metric is computed directly from the pre-split poll result without the additional loop.
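The suggestion above could be sketched roughly as follows. This is not the actual `StoreIngestionTask` code: the class, the `Result` holder, and the `buildBatches` helper are hypothetical stand-ins, and only the idea of incrementing a counter next to `ongoingBatch.add(record)` comes from the comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchCountSketch {
  // Hypothetical holder for the split batches plus the pre-split record count.
  static final class Result<T> {
    final List<List<T>> batches;
    final int totalRecordsInPoll;

    Result(List<List<T>> batches, int totalRecordsInPoll) {
      this.batches = batches;
      this.totalRecordsInPoll = totalRecordsInPoll;
    }
  }

  // Splits records into mini-batches and counts them in the same pass,
  // so no second loop over the finished batches is needed.
  static <T> Result<T> buildBatches(Iterable<T> records, int maxBatchSize) {
    List<List<T>> batches = new ArrayList<>();
    List<T> ongoingBatch = new ArrayList<>(maxBatchSize);
    int totalRecordsInPoll = 0;
    for (T record : records) {
      ongoingBatch.add(record);
      totalRecordsInPoll++; // counted alongside the add
      if (ongoingBatch.size() == maxBatchSize) {
        batches.add(ongoingBatch);
        ongoingBatch = new ArrayList<>(maxBatchSize);
      }
    }
    if (!ongoingBatch.isEmpty()) {
      batches.add(ongoingBatch); // tail batch
    }
    return new Result<>(batches, totalRecordsInPoll);
  }

  public static void main(String[] args) {
    Result<Integer> r = buildBatches(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
    System.out.println(r.batches.size() + " batches, " + r.totalRecordsInPoll + " records");
  }
}
```

With seven records and a batch limit of three, the sketch yields three batches and a count of seven, taken directly from the pre-split stream.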
    POLL_RESULT_SIZE(
        "ingestion.poll_result.size", MetricType.MIN_MAX_COUNT_SUM_AGGREGATIONS, MetricUnit.NUMBER,
        "Total records per poll per partition before mini-batch splitting (leader AA/WC path)",
The OTel metric description says this is specific to the "leader AA/WC path", but poll_result_size is recorded in both the batch (AA/WC) and non-batch ingestion paths. Please update the description to reflect the broader coverage (and ideally mention that it counts only records that pass shouldProcessRecord).
Suggested change:

    - "Total records per poll per partition before mini-batch splitting (leader AA/WC path)",
    + "Total records per poll per partition before mini-batch splitting, counting only records that pass shouldProcessRecord",
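The counting rule mentioned in this comment (only records that pass the filter are counted, and nothing is emitted when the count is zero) can be sketched as below. The class, the method `recordPollResultSize`, and its `Predicate`/`IntConsumer` parameters are hypothetical stand-ins for `shouldProcessRecord` and the stats recorder, not the real Venice signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.Predicate;

class PollResultSizeSketch {
  // Counts the polled records accepted by the filter and reports the count
  // to the recorder only when at least one record was accepted.
  static <T> int recordPollResultSize(List<T> polled, Predicate<T> shouldProcess, IntConsumer recorder) {
    int count = 0;
    for (T record : polled) {
      if (shouldProcess.test(record)) {
        count++;
      }
    }
    if (count > 0) {
      recorder.accept(count); // stand-in for the metric recording call
    }
    return count;
  }

  public static void main(String[] args) {
    List<Integer> recorded = new ArrayList<>();
    recordPollResultSize(Arrays.asList("a", "b", "c"), s -> true, v -> recorded.add(v));
    recordPollResultSize(Arrays.asList("a", "b", "c"), s -> false, v -> recorded.add(v));
    System.out.println(recorded); // only the first poll produced a sample
  }
}
```

In this sketch a fully filtered poll produces no sample at all, rather than a sample of zero, which matches the `totalRecordsInPoll > 0` guard shown in the reviewed snippet.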
- Fix NPE in testConsumedBytesSinceLastSyncTracking by adding versionedIngestionStats, storeName, and versionNumber to mock setup
- Add POLL_RESULT_SIZE to IngestionOtelMetricEntityTest expected defs
- Update ServerMetricEntityTest count from 139 to 140
- Eliminate extra loop in batch path by counting during batch building
- Update OTel metric description to reflect both batch and non-batch path coverage

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Problem Statement

The existing `batch_processing_request_size` metric is recorded per mini-batch after splitting in the leader AA/WC path. This makes it impossible to distinguish genuinely small polls from tail batches of larger polls, obscuring the true per-poll record count.

Additionally, poll size was only tracked in the batch processing path (`produceToStoreBufferServiceOrKafkaInBatch`), meaning any ingestion that goes through the non-batch path (e.g., non-AA/WC stores, VT consumption, or when parallel processing is disabled) had no poll size visibility at all.

Solution

- Add a `poll_result_size` metric (Tehuti + OTel) recorded before mini-batch splitting to capture the true per-poll record count per partition.
- Record it in both the batch path (`produceToStoreBufferServiceOrKafkaInBatch`) and the non-batch path (`produceToStoreBufferServiceOrKafka`), so poll sizes are captured regardless of whether AA/WC parallel processing is enabled.
- Only records that pass `shouldProcessRecord` are counted, consistent across both paths.

Code changes

- `StoreIngestionTask.java`: Add `poll_result_size` recording in both `produceToStoreBufferServiceOrKafka` (non-batch) and `produceToStoreBufferServiceOrKafkaInBatch` (batch) paths.
- `IngestionStats.java`: Add `pollResultSizeSensor` (Tehuti).
- `AggVersionedIngestionStats.java`: Add `recordPollResultSize` delegation to both Tehuti and OTel stats.
- `IngestionOtelStats.java` / `IngestionOtelMetricEntity.java`: Add `POLL_RESULT_SIZE` OTel metric entity and recording method.
- `NoOpIngestionOtelStats.java`: Add no-op override.
- Added new code behind a config. No new config; the metric is always emitted.
- Introduced new log lines. No new log lines.

Concurrency-Specific Checks

How was this PR tested?

- `testPollResultSizeRecordedInNonBatchPath`: verifies the metric is recorded with the correct count (3 records) in the non-batch path.
- `testPollResultSizeNotRecordedWhenAllRecordsFiltered`: verifies the metric is NOT recorded when all records are filtered by `shouldProcessRecord`.
- `testPollResultSizeRecordedInBatchPath`: verifies the metric is recorded with the correct count (5 records) in the AA/WC batch path.

Does this PR introduce any user-facing or breaking changes?