Skip to content

Commit 41792e7

Browse files
authored
Add confluent kafka producer poll and flush returns (#2527)
1 parent 7bddbb5 commit 41792e7

File tree

4 files changed

+82
-5
lines changed

4 files changed

+82
-5
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Breaking changes
1111

12+
- Add return statement to Confluent kafka Producer poll() and flush() calls when instrumented by ConfluentKafkaInstrumentor().instrument_producer() ([#2527](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2527))
1213
- Rename `type` attribute to `asgi.event.type` in `opentelemetry-instrumentation-asgi`
1314
([#2300](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2300))
1415
- Rename AwsLambdaInstrumentor span attributes `faas.id` to `cloud.resource_id`, `faas.execution` to `faas.invocation_id`

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,10 @@ def __init__(self, producer: Producer, tracer: Tracer):
151151
self._tracer = tracer
152152

153153
def flush(self, timeout=-1):
154-
self._producer.flush(timeout)
154+
return self._producer.flush(timeout)
155155

156156
def poll(self, timeout=-1):
157-
self._producer.poll(timeout)
157+
return self._producer.poll(timeout)
158158

159159
def produce(
160160
self, topic, value=None, *args, **kwargs

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

+33-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
)
3232
from opentelemetry.test.test_base import TestBase
3333

34-
from .utils import MockConsumer, MockedMessage
34+
from .utils import MockConsumer, MockedMessage, MockedProducer
3535

3636

3737
class TestConfluentKafka(TestBase):
@@ -246,3 +246,35 @@ def _compare_spans(self, spans, expected_spans):
246246
self.assertEqual(
247247
expected_attribute_value, span.attributes[attribute_key]
248248
)
249+
250+
def test_producer_poll(self) -> None:
251+
instrumentation = ConfluentKafkaInstrumentor()
252+
message_queue = []
253+
254+
producer = MockedProducer(
255+
message_queue,
256+
{
257+
"bootstrap.servers": "localhost:29092",
258+
},
259+
)
260+
261+
producer = instrumentation.instrument_producer(producer)
262+
producer.produce(topic="topic-1", key="key-1", value="value-1")
263+
msg = producer.poll()
264+
self.assertIsNotNone(msg)
265+
266+
def test_producer_flush(self) -> None:
267+
instrumentation = ConfluentKafkaInstrumentor()
268+
message_queue = []
269+
270+
producer = MockedProducer(
271+
message_queue,
272+
{
273+
"bootstrap.servers": "localhost:29092",
274+
},
275+
)
276+
277+
producer = instrumentation.instrument_producer(producer)
278+
producer.produce(topic="topic-1", key="key-1", value="value-1")
279+
msg = producer.flush()
280+
self.assertIsNotNone(msg)

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py

+46-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from confluent_kafka import Consumer
1+
from typing import Optional
2+
3+
from confluent_kafka import Consumer, Producer
24

35

46
class MockConsumer(Consumer):
@@ -20,11 +22,21 @@ def poll(self, timeout=None):
2022

2123

2224
class MockedMessage:
23-
def __init__(self, topic: str, partition: int, offset: int, headers):
25+
def __init__(
26+
self,
27+
topic: str,
28+
partition: int,
29+
offset: int,
30+
headers,
31+
key: Optional[str] = None,
32+
value: Optional[str] = None,
33+
):
2434
self._topic = topic
2535
self._partition = partition
2636
self._offset = offset
2737
self._headers = headers
38+
self._key = key
39+
self._value = value
2840

2941
def topic(self):
3042
return self._topic
@@ -37,3 +49,35 @@ def offset(self):
3749

3850
def headers(self):
3951
return self._headers
52+
53+
def key(self):
54+
return self._key
55+
56+
def value(self):
57+
return self._value
58+
59+
60+
class MockedProducer(Producer):
61+
def __init__(self, queue, config):
62+
self._queue = queue
63+
super().__init__(config)
64+
65+
def produce(
66+
self, *args, **kwargs
67+
): # pylint: disable=keyword-arg-before-vararg
68+
self._queue.append(
69+
MockedMessage(
70+
topic=kwargs.get("topic"),
71+
partition=0,
72+
offset=0,
73+
headers=[],
74+
key=kwargs.get("key"),
75+
value=kwargs.get("value"),
76+
)
77+
)
78+
79+
def poll(self, *args, **kwargs):
80+
return len(self._queue)
81+
82+
def flush(self, *args, **kwargs):
83+
return len(self._queue)

0 commit comments

Comments
 (0)