Skip to content

Commit 586a87a

Browse files
committed
Review comments. Move types to types.py, move some functions to util.py. Tested and verified docs.
1 parent d2c4fb7 commit 586a87a

File tree

3 files changed

+179
-150
lines changed

3 files changed

+179
-150
lines changed

instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py

+24-150
Original file line numberDiff line numberDiff line change
@@ -126,187 +126,61 @@ def response_hook(span, instance, response):
126126
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
127127
from opentelemetry.instrumentation.redis.package import _instruments
128128
from opentelemetry.instrumentation.redis.util import (
129-
_extract_conn_attributes,
129+
_add_create_attributes,
130+
_add_search_attributes,
131+
_build_span_meta_data_for_pipeline,
132+
_build_span_name,
130133
_format_command_args,
131-
_set_span_attribute_if_value,
132-
_value_or_none,
134+
_set_connection_attributes,
133135
)
134136
from opentelemetry.instrumentation.redis.version import __version__
135137
from opentelemetry.instrumentation.utils import unwrap
136138
from opentelemetry.semconv.trace import SpanAttributes
137139
from opentelemetry.trace import (
138-
Span,
139140
StatusCode,
140141
Tracer,
141142
TracerProvider,
142143
get_tracer,
143144
)
144145

145146
if TYPE_CHECKING:
146-
from typing import Awaitable, TypeVar
147+
from typing import Awaitable
147148

148149
import redis.asyncio.client
149150
import redis.asyncio.cluster
150151
import redis.client
151152
import redis.cluster
152153
import redis.connection
153154

154-
RequestHook = Callable[
155-
[Span, redis.connection.Connection, list[Any], dict[str, Any]], None
156-
]
157-
ResponseHook = Callable[[Span, redis.connection.Connection, Any], None]
158-
159-
AsyncPipelineInstance = TypeVar(
160-
"AsyncPipelineInstance",
161-
redis.asyncio.client.Pipeline,
162-
redis.asyncio.cluster.ClusterPipeline,
163-
)
164-
AsyncRedisInstance = TypeVar(
165-
"AsyncRedisInstance", redis.asyncio.Redis, redis.asyncio.RedisCluster
166-
)
167-
PipelineInstance = TypeVar(
168-
"PipelineInstance",
169-
redis.client.Pipeline,
170-
redis.cluster.ClusterPipeline,
171-
)
172-
RedisInstance = TypeVar(
173-
"RedisInstance", redis.client.Redis, redis.cluster.RedisCluster
155+
from opentelemetry.instrumentation.redis.types import (
156+
AsyncPipelineInstance,
157+
AsyncRedisInstance,
158+
PipelineInstance,
159+
R,
160+
RedisInstance,
161+
RequestHook,
162+
ResponseHook,
174163
)
175-
R = TypeVar("R")
176164

177165

178-
_DEFAULT_SERVICE = "redis"
179166
_logger = logging.getLogger(__name__)
180167

181168
_REDIS_ASYNCIO_VERSION = (4, 2, 0)
182169
_REDIS_CLUSTER_VERSION = (4, 1, 0)
183170
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 2)
184171

185-
_FIELD_TYPES = ["NUMERIC", "TEXT", "GEO", "TAG", "VECTOR"]
186172

187173
_CLIENT_ASYNCIO_SUPPORT = redis.VERSION >= _REDIS_ASYNCIO_VERSION
188174
_CLIENT_ASYNCIO_CLUSTER_SUPPORT = (
189175
redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION
190176
)
191177
_CLIENT_CLUSTER_SUPPORT = redis.VERSION >= _REDIS_CLUSTER_VERSION
192-
_CLIENT_BEFORE_3_0_0 = redis.VERSION < (3, 0, 0)
178+
_CLIENT_BEFORE_V3 = redis.VERSION < (3, 0, 0)
193179

194180
if _CLIENT_ASYNCIO_SUPPORT:
195181
import redis.asyncio
196182

