Skip to content

Commit f170367

Browse files
allen-k1mocelotl
authored andcommitted
Bugfix/check future cancelled (open-telemetry#2461)
* Calling the exception() method when future is in the cancelled state is causing a CancelledError Calling the exception() method when future is in the cancelled state is causing a CancelledError. we should check the cancelled state first and call f.exception() only if it's not cancelled. * modify lint * modify lint * Update CHANGELOG.md * remove init() * add future cancelled test code * add future cancelled test code * add future cancelled test code * add future cancelled test code * add future cancelled test code * add future cancelled test code * lint * lint * remove if condition * modify test code * lint * lint * remove pytest --------- Co-authored-by: Diego Hurtado <[email protected]>
1 parent 0c4a7bd commit f170367

File tree

3 files changed

+71
-15
lines changed

3 files changed

+71
-15
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4747
([#2418](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2418))
4848
- Use sqlalchemy version in sqlalchemy commenter instead of opentelemetry library version
4949
([#2404](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2404))
50+
- `opentelemetry-instrumentation-asyncio` Check for cancelledException in the future
51+
([#2461](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2461))
5052
- Remove SDK dependency from opentelemetry-instrumentation-grpc
5153
([#2474](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2474))
5254

instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py

+9-15
Original file line numberDiff line numberDiff line change
@@ -116,21 +116,11 @@ class AsyncioInstrumentor(BaseInstrumentor):
116116
"run_coroutine_threadsafe",
117117
]
118118

119-
def __init__(self):
120-
super().__init__()
121-
self.process_duration_histogram = None
122-
self.process_created_counter = None
123-
124-
self._tracer = None
125-
self._meter = None
126-
self._coros_name_to_trace: set = set()
127-
self._to_thread_name_to_trace: set = set()
128-
self._future_active_enabled: bool = False
129-
130119
def instrumentation_dependencies(self) -> Collection[str]:
131120
return _instruments
132121

133122
def _instrument(self, **kwargs):
123+
# pylint: disable=attribute-defined-outside-init
134124
self._tracer = get_tracer(
135125
__name__, __version__, kwargs.get("tracer_provider")
136126
)
@@ -307,13 +297,17 @@ def trace_future(self, future):
307297
)
308298

309299
def callback(f):
310-
exception = f.exception()
311300
attr = {
312301
"type": "future",
302+
"state": (
303+
"cancelled"
304+
if f.cancelled()
305+
else determine_state(f.exception())
306+
),
313307
}
314-
state = determine_state(exception)
315-
attr["state"] = state
316-
self.record_process(start, attr, span, exception)
308+
self.record_process(
309+
start, attr, span, None if f.cancelled() else f.exception()
310+
)
317311

318312
future.add_done_callback(callback)
319313
return future
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import asyncio
2+
from unittest.mock import patch
3+
4+
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
5+
from opentelemetry.instrumentation.asyncio.environment_variables import (
6+
OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED,
7+
)
8+
from opentelemetry.test.test_base import TestBase
9+
from opentelemetry.trace import get_tracer
10+
11+
12+
class TestTraceFuture(TestBase):
13+
@patch.dict(
14+
"os.environ", {OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED: "true"}
15+
)
16+
def setUp(self):
17+
super().setUp()
18+
self._tracer = get_tracer(
19+
__name__,
20+
)
21+
self.instrumentor = AsyncioInstrumentor()
22+
self.instrumentor.instrument()
23+
24+
def tearDown(self):
25+
super().tearDown()
26+
self.instrumentor.uninstrument()
27+
28+
def test_trace_future_cancelled(self):
29+
async def future_cancelled():
30+
with self._tracer.start_as_current_span("root"):
31+
future = asyncio.Future()
32+
future = self.instrumentor.trace_future(future)
33+
future.cancel()
34+
35+
try:
36+
asyncio.run(future_cancelled())
37+
except asyncio.CancelledError as exc:
38+
self.assertEqual(isinstance(exc, asyncio.CancelledError), True)
39+
spans = self.memory_exporter.get_finished_spans()
40+
self.assertEqual(len(spans), 2)
41+
self.assertEqual(spans[0].name, "root")
42+
self.assertEqual(spans[1].name, "asyncio future")
43+
44+
metrics = (
45+
self.memory_metrics_reader.get_metrics_data()
46+
.resource_metrics[0]
47+
.scope_metrics[0]
48+
.metrics
49+
)
50+
self.assertEqual(len(metrics), 2)
51+
52+
self.assertEqual(metrics[0].name, "asyncio.process.duration")
53+
self.assertEqual(
54+
metrics[0].data.data_points[0].attributes["state"], "cancelled"
55+
)
56+
57+
self.assertEqual(metrics[1].name, "asyncio.process.created")
58+
self.assertEqual(
59+
metrics[1].data.data_points[0].attributes["state"], "cancelled"
60+
)

0 commit comments

Comments
 (0)