Skip to content

feat(kafka-clients): handle readonly record headers on context injection through producer wrapper#17530

Open
wpessers wants to merge 2 commits intoopen-telemetry:mainfrom
wpessers:feat/kafka-clients/wrapper-handle-read-only-headers
Open

feat(kafka-clients): handle readonly record headers on context injection through producer wrapper#17530
wpessers wants to merge 2 commits intoopen-telemetry:mainfrom
wpessers:feat/kafka-clients/wrapper-handle-read-only-headers

Conversation

@wpessers
Copy link
Copy Markdown
Member

@wpessers wpessers commented Apr 4, 2026

After #17231 when looking at the code again I noticed some additional exception handling in the interception flow. Talking about the interception flow that has the try/catch around propagator.inject() at:

if (producerPropagationEnabled) {
try {
propagator.inject(context, record.headers(), SETTER);
} catch (Throwable t) {
// it can happen if headers are read only (when record is sent second time)
logger.log(WARNING, "failed to inject span context. sending record second time?", t);
}
}

I found that, as the comment and log statement there allude to, this is for when a record's headers have already been marked readonly (which happens after serialization through KafkaProducer.doSend()). So this try/catch should be there, in case of retries for example, to gracefully handle the IllegalStateException that is thrown when trying to inject headers into a readonly RecordHeaders instance.

Mea culpa for missing this on the previous PR.

@wpessers wpessers requested a review from a team as a code owner April 4, 2026 22:17
@trask trask added this to the v2.27.0 milestone Apr 5, 2026
@Test
@SuppressWarnings({"unchecked"})
void testProducerHandlesReadOnlyHeaders() {
Producer<String, String> producer =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternatively could use a mock, but this is fine too

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it may be more readable with a mock, I just went with this approach because it mirrors the setup of the existing tests in this same file. Your call though, I'd happily change it to use a mock if preferred!

@wpessers
Copy link
Copy Markdown
Member Author

wpessers commented Apr 7, 2026

@laurit while working on implementing your suggestion I dove a bit deeper into some other parts and realized some error handling we might want to add.

For the producer wrapper, at:

Context context = producerInstrumenter.start(parentContext, request);
if (producerPropagationEnabled) {
propagator.inject(context, record.headers(), SETTER);
}
try (Scope ignored = context.makeCurrent()) {
return sendFn.apply(record, new ProducerCallback(callback, parentContext, context, request));
}

producerInstrumenter.start() starts a span. That span will get ended in ProducerCallback.onCompletion(). If sendFn.apply() throws synchronously (which is well documented that it can happen at https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1052-L1056), the callback is never invoked, meaning the span is never ended.

So I'm thinking we may want to add some additional error handling to explicitly end the span in case of failure so that the span is always properly ended. WDYT?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants