@@ -94,7 +94,7 @@ def response_hook(span, response):
94
94
from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
95
95
from opentelemetry .instrumentation .utils import unwrap
96
96
from opentelemetry .semconv .trace import SpanAttributes
97
- from opentelemetry .trace import SpanKind , get_tracer
97
+ from opentelemetry .trace import SpanKind , Status , StatusCode , get_tracer
98
98
99
99
from .utils import sanitize_body
100
100
@@ -103,6 +103,7 @@ def response_hook(span, response):
103
103
es_transport_split = elasticsearch .VERSION [0 ] > 7
104
104
if es_transport_split :
105
105
import elastic_transport
106
+ from elastic_transport ._models import DefaultType
106
107
107
108
logger = getLogger (__name__ )
108
109
@@ -173,7 +174,12 @@ def _instrument(self, **kwargs):
173
174
174
175
def _uninstrument (self , ** kwargs ):
175
176
# pylint: disable=no-member
176
- unwrap (elasticsearch .Transport , "perform_request" )
177
+ transport_class = (
178
+ elastic_transport .Transport
179
+ if es_transport_split
180
+ else elasticsearch .Transport
181
+ )
182
+ unwrap (transport_class , "perform_request" )
177
183
178
184
179
185
_regex_doc_url = re .compile (r"/_doc/([^/]+)" )
@@ -234,7 +240,22 @@ def wrapper(wrapped, _, args, kwargs):
234
240
kind = SpanKind .CLIENT ,
235
241
) as span :
236
242
if callable (request_hook ):
237
- request_hook (span , method , url , kwargs )
243
+ # elasticsearch 8 changed the parameters quite a bit
244
+ if es_transport_split :
245
+
246
+ def normalize_kwargs (k , v ):
247
+ if isinstance (v , DefaultType ):
248
+ v = str (v )
249
+ elif isinstance (v , elastic_transport .HttpHeaders ):
250
+ v = dict (v )
251
+ return (k , v )
252
+
253
+ hook_kwargs = dict (
254
+ normalize_kwargs (k , v ) for k , v in kwargs .items ()
255
+ )
256
+ else :
257
+ hook_kwargs = kwargs
258
+ request_hook (span , method , url , hook_kwargs )
238
259
239
260
if span .is_recording ():
240
261
attributes = {
@@ -260,16 +281,41 @@ def wrapper(wrapped, _, args, kwargs):
260
281
span .set_attribute (key , value )
261
282
262
283
rv = wrapped (* args , ** kwargs )
263
- if isinstance (rv , dict ) and span .is_recording ():
284
+
285
+ body = rv .body if es_transport_split else rv
286
+ if isinstance (body , dict ) and span .is_recording ():
264
287
for member in _ATTRIBUTES_FROM_RESULT :
265
- if member in rv :
288
+ if member in body :
266
289
span .set_attribute (
267
290
f"elasticsearch.{ member } " ,
268
- str (rv [member ]),
291
+ str (body [member ]),
292
+ )
293
+
294
+ # since the transport split the raising of exceptions that set the error status
295
+ # are called after this code so need to set error status manually
296
+ if es_transport_split and span .is_recording ():
297
+ if not (method == "HEAD" and rv .meta .status == 404 ) and (
298
+ not 200 <= rv .meta .status < 299
299
+ ):
300
+ exception = elasticsearch .exceptions .HTTP_EXCEPTIONS .get (
301
+ rv .meta .status , elasticsearch .exceptions .ApiError
302
+ )
303
+ message = str (body )
304
+ if isinstance (body , dict ):
305
+ error = body .get ("error" , message )
306
+ if isinstance (error , dict ) and "type" in error :
307
+ error = error ["type" ]
308
+ message = error
309
+
310
+ span .set_status (
311
+ Status (
312
+ status_code = StatusCode .ERROR ,
313
+ description = f"{ exception .__name__ } : { message } " ,
269
314
)
315
+ )
270
316
271
317
if callable (response_hook ):
272
- response_hook (span , rv )
318
+ response_hook (span , body )
273
319
return rv
274
320
275
321
return wrapper
0 commit comments