197-
INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry"
198-
199-
200-
def _set_connection_attributes(
201-
span: Span, conn: RedisInstance | AsyncRedisInstance
202-
) -> None:
203-
if not span.is_recording() or not hasattr(conn, "connection_pool"):
204-
return
205-
for key, value in _extract_conn_attributes(
206-
conn.connection_pool.connection_kwargs
207-
).items():
208-
span.set_attribute(key, value)
209-
210-
211-
def _build_span_name(
212-
instance: RedisInstance | AsyncRedisInstance, cmd_args: tuple[Any, ...]
213-
) -> str:
214-
if len(cmd_args) > 0 and cmd_args[0]:
215-
if cmd_args[0] == "FT.SEARCH":
216-
name = "redis.search"
217-
elif cmd_args[0] == "FT.CREATE":
218-
name = "redis.create_index"
219-
else:
220-
name = cmd_args[0]
221-
else:
222-
name = instance.connection_pool.connection_kwargs.get("db", 0)
223-
return name
224-
225-
226-
def _add_create_attributes(span: Span, args: tuple[Any, ...]):
227-
_set_span_attribute_if_value(
228-
span, "redis.create_index.index", _value_or_none(args, 1)
229-
)
230-
# According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
231-
try:
232-
schema_index = args.index("SCHEMA")
233-
except ValueError:
234-
return
235-
schema = args[schema_index:]
236-
field_attribute = ""
237-
# Schema in format:
238-
# [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
239-
field_attribute = "".join(
240-
f"Field(name: {schema[index - 1]}, type: {schema[index]});"
241-
for index in range(1, len(schema))
242-
if schema[index] in _FIELD_TYPES
243-
)
244-
_set_span_attribute_if_value(
245-
span,
246-
"redis.create_index.fields",
247-
field_attribute,
248-
)
249-
250-
251-
def _add_search_attributes(span: Span, response, args):
252-
_set_span_attribute_if_value(
253-
span, "redis.search.index", _value_or_none(args, 1)
254-
)
255-
_set_span_attribute_if_value(
256-
span, "redis.search.query", _value_or_none(args, 2)
257-
)
258-
# Parse response from search
259-
# https://redis.io/docs/latest/commands/ft.search/
260-
# Response in format:
261-
# [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
262-
# Returned documents in array format:
263-
# [first_field_name, first_field_value, second_field_name, second_field_value ...]
264-
number_of_returned_documents = _value_or_none(response, 0)
265-
_set_span_attribute_if_value(
266-
span, "redis.search.total", number_of_returned_documents
267-
)
268-
if "NOCONTENT" in args or not number_of_returned_documents:
269-
return
270-
for document_number in range(number_of_returned_documents):
271-
document_index = _value_or_none(response, 1 + 2 * document_number)
272-
if document_index:
273-
document = response[2 + 2 * document_number]
274-
for attribute_name_index in range(0, len(document), 2):
275-
_set_span_attribute_if_value(
276-
span,
277-
f"redis.search.xdoc_{document_index}.{document[attribute_name_index]}",
278-
document[attribute_name_index + 1],
279-
)
280-
281-
282-
def _build_span_meta_data_for_pipeline(
283-
instance: PipelineInstance | AsyncPipelineInstance,
284-
) -> tuple[list[Any], str, str]:
285-
try:
286-
command_stack = (
287-
instance.command_stack
288-
if hasattr(instance, "command_stack")
289-
else instance._command_stack
290-
)
291-
292-
cmds = [
293-
_format_command_args(c.args if hasattr(c, "args") else c[0])
294-
for c in command_stack
295-
]
296-
resource = "\n".join(cmds)
297-
298-
span_name = " ".join(
299-
[
300-
(c.args[0] if hasattr(c, "args") else c[0][0])
301-
for c in command_stack
302-
]
303-
)
304-
except (AttributeError, IndexError):
305-
command_stack = []
306-
resource = ""
307-
span_name = ""
308-
309-
return command_stack, resource, span_name
183+
_INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry"
310184

