Skip to content

Commit 448a40f

Browse files
committed
fix(confluent-kafka): skip recv spans when poll/consume returns no messages
Matches Java instrumentation behavior: only create a recv span when a message is actually received, avoiding empty spans on idle polls. Fixes #3590
1 parent e1fbc0d commit 448a40f

File tree

2 files changed

+54
-21
lines changed

2 files changed

+54
-21
lines changed

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -383,11 +383,11 @@ def wrap_poll(func, instance, tracer, args, kwargs):
383383
if instance._current_consume_span:
384384
_end_current_consume_span(instance)
385385

386-
with tracer.start_as_current_span(
387-
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
388-
):
389-
record = func(*args, **kwargs)
390-
if record:
386+
record = func(*args, **kwargs)
387+
if record:
388+
with tracer.start_as_current_span(
389+
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
390+
):
391391
_create_new_consume_span(instance, tracer, [record])
392392
_enrich_span(
393393
instance._current_consume_span,
@@ -396,9 +396,9 @@ def wrap_poll(func, instance, tracer, args, kwargs):
396396
record.offset(),
397397
operation=MessagingOperationTypeValues.PROCESS,
398398
)
399-
instance._current_context_token = context.attach(
400-
trace.set_span_in_context(instance._current_consume_span)
401-
)
399+
instance._current_context_token = context.attach(
400+
trace.set_span_in_context(instance._current_consume_span)
401+
)
402402

403403
return record
404404

@@ -407,21 +407,20 @@ def wrap_consume(func, instance, tracer, args, kwargs):
407407
if instance._current_consume_span:
408408
_end_current_consume_span(instance)
409409

410-
with tracer.start_as_current_span(
411-
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
412-
):
413-
records = func(*args, **kwargs)
414-
if len(records) > 0:
410+
records = func(*args, **kwargs)
411+
if len(records) > 0:
412+
with tracer.start_as_current_span(
413+
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
414+
):
415415
_create_new_consume_span(instance, tracer, records)
416416
_enrich_span(
417417
instance._current_consume_span,
418418
records[0].topic(),
419419
operation=MessagingOperationTypeValues.PROCESS,
420420
)
421-
422-
instance._current_context_token = context.attach(
423-
trace.set_span_in_context(instance._current_consume_span)
424-
)
421+
instance._current_context_token = context.attach(
422+
trace.set_span_in_context(instance._current_consume_span)
423+
)
425424

426425
return records
427426

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,6 @@ def test_poll(self) -> None:
197197
MESSAGING_MESSAGE_ID: "topic-30.1.3",
198198
},
199199
},
200-
{"name": "recv", "attributes": {}},
201200
]
202201

203202
consumer = MockConsumer(
@@ -213,7 +212,7 @@ def test_poll(self) -> None:
213212
consumer.poll()
214213
consumer.poll()
215214
consumer.poll()
216-
consumer.poll()
215+
consumer.poll() # empty poll — must not produce a span
217216

218217
span_list = self.memory_exporter.get_finished_spans()
219218
self._compare_spans(span_list, expected_spans)
@@ -259,7 +258,6 @@ def test_consume(self) -> None:
259258
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
260259
},
261260
},
262-
{"name": "recv", "attributes": {}},
263261
]
264262

265263
consumer = MockConsumer(
@@ -276,10 +274,46 @@ def test_consume(self) -> None:
276274
consumer.consume(3)
277275
consumer.consume(1)
278276
consumer.consume(2)
279-
consumer.consume(1)
277+
consumer.consume(1) # empty consume — must not produce a span
280278
span_list = self.memory_exporter.get_finished_spans()
281279
self._compare_spans(span_list, expected_spans)
282280

281+
def test_poll_empty_does_not_create_span(self) -> None:
282+
instrumentation = ConfluentKafkaInstrumentor()
283+
consumer = MockConsumer(
284+
[],
285+
{
286+
"bootstrap.servers": "localhost:29092",
287+
"group.id": "mygroup",
288+
"auto.offset.reset": "earliest",
289+
},
290+
)
291+
self.memory_exporter.clear()
292+
consumer = instrumentation.instrument_consumer(consumer)
293+
consumer.poll()
294+
consumer.poll()
295+
296+
span_list = self.memory_exporter.get_finished_spans()
297+
self.assertEqual(len(span_list), 0)
298+
299+
def test_consume_empty_does_not_create_span(self) -> None:
300+
instrumentation = ConfluentKafkaInstrumentor()
301+
consumer = MockConsumer(
302+
[],
303+
{
304+
"bootstrap.servers": "localhost:29092",
305+
"group.id": "mygroup",
306+
"auto.offset.reset": "earliest",
307+
},
308+
)
309+
self.memory_exporter.clear()
310+
consumer = instrumentation.instrument_consumer(consumer)
311+
consumer.consume(5)
312+
consumer.consume(5)
313+
314+
span_list = self.memory_exporter.get_finished_spans()
315+
self.assertEqual(len(span_list), 0)
316+
283317
def test_close(self) -> None:
284318
instrumentation = ConfluentKafkaInstrumentor()
285319
mocked_messages = [

0 commit comments

Comments
 (0)