Skip to content

Commit 2c291bf

Browse files
committed
Rough draft of the contribution
1 parent 72576f6 commit 2c291bf

File tree

2 files changed

+346
-106
lines changed

2 files changed

+346
-106
lines changed

instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py

+235-92
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,62 @@ def _build_span_name(
184184
return name
185185

186186

187+
def _add_create_attributes(span: Span, args: tuple[Any, ...]):
188+
_set_span_attribute_if_value(
189+
span, "redis.create_index.index", _value_or_none(args, 1)
190+
)
191+
# According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
192+
try:
193+
schema_index = args.index("SCHEMA")
194+
except ValueError:
195+
return
196+
schema = args[schema_index:]
197+
field_attribute = ""
198+
# Schema in format:
199+
# [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
200+
field_attribute = "".join(
201+
f"Field(name: {schema[index - 1]}, type: {schema[index]});"
202+
for index in range(1, len(schema))
203+
if schema[index] in _FIELD_TYPES
204+
)
205+
_set_span_attribute_if_value(
206+
span,
207+
"redis.create_index.fields",
208+
field_attribute,
209+
)
210+
211+
212+
def _add_search_attributes(span: Span, response, args):
213+
_set_span_attribute_if_value(
214+
span, "redis.search.index", _value_or_none(args, 1)
215+
)
216+
_set_span_attribute_if_value(
217+
span, "redis.search.query", _value_or_none(args, 2)
218+
)
219+
# Parse response from search
220+
# https://redis.io/docs/latest/commands/ft.search/
221+
# Response in format:
222+
# [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
223+
# Returned documents in array format:
224+
# [first_field_name, first_field_value, second_field_name, second_field_value ...]
225+
number_of_returned_documents = _value_or_none(response, 0)
226+
_set_span_attribute_if_value(
227+
span, "redis.search.total", number_of_returned_documents
228+
)
229+
if "NOCONTENT" in args or not number_of_returned_documents:
230+
return
231+
for document_number in range(number_of_returned_documents):
232+
document_index = _value_or_none(response, 1 + 2 * document_number)
233+
if document_index:
234+
document = response[2 + 2 * document_number]
235+
for attribute_name_index in range(0, len(document), 2):
236+
_set_span_attribute_if_value(
237+
span,
238+
f"redis.search.xdoc_{document_index}.{document[attribute_name_index]}",
239+
document[attribute_name_index + 1],
240+
)
241+
242+
187243
def _build_span_meta_data_for_pipeline(
188244
instance: PipelineInstance | AsyncPipelineInstance,
189245
) -> tuple[list[Any], str, str]:
@@ -214,11 +270,10 @@ def _build_span_meta_data_for_pipeline(
214270
return command_stack, resource, span_name
215271

216272

217-
# pylint: disable=R0915
218-
def _instrument(
219-
tracer: Tracer,
220-
request_hook: _RequestHookT | None = None,
221-
response_hook: _ResponseHookT | None = None,
273+
def _traced_execute_factory(
274+
tracer,
275+
request_hook: _RequestHookT = None,
276+
response_hook: _ResponseHookT = None,
222277
):
223278
def _traced_execute_command(
224279
func: Callable[..., R],
@@ -247,6 +302,14 @@ def _traced_execute_command(
247302
response_hook(span, instance, response)
248303
return response
249304

305+
return _traced_execute_command
306+
307+
308+
def _traced_execute_pipeline_factory(
309+
tracer,
310+
request_hook: _RequestHookT = None,
311+
response_hook: _ResponseHookT = None,
312+
):
250313
def _traced_execute_pipeline(
251314
func: Callable[..., R],
252315
instance: PipelineInstance,
@@ -284,90 +347,14 @@ def _traced_execute_pipeline(
284347

285348
return response
286349

287-
def _add_create_attributes(span: Span, args: tuple[Any, ...]):
288-
_set_span_attribute_if_value(
289-
span, "redis.create_index.index", _value_or_none(args, 1)
290-
)
291-
# According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
292-
try:
293-
schema_index = args.index("SCHEMA")
294-
except ValueError:
295-
return
296-
schema = args[schema_index:]
297-
field_attribute = ""
298-
# Schema in format:
299-
# [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
300-
field_attribute = "".join(
301-
f"Field(name: {schema[index - 1]}, type: {schema[index]});"
302-
for index in range(1, len(schema))
303-
if schema[index] in _FIELD_TYPES
304-
)
305-
_set_span_attribute_if_value(
306-
span,
307-
"redis.create_index.fields",
308-
field_attribute,
309-
)
310-
311-
def _add_search_attributes(span: Span, response, args):
312-
_set_span_attribute_if_value(
313-
span, "redis.search.index", _value_or_none(args, 1)
314-
)
315-
_set_span_attribute_if_value(
316-
span, "redis.search.query", _value_or_none(args, 2)
317-
)
318-
# Parse response from search
319-
# https://redis.io/docs/latest/commands/ft.search/
320-
# Response in format:
321-
# [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
322-
# Returned documents in array format:
323-
# [first_field_name, first_field_value, second_field_name, second_field_value ...]
324-
number_of_returned_documents = _value_or_none(response, 0)
325-
_set_span_attribute_if_value(
326-
span, "redis.search.total", number_of_returned_documents
327-
)
328-
if "NOCONTENT" in args or not number_of_returned_documents:
329-
return
330-
for document_number in range(number_of_returned_documents):
331-
document_index = _value_or_none(response, 1 + 2 * document_number)
332-
if document_index:
333-
document = response[2 + 2 * document_number]
334-
for attribute_name_index in range(0, len(document), 2):
335-
_set_span_attribute_if_value(
336-
span,
337-
f"redis.search.xdoc_{document_index}.{document[attribute_name_index]}",
338-
document[attribute_name_index + 1],
339-
)
350+
return _traced_execute_pipeline
340351

341-
pipeline_class = (
342-
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
343-
)
344-
redis_class = "StrictRedis" if redis.VERSION < (3, 0, 0) else "Redis"
345-
346-
wrap_function_wrapper(
347-
"redis", f"{redis_class}.execute_command", _traced_execute_command
348-
)
349-
wrap_function_wrapper(
350-
"redis.client",
351-
f"{pipeline_class}.execute",
352-
_traced_execute_pipeline,
353-
)
354-
wrap_function_wrapper(
355-
"redis.client",
356-
f"{pipeline_class}.immediate_execute_command",
357-
_traced_execute_command,
358-
)
359-
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
360-
wrap_function_wrapper(
361-
"redis.cluster",
362-
"RedisCluster.execute_command",
363-
_traced_execute_command,
364-
)
365-
wrap_function_wrapper(
366-
"redis.cluster",
367-
"ClusterPipeline.execute",
368-
_traced_execute_pipeline,
369-
)
370352

353+
def _async_traced_execute_factory(
354+
tracer,
355+
request_hook: _RequestHookT = None,
356+
response_hook: _ResponseHookT = None,
357+
):
371358
async def _async_traced_execute_command(
372359
func: Callable[..., Awaitable[R]],
373360
instance: AsyncRedisInstance,
@@ -391,6 +378,14 @@ async def _async_traced_execute_command(
391378
response_hook(span, instance, response)
392379
return response
393380

381+
return _async_traced_execute_command
382+
383+
384+
def _async_traced_execute_pipeline_factory(
385+
tracer,
386+
request_hook: _RequestHookT = None,
387+
response_hook: _ResponseHookT = None,
388+
):
394389
async def _async_traced_execute_pipeline(
395390
func: Callable[..., Awaitable[R]],
396391
instance: AsyncPipelineInstance,
@@ -430,6 +425,57 @@ async def _async_traced_execute_pipeline(
430425

431426
return response
432427

428+
return _async_traced_execute_pipeline
429+
430+
431+
# pylint: disable=R0915
432+
def _instrument(
433+
tracer: Tracer,
434+
request_hook: _RequestHookT | None = None,
435+
response_hook: _ResponseHookT | None = None,
436+
):
437+
_traced_execute_command = _traced_execute_factory(
438+
tracer, request_hook, response_hook
439+
)
440+
_traced_execute_pipeline = _traced_execute_pipeline_factory(
441+
tracer, request_hook, response_hook
442+
)
443+
pipeline_class = (
444+
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
445+
)
446+
redis_class = "StrictRedis" if redis.VERSION < (3, 0, 0) else "Redis"
447+
448+
wrap_function_wrapper(
449+
"redis", f"{redis_class}.execute_command", _traced_execute_command
450+
)
451+
wrap_function_wrapper(
452+
"redis.client",
453+
f"{pipeline_class}.execute",
454+
_traced_execute_pipeline,
455+
)
456+
wrap_function_wrapper(
457+
"redis.client",
458+
f"{pipeline_class}.immediate_execute_command",
459+
_traced_execute_command,
460+
)
461+
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
462+
wrap_function_wrapper(
463+
"redis.cluster",
464+
"RedisCluster.execute_command",
465+
_traced_execute_command,
466+
)
467+
wrap_function_wrapper(
468+
"redis.cluster",
469+
"ClusterPipeline.execute",
470+
_traced_execute_pipeline,
471+
)
472+
473+
_async_traced_execute_command = _async_traced_execute_factory(
474+
tracer, request_hook, response_hook
475+
)
476+
_async_traced_execute_pipeline = _async_traced_execute_pipeline_factory(
477+
tracer, request_hook, response_hook
478+
)
433479
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
434480
wrap_function_wrapper(
435481
"redis.asyncio",
@@ -459,6 +505,94 @@ async def _async_traced_execute_pipeline(
459505
)
460506

461507

508+
def _instrument_client(
509+
client,
510+
tracer,
511+
request_hook: _RequestHookT = None,
512+
response_hook: _ResponseHookT = None,
513+
):
514+
# first, handle async clients
515+
_async_traced_execute_command = _async_traced_execute_factory(
516+
tracer, request_hook, response_hook
517+
)
518+
_async_traced_execute_pipeline = _async_traced_execute_pipeline_factory(
519+
tracer, request_hook, response_hook
520+
)
521+
522+
def _async_pipeline_wrapper(func, instance, args, kwargs):
523+
result = func(*args, **kwargs)
524+
wrap_function_wrapper(
525+
result, "execute", _async_traced_execute_pipeline
526+
)
527+
wrap_function_wrapper(
528+
result, "immediate_execute_command", _async_traced_execute_command
529+
)
530+
return result
531+
532+
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
533+
client_type = (
534+
redis.asyncio.StrictRedis
535+
if redis.VERSION < (3, 0, 0)
536+
else redis.asyncio.Redis
537+
)
538+
539+
if isinstance(client, client_type):
540+
wrap_function_wrapper(
541+
client, "execute_command", _async_traced_execute_command
542+
)
543+
wrap_function_wrapper(client, "pipeline", _async_pipeline_wrapper)
544+
return
545+
546+
def _async_cluster_pipeline_wrapper(func, instance, args, kwargs):
547+
result = func(*args, **kwargs)
548+
wrap_function_wrapper(
549+
result, "execute", _async_traced_execute_pipeline
550+
)
551+
return result
552+
553+
# handle
554+
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION and isinstance(
555+
client, redis.asyncio.RedisCluster
556+
):
557+
wrap_function_wrapper(
558+
client, "execute_command", _async_traced_execute_command
559+
)
560+
wrap_function_wrapper(
561+
client, "pipeline", _async_cluster_pipeline_wrapper
562+
)
563+
return
564+
# for redis.client.Redis, redis.Cluster and v3.0.0 redis.client.StrictRedis
565+
# the wrappers are the same
566+
# client_type = (
567+
# redis.client.StrictRedis if redis.VERSION < (3, 0, 0) else redis.client.Redis
568+
# )
569+
_traced_execute_command = _traced_execute_factory(
570+
tracer, request_hook, response_hook
571+
)
572+
_traced_execute_pipeline = _traced_execute_pipeline_factory(
573+
tracer, request_hook, response_hook
574+
)
575+
576+
def _pipeline_wrapper(func, instance, args, kwargs):
577+
result = func(*args, **kwargs)
578+
wrap_function_wrapper(result, "execute", _traced_execute_pipeline)
579+
wrap_function_wrapper(
580+
result, "immediate_execute_command", _traced_execute_command
581+
)
582+
return result
583+
584+
wrap_function_wrapper(
585+
client,
586+
"execute_command",
587+
_traced_execute_command,
588+
)
589+
wrap_function_wrapper(
590+
client,
591+
"pipeline",
592+
_pipeline_wrapper,
593+
)
594+
595+
462596
class RedisInstrumentor(BaseInstrumentor):
463597
"""An instrumentor for Redis.
464598
@@ -483,11 +617,20 @@ def _instrument(self, **kwargs: Any):
483617
tracer_provider=tracer_provider,
484618
schema_url="https://opentelemetry.io/schemas/1.11.0",
485619
)
486-
_instrument(
487-
tracer,
488-
request_hook=kwargs.get("request_hook"),
489-
response_hook=kwargs.get("response_hook"),
490-
)
620+
redis_client = kwargs.get("client")
621+
if redis_client:
622+
_instrument_client(
623+
redis_client,
624+
tracer,
625+
request_hook=kwargs.get("request_hook"),
626+
response_hook=kwargs.get("response_hook"),
627+
)
628+
else:
629+
_instrument(
630+
tracer,
631+
request_hook=kwargs.get("request_hook"),
632+
response_hook=kwargs.get("response_hook"),
633+
)
491634

492635
def _uninstrument(self, **kwargs: Any):
493636
if redis.VERSION < (3, 0, 0):

0 commit comments

Comments
 (0)