Skip to content

Commit 70f2b8c

Browse files
sterchelenemdneto
andauthored
fix(confluent-kafka): skip recv spans when poll/consume returns no messages (#4349)
Matches Java instrumentation behavior: only create a recv span when a message is actually received, avoiding empty spans on idle polls. Fixes #3590 Co-authored-by: Emídio Neto <9735060+emdneto@users.noreply.github.com>
1 parent a983382 commit 70f2b8c

File tree

3 files changed

+96
-21
lines changed

3 files changed

+96
-21
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2626

2727
- `opentelemetry-docker-tests`: Replace deprecated `SpanAttributes` from `opentelemetry.semconv.trace` with `opentelemetry.semconv._incubating.attributes`
2828
([#4339](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4339))
29+
- `opentelemetry-instrumentation-confluent-kafka`: Skip `recv` span creation when `poll()` returns no message or `consume()` returns an empty list, avoiding empty spans on idle polls
30+
([#4349](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4349))
2931
- Fix intermittent `Core Contrib Test` CI failures caused by GitHub git CDN SHA propagation lag by installing core packages from the already-checked-out local copy instead of a second git clone
3032
([#4305](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4305))
3133
- Don't import module in unwrap if not already imported

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -383,11 +383,11 @@ def wrap_poll(func, instance, tracer, args, kwargs):
383383
if instance._current_consume_span:
384384
_end_current_consume_span(instance)
385385

386-
with tracer.start_as_current_span(
387-
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
388-
):
389-
record = func(*args, **kwargs)
390-
if record:
386+
record = func(*args, **kwargs)
387+
if record:
388+
with tracer.start_as_current_span(
389+
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
390+
):
391391
_create_new_consume_span(instance, tracer, [record])
392392
_enrich_span(
393393
instance._current_consume_span,
@@ -396,9 +396,9 @@ def wrap_poll(func, instance, tracer, args, kwargs):
396396
record.offset(),
397397
operation=MessagingOperationTypeValues.PROCESS,
398398
)
399-
instance._current_context_token = context.attach(
400-
trace.set_span_in_context(instance._current_consume_span)
401-
)
399+
instance._current_context_token = context.attach(
400+
trace.set_span_in_context(instance._current_consume_span)
401+
)
402402

403403
return record
404404

@@ -407,21 +407,20 @@ def wrap_consume(func, instance, tracer, args, kwargs):
407407
if instance._current_consume_span:
408408
_end_current_consume_span(instance)
409409

410-
with tracer.start_as_current_span(
411-
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
412-
):
413-
records = func(*args, **kwargs)
414-
if len(records) > 0:
410+
records = func(*args, **kwargs)
411+
if len(records) > 0:
412+
with tracer.start_as_current_span(
413+
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
414+
):
415415
_create_new_consume_span(instance, tracer, records)
416416
_enrich_span(
417417
instance._current_consume_span,
418418
records[0].topic(),
419419
operation=MessagingOperationTypeValues.PROCESS,
420420
)
421-
422-
instance._current_context_token = context.attach(
423-
trace.set_span_in_context(instance._current_consume_span)
424-
)
421+
instance._current_context_token = context.attach(
422+
trace.set_span_in_context(instance._current_consume_span)
423+
)
425424

426425
return records
427426

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,6 @@ def test_poll(self) -> None:
197197
MESSAGING_MESSAGE_ID: "topic-30.1.3",
198198
},
199199
},
200-
{"name": "recv", "attributes": {}},
201200
]
202201

203202
consumer = MockConsumer(
@@ -213,7 +212,7 @@ def test_poll(self) -> None:
213212
consumer.poll()
214213
consumer.poll()
215214
consumer.poll()
216-
consumer.poll()
215+
consumer.poll() # empty poll — must not produce a span
217216

218217
span_list = self.memory_exporter.get_finished_spans()
219218
self._compare_spans(span_list, expected_spans)
@@ -259,7 +258,6 @@ def test_consume(self) -> None:
259258
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
260259
},
261260
},
262-
{"name": "recv", "attributes": {}},
263261
]
264262

265263
consumer = MockConsumer(
@@ -276,10 +274,86 @@ def test_consume(self) -> None:
276274
consumer.consume(3)
277275
consumer.consume(1)
278276
consumer.consume(2)
279-
consumer.consume(1)
277+
consumer.consume(1) # empty consume — must not produce a span
280278
span_list = self.memory_exporter.get_finished_spans()
281279
self._compare_spans(span_list, expected_spans)
282280

281+
def test_poll_empty_does_not_create_span(self) -> None:
282+
instrumentation = ConfluentKafkaInstrumentor()
283+
consumer = MockConsumer(
284+
[],
285+
{
286+
"bootstrap.servers": "localhost:29092",
287+
"group.id": "mygroup",
288+
"auto.offset.reset": "earliest",
289+
},
290+
)
291+
self.memory_exporter.clear()
292+
consumer = instrumentation.instrument_consumer(consumer)
293+
consumer.poll()
294+
consumer.poll()
295+
296+
span_list = self.memory_exporter.get_finished_spans()
297+
self.assertEqual(len(span_list), 0)
298+
299+
def test_consume_empty_does_not_create_span(self) -> None:
300+
instrumentation = ConfluentKafkaInstrumentor()
301+
consumer = MockConsumer(
302+
[],
303+
{
304+
"bootstrap.servers": "localhost:29092",
305+
"group.id": "mygroup",
306+
"auto.offset.reset": "earliest",
307+
},
308+
)
309+
self.memory_exporter.clear()
310+
consumer = instrumentation.instrument_consumer(consumer)
311+
consumer.consume(5)
312+
consumer.consume(5)
313+
314+
span_list = self.memory_exporter.get_finished_spans()
315+
self.assertEqual(len(span_list), 0)
316+
317+
def test_poll_empty_cleans_up_previous_span_and_token(self) -> None:
318+
instrumentation = ConfluentKafkaInstrumentor()
319+
consumer = MockConsumer(
320+
[MockedMessage("topic-1", 0, 0, [])],
321+
{
322+
"bootstrap.servers": "localhost:29092",
323+
"group.id": "mygroup",
324+
"auto.offset.reset": "earliest",
325+
},
326+
)
327+
consumer = instrumentation.instrument_consumer(consumer)
328+
consumer.poll() # non-empty: sets _current_consume_span and _current_context_token
329+
self.assertIsNotNone(consumer._current_consume_span)
330+
self.assertIsNotNone(consumer._current_context_token)
331+
332+
consumer.poll() # empty: should clean up both
333+
self.assertIsNone(consumer._current_consume_span)
334+
self.assertIsNone(consumer._current_context_token)
335+
336+
def test_consume_empty_cleans_up_previous_span_and_token(self) -> None:
337+
instrumentation = ConfluentKafkaInstrumentor()
338+
consumer = MockConsumer(
339+
[MockedMessage("topic-1", 0, 0, [])],
340+
{
341+
"bootstrap.servers": "localhost:29092",
342+
"group.id": "mygroup",
343+
"auto.offset.reset": "earliest",
344+
},
345+
)
346+
consumer = instrumentation.instrument_consumer(consumer)
347+
consumer.consume(
348+
1
349+
) # non-empty: sets _current_consume_span and _current_context_token
350+
self.assertIsNotNone(consumer._current_consume_span)
351+
self.assertIsNotNone(consumer._current_context_token)
352+
353+
consumer.consume(1) # empty: should clean up both
354+
self.assertIsNone(consumer._current_consume_span)
355+
self.assertIsNone(consumer._current_context_token)
356+
283357
def test_close(self) -> None:
284358
instrumentation = ConfluentKafkaInstrumentor()
285359
mocked_messages = [

0 commit comments

Comments
 (0)