311185

312186
def _traced_execute_factory(
@@ -479,8 +353,8 @@ def _instrument(
479353
_traced_execute_pipeline = _traced_execute_pipeline_factory(
480354
tracer, request_hook, response_hook
481355
)
482-
pipeline_class = "BasePipeline" if _CLIENT_BEFORE_3_0_0 else "Pipeline"
483-
redis_class = "StrictRedis" if _CLIENT_BEFORE_3_0_0 else "Redis"
356+
pipeline_class = "BasePipeline" if _CLIENT_BEFORE_V3 else "Pipeline"
357+
redis_class = "StrictRedis" if _CLIENT_BEFORE_V3 else "Redis"
484358

485359
wrap_function_wrapper(
486360
"redis", f"{redis_class}.execute_command", _traced_execute_command
@@ -677,7 +551,7 @@ def _instrument(self, **kwargs: Any):
677551
)
678552

679553
def _uninstrument(self, **kwargs: Any):
680-
if _CLIENT_BEFORE_3_0_0:
554+
if _CLIENT_BEFORE_V3:
681555
unwrap(redis.StrictRedis, "execute_command")
682556
unwrap(redis.StrictRedis, "pipeline")
683557
unwrap(redis.Redis, "pipeline")
@@ -739,16 +613,16 @@ def instrument_client(
739613
740614
The ``args`` represents the response.
741615
"""
742-
if not hasattr(client, INSTRUMENTATION_ATTR):
743-
setattr(client, INSTRUMENTATION_ATTR, False)
744-
if not getattr(client, INSTRUMENTATION_ATTR):
616+
if not hasattr(client, _INSTRUMENTATION_ATTR):
617+
setattr(client, _INSTRUMENTATION_ATTR, False)
618+
if not getattr(client, _INSTRUMENTATION_ATTR):
745619
_instrument_client(
746620
client,
747621
RedisInstrumentor._get_tracer(tracer_provider=tracer_provider),
748622
request_hook=request_hook,
749623
response_hook=response_hook,
750624
)
751-
setattr(client, INSTRUMENTATION_ATTR, True)
625+
setattr(client, _INSTRUMENTATION_ATTR, True)
752626
else:
753627
_logger.warning(
754628
"Attempting to instrument Redis connection while already instrumented"
@@ -767,7 +641,7 @@ def uninstrument_client(
767641
Args:
768642
client: The redis client
769643
"""
770-
if getattr(client, INSTRUMENTATION_ATTR):
644+
if getattr(client, _INSTRUMENTATION_ATTR):
771645
# for all clients we need to unwrap execute_command and pipeline functions
772646
unwrap(client, "execute_command")
773647
# the method was creating a pipeline and wrapping the functions of the
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from typing import Any, Callable, TypeVar
2+
3+
import redis.asyncio.client
4+
import redis.asyncio.cluster
5+
import redis.client
6+
import redis.cluster
7+
import redis.connection
8+
9+
from opentelemetry.trace import Span
10+
11+
RequestHook = Callable[
12+
[Span, redis.connection.Connection, list[Any], dict[str, Any]], None
13+
]
14+
ResponseHook = Callable[[Span, redis.connection.Connection, Any], None]
15+
16+
AsyncPipelineInstance = TypeVar(
17+
"AsyncPipelineInstance",
18+
redis.asyncio.client.Pipeline,
19+
redis.asyncio.cluster.ClusterPipeline,
20+
)
21+
AsyncRedisInstance = TypeVar(
22+
"AsyncRedisInstance", redis.asyncio.Redis, redis.asyncio.RedisCluster
23+
)
24+
PipelineInstance = TypeVar(
25+
"PipelineInstance",
26+
redis.client.Pipeline,
27+
redis.cluster.ClusterPipeline,
28+
)
29+
RedisInstance = TypeVar(
30+
"RedisInstance", redis.client.Redis, redis.cluster.RedisCluster
31+
)
32+
R = TypeVar("R")

0 commit comments

Comments
 (0)