Skip to content

Commit 3bb587a

Browse files
authored
Merge branch 'main' into fix/aiokafka-tests-fixes
2 parents 40f30a5 + 58b554e commit 3bb587a

File tree

8 files changed

+67
-14
lines changed

8 files changed

+67
-14
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4242
([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259))
4343
- `opentelemetry-instrumentation-aiokafka`: fix `Unclosed AIOKafkaProducer` warning and `RuntimeWarning: coroutine was never awaited` in tests
4444
([#4384](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4384))
45+
- `opentelemetry-instrumentation-aiokafka`: Fix compatibility with aiokafka 0.13 by calling
46+
`_key_serializer`/`_value_serializer` directly instead of the internal `_serialize` method
47+
whose signature changed in 0.13 from `(topic, key, value)` to `(key, value, headers)`
48+
([#4379](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4379))
4549

4650
### Breaking changes
4751

instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,17 @@ async def _extract_send_partition(
162162
key = _extract_send_key(args, kwargs)
163163
value = _extract_send_value(args, kwargs)
164164
partition = _extract_argument("partition", 3, None, args, kwargs)
165-
key_bytes, value_bytes = cast(
166-
"tuple[bytes | None, bytes | None]",
167-
instance._serialize(topic, key, value), # type: ignore[reportUnknownMemberType]
165+
key_bytes = cast(
166+
"bytes | None",
167+
instance._key_serializer(key) # type: ignore[reportUnknownMemberType]
168+
if instance._key_serializer # type: ignore[reportUnknownMemberType]
169+
else key,
170+
)
171+
value_bytes = cast(
172+
"bytes | None",
173+
instance._value_serializer(value) # type: ignore[reportUnknownMemberType]
174+
if instance._value_serializer # type: ignore[reportUnknownMemberType]
175+
else value,
168176
)
169177
valid_types = (bytes, bytearray, memoryview, type(None))
170178
if (
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
aiokafka==0.11.0
1+
aiokafka==0.13.0; python_version >= "3.10"
2+
aiokafka==0.12.0; python_version < "3.10"
23
pytest==7.4.4
34
-e opentelemetry-instrumentation
45
-e instrumentation/opentelemetry-instrumentation-aiokafka

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414

1515
from __future__ import annotations
1616

17+
import sys
1718
import uuid
1819
from typing import Any, Sequence, cast
19-
from unittest import IsolatedAsyncioTestCase, TestCase, mock
20+
from unittest import IsolatedAsyncioTestCase, TestCase, mock, skipIf
2021

2122
import aiokafka
2223
from aiokafka import (
@@ -63,6 +64,10 @@ def test_instrument_api(self) -> None:
6364
)
6465

6566

67+
@skipIf(
68+
sys.version_info < (3, 10),
69+
"aiokafka >= 0.13 requires Python 3.10+",
70+
)
6671
class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
6772
@staticmethod
6873
def consumer_record_factory(
@@ -126,7 +131,7 @@ async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer:
126131

127132
@staticmethod
128133
async def producer_factory() -> AIOKafkaProducer:
129-
producer = AIOKafkaProducer(api_version="1.0")
134+
producer = AIOKafkaProducer()
130135

131136
producer.client._wait_on_metadata = mock.AsyncMock()
132137
producer.client.bootstrap = mock.AsyncMock()

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,8 @@ async def test_create_consumer_span(
371371

372372
async def test_kafka_properties_extractor(self):
373373
aiokafka_instance_mock = mock.Mock()
374-
aiokafka_instance_mock._serialize.return_value = None, None
374+
aiokafka_instance_mock._key_serializer = None
375+
aiokafka_instance_mock._value_serializer = None
375376
aiokafka_instance_mock._partition.return_value = "partition"
376377
aiokafka_instance_mock.client._wait_on_metadata = mock.AsyncMock()
377378
assert (

util/opentelemetry-util-genai/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1818
([#4320](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4320))
1919
- Add workflow invocation type to genAI utils
2020
([#4310](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4310))
21+
- Check if upload works at startup in initializer of the `UploadCompletionHook`, instead
22+
of repeatedly failing on every upload ([#4390](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4390)).
2123

2224
## Version 0.3b0 (2026-02-20)
2325

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,30 @@ def __init__(
156156
f"Invalid {upload_format=}. Must be one of {_FORMATS}"
157157
)
158158
self._format = upload_format
159+
self._content_type = (
160+
"application/json"
161+
if self._format == "json"
162+
else "application/jsonl"
163+
)
164+
test_path = posixpath.join(
165+
self._base_path,
166+
f".one_off_test_to_see_if_upload_works.{self._format}",
167+
)
168+
try:
169+
with self._fs.open(
170+
test_path, "w", content_type=self._content_type
171+
) as file:
172+
file.write("\n")
173+
except Exception as exception: # pylint: disable=broad-exception-caught
174+
raise ValueError(
175+
f"Failed to write file to the following path, upload is not working: {test_path}.\n Got error: {exception}"
176+
)
177+
# Try to delete the file.. But we don't explicitly ask people to grant the GCS delete IAM permission in our
178+
# docs, so if delete fails just leave the file..
179+
try:
180+
self._fs.rm_file(test_path) # pyright: ignore[reportUnknownMemberType]
181+
except Exception: # pylint: disable=broad-exception-caught
182+
pass
159183

160184
# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
161185
# limits the number of queued tasks. If the queue is full, data will be dropped.
@@ -271,13 +295,7 @@ def _do_upload(
271295
for message_idx, line in enumerate(message_lines):
272296
line[_MESSAGE_INDEX_KEY] = message_idx
273297

274-
content_type = (
275-
"application/json"
276-
if self._format == "json"
277-
else "application/jsonl"
278-
)
279-
280-
with self._fs.open(path, "w", content_type=content_type) as file:
298+
with self._fs.open(path, "w", content_type=self._content_type) as file:
281299
for message in message_lines:
282300
gen_ai_json_dump(message, file)
283301
file.write("\n")

util/opentelemetry-util-genai/tests/test_upload.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ def setUp(self):
102102
self.hook = UploadCompletionHook(
103103
base_path=BASE_PATH, max_queue_size=MAXSIZE, lru_cache_max_size=5
104104
)
105+
# 1 upload is done when creating the UploadHook to ensure upload works. Reset mock.
106+
self.mock_fs.reset_mock()
105107

106108
def tearDown(self) -> None:
107109
self.hook.shutdown()
@@ -145,6 +147,18 @@ def test_upload_then_shutdown(self):
145147
"should have uploaded 4 files",
146148
)
147149

150+
def test_failed_upload_causes_initializer_to_throw(self):
151+
self.mock_fs.open.side_effect = ValueError("Failed for some reason!")
152+
with self.assertRaisesRegex(
153+
ValueError,
154+
"Failed to write file to the following path, upload is not working:",
155+
):
156+
UploadCompletionHook(
157+
base_path=BASE_PATH,
158+
max_queue_size=MAXSIZE,
159+
lru_cache_max_size=5,
160+
)
161+
148162
def test_lru_cache_works(self):
149163
record = LogRecord()
150164
self.hook.on_completion(

0 commit comments

Comments
 (0)