Skip to content

Commit 3449a3d

Browse files
committed
Add BaggageLogProcessor and address review comments
1 parent 195bcdb commit 3449a3d

File tree

5 files changed

+226
-3
lines changed

5 files changed

+226
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
## Unreleased
1313

1414
### Added
15+
- Add `BaggageLogProcessor` to `opentelemetry-processor-baggage`
16+
([#4062](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/4062))
1517

1618
- `opentelemetry-instrumentation-confluent-kafka`: Loosen confluent-kafka upper bound to <3.0.0
1719
([#4289](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4289))

processor/opentelemetry-processor-baggage/README.rst

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,41 @@ For example, to only copy baggage entries that match the regex `^key.+`:
6565
regex_predicate = lambda baggage_key: baggage_key.startswith("^key.+")
6666
tracer_provider.add_span_processor(BaggageSpanProcessor(regex_predicate))
6767

68+
BaggageLogProcessor
69+
-------------------
70+
71+
The BaggageLogProcessor reads entries stored in Baggage
72+
from the current context and adds the baggage entries' keys and
73+
values to the log record as attributes on emit.
74+
75+
Add this log processor to a logger provider.
76+
77+
To configure the log processor to copy all baggage entries:
78+
79+
::
80+
81+
from opentelemetry.processor.baggage import BaggageLogProcessor, ALLOW_ALL_BAGGAGE_KEYS
82+
83+
logger_provider = LoggerProvider()
84+
logger_provider.add_log_record_processor(BaggageLogProcessor(ALLOW_ALL_BAGGAGE_KEYS))
85+
86+
87+
Alternatively, you can provide a custom baggage key predicate to select which baggage keys you want to copy.
88+
89+
For example, to only copy baggage entries that start with `my-key`:
90+
91+
::
92+
93+
starts_with_predicate = lambda baggage_key: baggage_key.startswith("my-key")
94+
logger_provider.add_log_record_processor(BaggageLogProcessor(starts_with_predicate))
95+
96+
97+
For example, to only copy baggage entries that match the regex `^key.+`:
98+
99+
::
100+
101+
regex_predicate = lambda baggage_key: re.match(r"^key.+", baggage_key) is not None
102+
logger_provider.add_log_record_processor(BaggageLogProcessor(regex_predicate))
68103

69104
References
70105
----------

processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313
# limitations under the License.
1414

1515
# pylint: disable=import-error
16-
1716
from .processor import ALLOW_ALL_BAGGAGE_KEYS, BaggageSpanProcessor
17+
from .log_processor import BaggageLogProcessor
1818
from .version import __version__
19-
20-
__all__ = ["ALLOW_ALL_BAGGAGE_KEYS", "BaggageSpanProcessor", "__version__"]
19+
__all__ = ["ALLOW_ALL_BAGGAGE_KEYS", "BaggageSpanProcessor", "BaggageLogProcessor", "__version__"]
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Callable, Optional, Sequence, Union
16+
17+
from opentelemetry.baggage import get_all as get_all_baggage
18+
from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord
19+
20+
from opentelemetry.processor.baggage.processor import BaggageKeyPredicateT
21+
22+
BaggageKeyPredicatesT = Union[BaggageKeyPredicateT, Sequence[BaggageKeyPredicateT]]
23+
24+
25+
class BaggageLogProcessor(LogRecordProcessor):
26+
"""
27+
The BaggageLogProcessor reads entries stored in Baggage
28+
from the current context and adds the baggage entries' keys and
29+
values to the log record as attributes on emit.
30+
31+
Add this log processor to a logger provider.
32+
33+
⚠ Warning ⚠️
34+
35+
Do not put sensitive information in Baggage.
36+
37+
To repeat: a consequence of adding data to Baggage is that the keys and
38+
values will appear in all outgoing HTTP headers from the application.
39+
"""
40+
41+
def __init__(
42+
self,
43+
baggage_key_predicate: BaggageKeyPredicatesT,
44+
max_baggage_attributes: Optional[int] = None,
45+
) -> None:
46+
if callable(baggage_key_predicate):
47+
self._predicates = [baggage_key_predicate]
48+
else:
49+
self._predicates = list(baggage_key_predicate)
50+
self._max_baggage_attributes = max_baggage_attributes
51+
52+
def _matches(self, key: str) -> bool:
53+
return any(predicate(key) for predicate in self._predicates)
54+
55+
def on_emit(self, log_record: ReadWriteLogRecord) -> None:
56+
baggage = get_all_baggage()
57+
count = 0
58+
for key, value in baggage.items():
59+
if self._max_baggage_attributes is not None and count >= self._max_baggage_attributes:
60+
break
61+
if self._matches(key):
62+
if key not in log_record.log_record.attributes:
63+
log_record.log_record.attributes[key] = value
64+
count += 1
65+
66+
def shutdown(self) -> None:
67+
pass
68+
69+
def force_flush(self, timeout_millis: int = 30000) -> bool:
70+
return True
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import re
16+
import unittest
17+
18+
from opentelemetry.baggage import set_baggage
19+
from opentelemetry.context import attach, detach
20+
from opentelemetry.processor.baggage import (
21+
ALLOW_ALL_BAGGAGE_KEYS,
22+
BaggageLogProcessor,
23+
)
24+
from opentelemetry.sdk._logs import LoggerProvider, LogRecordProcessor
25+
from opentelemetry.sdk._logs.export import (
26+
InMemoryLogRecordExporter,
27+
BatchLogRecordProcessor,
28+
)
29+
30+
31+
class BaggageLogProcessorTest(unittest.TestCase):
32+
def setUp(self):
33+
self.exporter = InMemoryLogRecordExporter()
34+
self.logger_provider = LoggerProvider()
35+
self.logger_provider.add_log_record_processor(
36+
BaggageLogProcessor(ALLOW_ALL_BAGGAGE_KEYS)
37+
)
38+
self.logger_provider.add_log_record_processor(
39+
BatchLogRecordProcessor(self.exporter)
40+
)
41+
self.logger = self.logger_provider.get_logger("test-logger")
42+
43+
def _get_attributes(self):
44+
self.logger_provider.force_flush()
45+
logs = self.exporter.get_finished_logs()
46+
self.assertTrue(len(logs) > 0)
47+
return logs[-1].log_record.attributes
48+
49+
def test_check_the_baggage(self):
50+
self.assertIsInstance(
51+
BaggageLogProcessor(ALLOW_ALL_BAGGAGE_KEYS), LogRecordProcessor
52+
)
53+
54+
def test_baggage_added_to_log_record(self):
55+
token = attach(set_baggage("queen", "bee"))
56+
self.logger.emit(None)
57+
attributes = self._get_attributes()
58+
self.assertEqual(attributes.get("queen"), "bee")
59+
detach(token)
60+
61+
def test_baggage_with_prefix(self):
62+
token = attach(set_baggage("queen", "bee"))
63+
logger_provider = LoggerProvider()
64+
logger_provider.add_log_record_processor(
65+
BaggageLogProcessor(lambda key: key.startswith("que"))
66+
)
67+
exporter = InMemoryLogRecordExporter()
68+
logger_provider.add_log_record_processor(
69+
BatchLogRecordProcessor(exporter)
70+
)
71+
logger = logger_provider.get_logger("test-logger")
72+
logger.emit(None)
73+
logger_provider.force_flush()
74+
logs = exporter.get_finished_logs()
75+
attributes = logs[-1].log_record.attributes
76+
self.assertEqual(attributes.get("queen"), "bee")
77+
detach(token)
78+
79+
def test_baggage_with_regex(self):
80+
token = attach(set_baggage("queen", "bee"))
81+
logger_provider = LoggerProvider()
82+
logger_provider.add_log_record_processor(
83+
BaggageLogProcessor(
84+
lambda key: re.match(r"que.*", key) is not None
85+
)
86+
)
87+
exporter = InMemoryLogRecordExporter()
88+
logger_provider.add_log_record_processor(
89+
BatchLogRecordProcessor(exporter)
90+
)
91+
logger = logger_provider.get_logger("test-logger")
92+
logger.emit(None)
93+
logger_provider.force_flush()
94+
logs = exporter.get_finished_logs()
95+
attributes = logs[-1].log_record.attributes
96+
self.assertEqual(attributes.get("queen"), "bee")
97+
detach(token)
98+
99+
def test_no_baggage_not_added(self):
100+
self.logger.emit(None)
101+
self.logger_provider.force_flush()
102+
logs = self.exporter.get_finished_logs()
103+
self.assertTrue(len(logs) > 0)
104+
attributes = logs[-1].log_record.attributes
105+
self.assertNotIn("queen", attributes)
106+
107+
@staticmethod
108+
def has_prefix(baggage_key: str) -> bool:
109+
return baggage_key.startswith("que")
110+
111+
@staticmethod
112+
def matches_regex(baggage_key: str) -> bool:
113+
return re.match(r"que.*", baggage_key) is not None
114+
115+
116+
if __name__ == "__main__":
117+
unittest.main()

0 commit comments

Comments
 (0)