Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,18 @@

package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;

import static java.util.logging.Level.WARNING;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaPropagation;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;

/**
* Helper for producer-side instrumentation.
Expand All @@ -30,10 +25,6 @@
* at any time.
*/
public class KafkaProducerTelemetry {
private static final Logger logger = Logger.getLogger(KafkaProducerTelemetry.class.getName());

private static final TextMapSetter<Headers> SETTER = KafkaHeadersSetter.INSTANCE;

private final TextMapPropagator propagator;
private final Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter;
private final boolean producerPropagationEnabled;
Expand All @@ -52,24 +43,24 @@ public KafkaProducerTelemetry(
*
* @param record the producer record to inject span info.
*/
public <K, V> void buildAndInjectSpan(ProducerRecord<K, V> record, String clientId) {
public <K, V> ProducerRecord<K, V> buildAndInjectSpan(
ProducerRecord<K, V> record, String clientId) {
Context parentContext = Context.current();

KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
if (!producerInstrumenter.shouldStart(parentContext, request)) {
return;
return record;
}

Context context = producerInstrumenter.start(parentContext, request);
if (producerPropagationEnabled) {
try {
propagator.inject(context, record.headers(), SETTER);
} catch (Throwable t) {
// it can happen if headers are read only (when record is sent second time)
logger.log(WARNING, "failed to inject span context. sending record second time?", t);
try {
if (producerPropagationEnabled) {
record = KafkaPropagation.propagateContext(propagator, context, record);
}
} finally {
producerInstrumenter.end(context, request, null, null);
}
producerInstrumenter.end(context, request, null, null);
return record;
}

/**
Expand All @@ -93,7 +84,7 @@ public <K, V> Future<RecordMetadata> buildAndInjectSpan(

Context context = producerInstrumenter.start(parentContext, request);
if (producerPropagationEnabled) {
propagator.inject(context, record.headers(), SETTER);
record = KafkaPropagation.propagateContext(propagator, context, record);
}

try (Scope ignored = context.makeCurrent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class OpenTelemetryProducerInterceptor<K, V> implements ProducerIntercept
@CanIgnoreReturnValue
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
if (producerTelemetry != null) {
producerTelemetry.buildAndInjectSpan(producerRecord, clientId);
return producerTelemetry.buildAndInjectSpan(producerRecord, clientId);
}
return producerRecord;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@

package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand Down Expand Up @@ -57,4 +61,29 @@ void testProducerExceptionPropagatesToCaller() {
.isInstanceOf(IllegalStateException.class)
.hasMessage("can't invoke");
}

@Test
@SuppressWarnings({"unchecked"})
void testProducerHandlesReadOnlyHeaders() {
Producer<String, String> producer =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternatively could use a mock, but this is fine too

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it may be more readable with a mock, I just went with this approach because it mirrors the setup of the existing tests in this same file. Your call though, I'd happily change it to use a mock if preferred!

(Producer<String, String>)
Proxy.newProxyInstance(
ExceptionHandlingTest.class.getClassLoader(),
new Class<?>[] {Producer.class},
(proxy, method, args) -> {
if ("send".equals(method.getName())) {
return CompletableFuture.completedFuture(null);
}
throw new IllegalStateException("can't invoke");
});
KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build();
Producer<String, String> wrappedProducer = telemetry.wrap(producer);

ProducerRecord<String, String> record =
new ProducerRecord<>(
"test-topic", null, null, "test-key", "test-value", new RecordHeaders());
((RecordHeaders) record.headers()).setReadOnly();
assertThatNoException()
.isThrownBy(() -> testing.runWithSpan("parent", () -> wrappedProducer.send(record)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -50,8 +51,14 @@ private static boolean hasMaxUsableProduceMagic() {

public static <K, V> ProducerRecord<K, V> propagateContext(
Context context, ProducerRecord<K, V> record) {
return propagateContext(
GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), context, record);
}

public static <K, V> ProducerRecord<K, V> propagateContext(
TextMapPropagator propagator, Context context, ProducerRecord<K, V> record) {
try {
inject(context, record);
inject(propagator, context, record);
} catch (IllegalStateException e) {
// headers must be read-only from reused record. try again with new one.
record =
Expand All @@ -63,15 +70,14 @@ record =
record.value(),
record.headers());

inject(context, record);
inject(propagator, context, record);
}
return record;
}

private static <K, V> void inject(Context context, ProducerRecord<K, V> record) {
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(context, record.headers(), SETTER);
private static <K, V> void inject(
TextMapPropagator propagator, Context context, ProducerRecord<K, V> record) {
propagator.inject(context, record.headers(), SETTER);
}

private KafkaPropagation() {}
Expand Down
Loading