diff --git a/CHANGELOG.md b/CHANGELOG.md index b1e78dc46c..9318994a41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3275](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3275)) - `opentelemetry-instrumentation-botocore` Add support for GenAI tool events ([#3302](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3302)) +- `opentelemetry-instrumentation-botocore` Add support for GenAI metrics + ([#3326](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3326)) - `opentelemetry-instrumentation` make it simpler to initialize auto-instrumentation programmatically ([#3273](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3273)) - Add `opentelemetry-instrumentation-vertexai>=2.0b0` to `opentelemetry-bootstrap` diff --git a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml index 8005f4597b..849cae21dc 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml @@ -26,7 +26,7 @@ classifiers = [ "Programming Language :: Python :: 3.13", ] dependencies = [ - "opentelemetry-api ~= 1.12", + "opentelemetry-api ~= 1.30", "opentelemetry-instrumentation == 0.52b0.dev", "opentelemetry-semantic-conventions == 0.52b0.dev", "opentelemetry-propagator-aws-xray ~= 1.0", diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index 39d339ae0c..038ff01db6 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -76,6 +76,38 @@ def response_hook(span, service_name, operation_name, result): ) ec2 = self.session.create_client("ec2", region_name="us-west-2") ec2.describe_instances() + +Extensions +---------- + +The instrumentation supports creating extensions for AWS services to enrich what is collected. We have extensions +for the following AWS services: + +- Bedrock Runtime +- DynamoDB +- Lambda +- SNS +- SQS + +Bedrock Runtime +*************** + +This extension implements the GenAI semantic conventions for the following API calls: + +- Converse +- ConverseStream +- InvokeModel +- InvokeModelWithResponseStream + +For the Converse and ConverseStream APIs, tracing, events and metrics are implemented. + +For the InvokeModel and InvokeModelWithResponseStream APIs, tracing, events and metrics are implemented only for a subset of +the available models, namely: + +- Amazon Titan models +- Amazon Nova models +- Anthropic Claude models + +There is no support for tool calls with Amazon models for the InvokeModel and InvokeModelWithResponseStream APIs.
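+
+Metrics are enabled by passing a ``meter_provider`` to ``instrument()``. A minimal sketch, assuming the SDK's
+in-memory reader (any metric reader works):
+
+.. code:: python
+
+    from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
+    from opentelemetry.sdk.metrics import MeterProvider
+    from opentelemetry.sdk.metrics.export import InMemoryMetricReader
+
+    # Collect metrics in memory; a production setup would use an exporting reader.
+    reader = InMemoryMetricReader()
+    BotocoreInstrumentor().instrument(
+        meter_provider=MeterProvider(metric_readers=[reader]),
+    )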
""" import logging @@ -104,6 +136,7 @@ def response_hook(span, service_name, operation_name, result): suppress_http_instrumentation, unwrap, ) +from opentelemetry.metrics import Instrument, Meter, get_meter from opentelemetry.propagators.aws.aws_xray_propagator import AwsXRayPropagator from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import get_tracer @@ -134,6 +167,10 @@ def _instrument(self, **kwargs): self._tracers = {} # event_loggers are lazy initialized per-extension in _get_event_logger self._event_loggers = {} + # meters are lazy initialized per-extension in _get_meter + self._meters = {} + # metrics are lazy initialized per-extension in _get_metrics + self._metrics: Dict[str, Dict[str, Instrument]] = {} self.request_hook = kwargs.get("request_hook") self.response_hook = kwargs.get("response_hook") @@ -144,6 +181,7 @@ def _instrument(self, **kwargs): self.tracer_provider = kwargs.get("tracer_provider") self.event_logger_provider = kwargs.get("event_logger_provider") + self.meter_provider = kwargs.get("meter_provider") wrap_function_wrapper( "botocore.client", @@ -201,6 +239,38 @@ def _get_event_logger(self, extension: _AwsSdkExtension): return self._event_loggers[instrumentation_name] + def _get_meter(self, extension: _AwsSdkExtension): + """This is a multiplexer in order to have a meter per extension""" + + instrumentation_name = self._get_instrumentation_name(extension) + meter = self._meters.get(instrumentation_name) + if meter: + return meter + + schema_version = extension.meter_schema_version() + self._meters[instrumentation_name] = get_meter( + instrumentation_name, + "", + schema_url=f"https://opentelemetry.io/schemas/{schema_version}", + meter_provider=self.meter_provider, + ) + + return self._meters[instrumentation_name] + + def _get_metrics( + self, extension: _AwsSdkExtension, meter: Meter + ) -> Dict[str, Instrument]: + """This is a multiplexer for lazy initialization of metrics required by extensions""" + instrumentation_name = self._get_instrumentation_name(extension) + metrics = self._metrics.get(instrumentation_name) + if metrics is not None: + return metrics + + self._metrics.setdefault(instrumentation_name, {}) + metrics = self._metrics[instrumentation_name] + _safe_invoke(extension.setup_metrics, meter, metrics) + return metrics + def _uninstrument(self, **kwargs): unwrap(BaseClient, "_make_api_call") unwrap(Endpoint, "prepare_request") @@ -244,8 +314,11 @@ def _patched_api_call(self, original_func, instance, args, kwargs): tracer = self._get_tracer(extension) event_logger = self._get_event_logger(extension) + meter = self._get_meter(extension) + metrics = self._get_metrics(extension, meter) instrumentor_ctx = _BotocoreInstrumentorContext( - event_logger=event_logger + event_logger=event_logger, + metrics=metrics, ) with tracer.start_as_current_span( call_context.span_name, diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py index 36f1dfdb97..e56624b6d4 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py @@ -21,6 +21,7 @@ import io import json import logging +from timeit import default_timer from typing import Any from 
botocore.eventstream import EventStream @@ -39,6 +40,7 @@ _BotoClientErrorT, _BotocoreInstrumentorContext, ) +from opentelemetry.metrics import Instrument, Meter from opentelemetry.semconv._incubating.attributes.error_attributes import ( ERROR_TYPE, ) @@ -51,16 +53,56 @@ GEN_AI_REQUEST_TOP_P, GEN_AI_RESPONSE_FINISH_REASONS, GEN_AI_SYSTEM, + GEN_AI_TOKEN_TYPE, GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS, GenAiOperationNameValues, GenAiSystemValues, + GenAiTokenTypeValues, +) +from opentelemetry.semconv._incubating.metrics.gen_ai_metrics import ( + GEN_AI_CLIENT_OPERATION_DURATION, + GEN_AI_CLIENT_TOKEN_USAGE, ) from opentelemetry.trace.span import Span from opentelemetry.trace.status import Status, StatusCode _logger = logging.getLogger(__name__) +_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [ + 0.01, + 0.02, + 0.04, + 0.08, + 0.16, + 0.32, + 0.64, + 1.28, + 2.56, + 5.12, + 10.24, + 20.48, + 40.96, + 81.92, +] + +_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [ + 1, + 4, + 16, + 64, + 256, + 1024, + 4096, + 16384, + 65536, + 262144, + 1048576, + 4194304, + 16777216, + 67108864, +] + _MODEL_ID_KEY: str = "modelId" @@ -88,6 +130,40 @@ def should_end_span_on_exit(self): not in self._DONT_CLOSE_SPAN_ON_END_OPERATIONS ) + def setup_metrics(self, meter: Meter, metrics: dict[str, Instrument]): + metrics[GEN_AI_CLIENT_OPERATION_DURATION] = meter.create_histogram( + name=GEN_AI_CLIENT_OPERATION_DURATION, + description="GenAI operation duration", + unit="s", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, + ) + metrics[GEN_AI_CLIENT_TOKEN_USAGE] = meter.create_histogram( + name=GEN_AI_CLIENT_TOKEN_USAGE, + description="Measures number of input and output tokens used", + unit="{token}", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, + ) + + def _extract_metrics_attributes(self) -> _AttributeMapT: + attributes = {GEN_AI_SYSTEM: GenAiSystemValues.AWS_BEDROCK.value} + + model_id = self._call_context.params.get(_MODEL_ID_KEY) + if not model_id: + return attributes + + attributes[GEN_AI_REQUEST_MODEL] = model_id + + # Titan via InvokeModel is a text completion model + if "body" in self._call_context.params and "amazon.titan" in model_id: + attributes[GEN_AI_OPERATION_NAME] = ( + GenAiOperationNameValues.TEXT_COMPLETION.value + ) + else: + attributes[GEN_AI_OPERATION_NAME] = ( + GenAiOperationNameValues.CHAT.value + ) + return attributes + def extract_attributes(self, attributes: _AttributeMapT): if self._call_context.operation not in self._HANDLED_OPERATIONS: return @@ -251,16 +327,18 @@ def before_service_call( for event in message_to_event(message, capture_content): event_logger.emit(event) - if not span.is_recording(): - return + if span.is_recording(): + operation_name = span.attributes.get(GEN_AI_OPERATION_NAME, "") + request_model = span.attributes.get(GEN_AI_REQUEST_MODEL, "") + # avoid setting to an empty string if they are not available + if operation_name and request_model: + span.update_name(f"{operation_name} {request_model}") - operation_name = span.attributes.get(GEN_AI_OPERATION_NAME, "") - request_model = span.attributes.get(GEN_AI_REQUEST_MODEL, "") - # avoid setting to an empty string if are not available - if operation_name and request_model: - span.update_name(f"{operation_name} {request_model}") + # this is used to calculate the operation duration metric; the duration may be skewed by request_hook + # pylint: disable=attribute-defined-outside-init + self._operation_start = default_timer() - # pylint: disable=no-self-use + # pylint:
disable=no-self-use,too-many-locals def _converse_on_success( self, span: Span, @@ -300,6 +378,37 @@ def _converse_on_success( ) ) + metrics = instrumentor_context.metrics + metrics_attributes = self._extract_metrics_attributes() + if operation_duration_histogram := metrics.get( + GEN_AI_CLIENT_OPERATION_DURATION + ): + duration = max((default_timer() - self._operation_start), 0) + operation_duration_histogram.record( + duration, + attributes=metrics_attributes, + ) + + if token_usage_histogram := metrics.get(GEN_AI_CLIENT_TOKEN_USAGE): + if usage := result.get("usage"): + if input_tokens := usage.get("inputTokens"): + input_attributes = { + **metrics_attributes, + GEN_AI_TOKEN_TYPE: GenAiTokenTypeValues.INPUT.value, + } + token_usage_histogram.record( + input_tokens, input_attributes + ) + + if output_tokens := usage.get("outputTokens"): + output_attributes = { + **metrics_attributes, + GEN_AI_TOKEN_TYPE: GenAiTokenTypeValues.COMPLETION.value, + } + token_usage_histogram.record( + output_tokens, output_attributes + ) + def _invoke_model_on_success( self, span: Span, @@ -338,12 +447,31 @@ def _invoke_model_on_success( if original_body is not None: original_body.close() - def _on_stream_error_callback(self, span: Span, exception): + def _on_stream_error_callback( + self, + span: Span, + exception, + instrumentor_context: _BotocoreInstrumentorContext, + ): span.set_status(Status(StatusCode.ERROR, str(exception))) if span.is_recording(): span.set_attribute(ERROR_TYPE, type(exception).__qualname__) span.end() + metrics = instrumentor_context.metrics + metrics_attributes = { + **self._extract_metrics_attributes(), + ERROR_TYPE: type(exception).__qualname__, + } + if operation_duration_histogram := metrics.get( + GEN_AI_CLIENT_OPERATION_DURATION + ): + duration = max((default_timer() - self._operation_start), 0) + operation_duration_histogram.record( + duration, + attributes=metrics_attributes, + ) + def on_success( self, span: Span, @@ -367,7 +495,9 @@ def stream_done_callback(response): span.end() def stream_error_callback(exception): - self._on_stream_error_callback(span, exception) + self._on_stream_error_callback( + span, exception, instrumentor_context + ) result["stream"] = ConverseStreamWrapper( result["stream"], @@ -405,7 +535,9 @@ def invoke_model_stream_done_callback(response): span.end() def invoke_model_stream_error_callback(exception): - self._on_stream_error_callback(span, exception) + self._on_stream_error_callback( + span, exception, instrumentor_context + ) result["body"] = InvokeModelWithResponseStreamWrapper( result["body"], @@ -415,7 +547,7 @@ def invoke_model_stream_error_callback(exception): ) return - # pylint: disable=no-self-use + # pylint: disable=no-self-use,too-many-locals def _handle_amazon_titan_response( self, span: Span, @@ -445,7 +577,38 @@ def _handle_amazon_titan_response( ) event_logger.emit(choice.to_choice_event()) - # pylint: disable=no-self-use + metrics = instrumentor_context.metrics + metrics_attributes = self._extract_metrics_attributes() + if operation_duration_histogram := metrics.get( + GEN_AI_CLIENT_OPERATION_DURATION + ): + duration = max((default_timer() - self._operation_start), 0) + operation_duration_histogram.record( + duration, + attributes=metrics_attributes, + ) + + if token_usage_histogram := metrics.get(GEN_AI_CLIENT_TOKEN_USAGE): + if input_tokens := response_body.get("inputTextTokenCount"): + input_attributes = { + **metrics_attributes, + GEN_AI_TOKEN_TYPE: GenAiTokenTypeValues.INPUT.value, + } + token_usage_histogram.record( + 
input_tokens, input_attributes + ) + + if results := response_body.get("results"): + if output_tokens := results[0].get("tokenCount"): + output_attributes = { + **metrics_attributes, + GEN_AI_TOKEN_TYPE: GenAiTokenTypeValues.COMPLETION.value, + } + token_usage_histogram.record( + output_tokens, output_attributes + ) + + # pylint: disable=no-self-use,too-many-locals def _handle_amazon_nova_response( self, span: Span, @@ -472,6 +635,37 @@ def _handle_amazon_nova_response( choice = _Choice.from_converse(response_body, capture_content) event_logger.emit(choice.to_choice_event()) + metrics = instrumentor_context.metrics + metrics_attributes = self._extract_metrics_attributes() + if operation_duration_histogram := metrics.get( + GEN_AI_CLIENT_OPERATION_DURATION + ): + duration = max((default_timer() - self._operation_start), 0) + operation_duration_histogram.record( + duration, + attributes=metrics_attributes, + ) + + if token_usage_histogram := metrics.get(GEN_AI_CLIENT_TOKEN_USAGE): + if usage := response_body.get("usage"): + if input_tokens := usage.get("inputTokens"): + input_attributes = { + **metrics_attributes, + GEN_AI_TOKEN_TYPE: GenAiTokenTypeValues.INPUT.value, + } + token_usage_histogram.record( + input_tokens, input_attributes + ) + + if output_tokens := usage.get("outputTokens"): + output_attributes = { + **metrics_attributes, + GEN_AI_TOKEN_TYPE: GenAiTokenTypeValues.COMPLETION.value, + } + token_usage_histogram.record( + output_tokens, output_attributes + ) + # pylint: disable=no-self-use def _handle_anthropic_claude_response( self, @@ -500,6 +694,37 @@ def _handle_anthropic_claude_response( ) event_logger.emit(choice.to_choice_event()) + metrics = instrumentor_context.metrics + metrics_attributes = self._extract_metrics_attributes() + if operation_duration_histogram := metrics.get( + GEN_AI_CLIENT_OPERATION_DURATION + ): + duration = max((default_timer() - self._operation_start), 0) + operation_duration_histogram.record( + duration, + attributes=metrics_attributes, + ) + + if token_usage_histogram := metrics.get(GEN_AI_CLIENT_TOKEN_USAGE): + if usage := response_body.get("usage"): + if input_tokens := usage.get("input_tokens"): + input_attributes = { + **metrics_attributes, + GEN_AI_TOKEN_TYPE: GenAiTokenTypeValues.INPUT.value, + } + token_usage_histogram.record( + input_tokens, input_attributes + ) + + if output_tokens := usage.get("output_tokens"): + output_attributes = { + **metrics_attributes, + GEN_AI_TOKEN_TYPE: GenAiTokenTypeValues.COMPLETION.value, + } + token_usage_histogram.record( + output_tokens, output_attributes + ) + def on_error( self, span: Span, @@ -515,3 +740,17 @@ def on_error( if not self.should_end_span_on_exit(): span.end() + + metrics = instrumentor_context.metrics + metrics_attributes = { + **self._extract_metrics_attributes(), + ERROR_TYPE: type(exception).__qualname__, + } + if operation_duration_histogram := metrics.get( + GEN_AI_CLIENT_OPERATION_DURATION + ): + duration = max((default_timer() - self._operation_start), 0) + operation_duration_histogram.record( + duration, + attributes=metrics_attributes, + ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py index 7de2ac9c23..9017e79612 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py +++ 
b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py @@ -12,10 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import logging from typing import Any, Dict, Optional, Tuple from opentelemetry._events import EventLogger +from opentelemetry.metrics import Instrument, Meter from opentelemetry.trace import SpanKind from opentelemetry.trace.span import Span from opentelemetry.util.types import AttributeValue @@ -91,8 +94,13 @@ def _get_attr(obj, name: str, default=None): class _BotocoreInstrumentorContext: - def __init__(self, event_logger: EventLogger): + def __init__( + self, + event_logger: EventLogger, + metrics: Dict[str, Instrument] | None = None, + ): self.event_logger = event_logger + self.metrics = metrics or {} class _AwsSdkExtension: @@ -109,6 +117,11 @@ def event_logger_schema_version() -> str: """Returns the event logger OTel schema version the extension is following""" return "1.30.0" + @staticmethod + def meter_schema_version() -> str: + """Returns the meter OTel schema version the extension is following""" + return "1.30.0" + def should_trace_service_call(self) -> bool: # pylint:disable=no-self-use """Returns if the AWS SDK service call should be traced or not @@ -125,6 +138,12 @@ def should_end_span_on_exit(self) -> bool: # pylint:disable=no-self-use """ return True + def setup_metrics(self, meter: Meter, metrics: Dict[str, Instrument]): + """Callback which gets invoked to set up metrics. + + Extensions may override this function to add to the metrics dictionary all the instruments + they want to receive later through _BotocoreInstrumentorContext.metrics.""" + def extract_attributes(self, attributes: _AttributeMapT): """Callback which gets invoked before the span is created.
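Taken together, the types.py changes define the contract the Bedrock extension implements: an extension overrides setup_metrics() to register its instruments, and the instrumentor hands those instruments back on every API call through _BotocoreInstrumentorContext.metrics. A minimal sketch of a hypothetical custom extension (the class name and metric name are illustrative, not part of this change):

    from opentelemetry.instrumentation.botocore.extensions.types import (
        _AwsSdkExtension,
    )
    from opentelemetry.metrics import Instrument, Meter

    class _MyServiceExtension(_AwsSdkExtension):
        # Hypothetical instrument name, also used as the dictionary key.
        _DURATION_METRIC = "myservice.client.operation.duration"

        def setup_metrics(self, meter: Meter, metrics: dict[str, Instrument]):
            # Instruments registered here are created once per extension and are
            # handed back on every call via _BotocoreInstrumentorContext.metrics.
            metrics[self._DURATION_METRIC] = meter.create_histogram(
                name=self._DURATION_METRIC,
                description="Duration of MyService calls",
                unit="s",
            )

The explicit_bucket_boundaries_advisory argument that the Bedrock extension passes to create_histogram() only exists in newer API releases, which is presumably why pyproject.toml bumps opentelemetry-api from ~= 1.12 to ~= 1.30.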
diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py index 54666c1d28..b520171fc3 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py @@ -19,6 +19,11 @@ from botocore.response import StreamingBody +from opentelemetry.instrumentation.botocore.extensions.bedrock import ( + _GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, + _GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, +) +from opentelemetry.sdk.metrics._internal.point import ResourceMetrics from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.semconv._incubating.attributes import ( event_attributes as EventAttributes, @@ -26,6 +31,13 @@ from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAIAttributes, ) +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) +from opentelemetry.semconv._incubating.metrics.gen_ai_metrics import ( + GEN_AI_CLIENT_OPERATION_DURATION, + GEN_AI_CLIENT_TOKEN_USAGE, +) # pylint: disable=too-many-branches, too-many-locals @@ -259,3 +271,107 @@ def assert_message_in_logs(log, event_name, expected_content, parent_span): expected_content ), dict(log.log_record.body) assert_log_parent(log, parent_span) + + +def assert_all_metric_attributes( + data_point, operation_name: str, model: str, error_type: str | None = None +): + assert GenAIAttributes.GEN_AI_OPERATION_NAME in data_point.attributes + assert ( + data_point.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME] + == operation_name + ) + assert GenAIAttributes.GEN_AI_SYSTEM in data_point.attributes + assert ( + data_point.attributes[GenAIAttributes.GEN_AI_SYSTEM] + == GenAIAttributes.GenAiSystemValues.AWS_BEDROCK.value + ) + assert GenAIAttributes.GEN_AI_REQUEST_MODEL in data_point.attributes + assert data_point.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == model + + if error_type is not None: + assert ERROR_TYPE in data_point.attributes + assert data_point.attributes[ERROR_TYPE] == error_type + else: + assert ERROR_TYPE not in data_point.attributes + + +def assert_metrics( + resource_metrics: ResourceMetrics, + operation_name: str, + model: str, + input_tokens: float | None = None, + output_tokens: float | None = None, + error_type: str | None = None, +): + assert len(resource_metrics) == 1 + + metric_data = resource_metrics[0].scope_metrics[0].metrics + if input_tokens is not None or output_tokens is not None: + expected_metrics_data_len = 2 + else: + expected_metrics_data_len = 1 + assert len(metric_data) == expected_metrics_data_len + + duration_metric = next( + (m for m in metric_data if m.name == GEN_AI_CLIENT_OPERATION_DURATION), + None, + ) + assert duration_metric is not None + + duration_point = duration_metric.data.data_points[0] + assert duration_point.sum > 0 + assert_all_metric_attributes( + duration_point, operation_name, model, error_type + ) + assert duration_point.explicit_bounds == tuple( + _GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS + ) + + if input_tokens is not None: + token_usage_metric = next( + (m for m in metric_data if m.name == GEN_AI_CLIENT_TOKEN_USAGE), + None, + ) + assert token_usage_metric is not None + + input_token_usage = next( + ( + d + for d in token_usage_metric.data.data_points + if d.attributes[GenAIAttributes.GEN_AI_TOKEN_TYPE] + == GenAIAttributes.GenAiTokenTypeValues.INPUT.value + ), + None, + ) + assert 
input_token_usage is not None + assert input_token_usage.sum == input_tokens + + assert input_token_usage.explicit_bounds == tuple( + _GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS + ) + assert_all_metric_attributes(input_token_usage, operation_name, model) + + if output_tokens is not None: + token_usage_metric = next( + (m for m in metric_data if m.name == GEN_AI_CLIENT_TOKEN_USAGE), + None, + ) + assert token_usage_metric is not None + + output_token_usage = next( + ( + d + for d in token_usage_metric.data.data_points + if d.attributes[GenAIAttributes.GEN_AI_TOKEN_TYPE] + == GenAIAttributes.GenAiTokenTypeValues.COMPLETION.value + ), + None, + ) + assert output_token_usage is not None + assert output_token_usage.sum == output_tokens + + assert output_token_usage.explicit_bounds == tuple( + _GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS + ) + assert_all_metric_attributes(output_token_usage, operation_name, model) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/conftest.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/conftest.py index 5ecf8248b5..0c3f98fe2f 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/conftest.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/conftest.py @@ -17,6 +17,12 @@ InMemoryLogExporter, SimpleLogRecordProcessor, ) +from opentelemetry.sdk.metrics import ( + MeterProvider, +) +from opentelemetry.sdk.metrics.export import ( + InMemoryMetricReader, +) from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( @@ -36,6 +42,12 @@ def fixture_log_exporter(): yield exporter +@pytest.fixture(scope="function", name="metric_reader") +def fixture_metric_reader(): + reader = InMemoryMetricReader() + yield reader + + @pytest.fixture(scope="function", name="tracer_provider") def fixture_tracer_provider(span_exporter): provider = TracerProvider() @@ -52,6 +64,15 @@ def fixture_event_logger_provider(log_exporter): return event_logger_provider +@pytest.fixture(scope="function", name="meter_provider") +def fixture_meter_provider(metric_reader): + meter_provider = MeterProvider( + metric_readers=[metric_reader], + ) + + return meter_provider + + @pytest.fixture def bedrock_runtime_client(): return boto3.client("bedrock-runtime") @@ -81,7 +102,9 @@ def vcr_config(): @pytest.fixture(scope="function") -def instrument_no_content(tracer_provider, event_logger_provider): +def instrument_no_content( + tracer_provider, event_logger_provider, meter_provider +): os.environ.update( {OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "False"} ) @@ -90,6 +113,7 @@ def instrument_no_content(tracer_provider, event_logger_provider): instrumentor.instrument( tracer_provider=tracer_provider, event_logger_provider=event_logger_provider, + meter_provider=meter_provider, ) yield instrumentor @@ -98,7 +122,9 @@ def instrument_no_content(tracer_provider, event_logger_provider): @pytest.fixture(scope="function") -def instrument_with_content(tracer_provider, event_logger_provider): +def instrument_with_content( + tracer_provider, event_logger_provider, meter_provider +): os.environ.update( {OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "True"} ) @@ -106,6 +132,7 @@ def instrument_with_content(tracer_provider, event_logger_provider): instrumentor.instrument( tracer_provider=tracer_provider, event_logger_provider=event_logger_provider, + meter_provider=meter_provider, ) yield instrumentor diff --git 
a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py index 600fab7441..795c4c5d8a 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py @@ -34,6 +34,7 @@ assert_completion_attributes_from_streaming_body, assert_converse_completion_attributes, assert_message_in_logs, + assert_metrics, assert_stream_completion_attributes, ) @@ -51,9 +52,11 @@ def filter_message_keys(message, keys): def test_converse_with_content( span_exporter, log_exporter, + metric_reader, bedrock_runtime_client, instrument_with_content, ): + # pylint:disable=too-many-locals messages = [{"role": "user", "content": [{"text": "Say this is a test"}]}] llm_model_value = "amazon.titan-text-lite-v1" @@ -95,6 +98,13 @@ def test_converse_with_content( } assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) + input_tokens = response["usage"]["inputTokens"] + output_tokens = response["usage"]["outputTokens"] + metrics = metric_reader.get_metrics_data().resource_metrics + assert_metrics( + metrics, "chat", llm_model_value, input_tokens, output_tokens + ) + @pytest.mark.skipif( BOTO3_VERSION < (1, 35, 56), reason="Converse API not available" @@ -103,6 +113,7 @@ def test_converse_with_content( def test_converse_with_content_different_events( span_exporter, log_exporter, + metric_reader, bedrock_runtime_client, instrument_with_content, ): @@ -150,6 +161,13 @@ def test_converse_with_content_different_events( } assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) + input_tokens = response["usage"]["inputTokens"] + output_tokens = response["usage"]["outputTokens"] + metrics = metric_reader.get_metrics_data().resource_metrics + assert_metrics( + metrics, "chat", llm_model_value, input_tokens, output_tokens + ) + def converse_tool_call( span_exporter, log_exporter, bedrock_runtime_client, expect_content @@ -452,6 +470,7 @@ def test_converse_tool_call_no_content( def test_converse_with_invalid_model( span_exporter, log_exporter, + metric_reader, bedrock_runtime_client, instrument_with_content, ): @@ -479,6 +498,11 @@ def test_converse_with_invalid_model( user_content = filter_message_keys(messages[0], ["content"]) assert_message_in_logs(logs[0], "gen_ai.user.message", user_content, span) + metrics = metric_reader.get_metrics_data().resource_metrics + assert_metrics( + metrics, "chat", llm_model_value, error_type="ValidationException" + ) + @pytest.mark.skipif( BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available" @@ -487,6 +511,7 @@ def test_converse_with_invalid_model( def test_converse_stream_with_content( span_exporter, log_exporter, + metric_reader, bedrock_runtime_client, instrument_with_content, ): @@ -553,6 +578,11 @@ def test_converse_stream_with_content( } assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) + metrics = metric_reader.get_metrics_data().resource_metrics + assert_metrics( + metrics, "chat", llm_model_value, input_tokens, output_tokens + ) + @pytest.mark.skipif( BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available" @@ -561,6 +591,7 @@ def test_converse_stream_with_content( def test_converse_stream_with_content_different_events( span_exporter, log_exporter, + metric_reader, bedrock_runtime_client, instrument_with_content, ): @@ -614,6 +645,9 @@ def 
test_converse_stream_with_content_different_events( } assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) + metrics = metric_reader.get_metrics_data().resource_metrics + assert_metrics(metrics, "chat", llm_model_value, mock.ANY, mock.ANY) + def _rebuild_stream_message(response): message = {"content": []} @@ -986,6 +1020,7 @@ def test_converse_stream_no_content_tool_call( def test_converse_stream_handles_event_stream_error( span_exporter, log_exporter, + metric_reader, bedrock_runtime_client, instrument_with_content, ): @@ -1039,6 +1074,11 @@ def test_converse_stream_handles_event_stream_error( user_content = filter_message_keys(messages[0], ["content"]) assert_message_in_logs(logs[0], "gen_ai.user.message", user_content, span) + metrics = metric_reader.get_metrics_data().resource_metrics + assert_metrics( + metrics, "chat", llm_model_value, error_type="EventStreamError" + ) + @pytest.mark.skipif( BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available"
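For completeness, the fixtures above can be combined into a standalone smoke check; a hedged sketch mirroring test_converse_with_content (it needs real AWS credentials or a recorded cassette, and reuses the model id from the tests):

    import boto3

    from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import InMemoryMetricReader
    from opentelemetry.semconv._incubating.metrics.gen_ai_metrics import (
        GEN_AI_CLIENT_OPERATION_DURATION,
        GEN_AI_CLIENT_TOKEN_USAGE,
    )

    reader = InMemoryMetricReader()
    BotocoreInstrumentor().instrument(
        meter_provider=MeterProvider(metric_readers=[reader])
    )

    client = boto3.client("bedrock-runtime")
    client.converse(
        modelId="amazon.titan-text-lite-v1",
        messages=[{"role": "user", "content": [{"text": "Say this is a test"}]}],
    )

    # Both GenAI histograms should now have data points.
    names = {
        metric.name
        for rm in reader.get_metrics_data().resource_metrics
        for scope in rm.scope_metrics
        for metric in scope.metrics
    }
    assert GEN_AI_CLIENT_OPERATION_DURATION in names
    assert GEN_AI_CLIENT_TOKEN_USAGE in names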