
fix(ollama): Implemented meter in the instrumentation #2741

Merged · 13 commits · Mar 11, 2025
@@ -3,14 +3,16 @@
import logging
import os
import json
import time
from typing import Collection
from opentelemetry.instrumentation.ollama.config import Config
from opentelemetry.instrumentation.ollama.utils import dont_throw
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.trace import get_tracer, SpanKind, Tracer
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.metrics import Histogram, Meter, get_meter

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
@@ -22,6 +24,7 @@
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY,
SpanAttributes,
LLMRequestTypeValues,
Meters
)
from opentelemetry.instrumentation.ollama.version import __version__

@@ -145,7 +148,7 @@ def _set_input_attributes(span, llm_request_type, kwargs):


@dont_throw
def _set_response_attributes(span, llm_request_type, response):
def _set_response_attributes(span, token_histogram, llm_request_type, response):
if should_send_prompts():
if llm_request_type == LLMRequestTypeValues.COMPLETION:
_set_span_attribute(
@@ -189,9 +192,42 @@ def _set_response_attributes(span, llm_request_type, response):
SpanAttributes.LLM_USAGE_PROMPT_TOKENS,
input_tokens,
)
_set_span_attribute(
span,
SpanAttributes.LLM_SYSTEM,
"Ollama"
)

if (
token_histogram is not None
and isinstance(input_tokens, int)
and input_tokens >= 0
):
token_histogram.record(
input_tokens,
attributes={
SpanAttributes.LLM_SYSTEM: "Ollama",
SpanAttributes.LLM_TOKEN_TYPE: "input",
SpanAttributes.LLM_RESPONSE_MODEL: response.get("model"),
},
)

if (
token_histogram is not None
and isinstance(output_tokens, int)
and output_tokens >= 0
):
token_histogram.record(
output_tokens,
attributes={
SpanAttributes.LLM_SYSTEM: "Ollama",
SpanAttributes.LLM_TOKEN_TYPE: "output",
SpanAttributes.LLM_RESPONSE_MODEL: response.get("model"),
},
)
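
The two record calls above only take effect once a real MeterProvider is configured. A minimal sketch of how these token-usage points could be verified in a test, using the SDK's in-memory reader; the literal metric name and attribute keys are assumptions about what Meters.LLM_TOKEN_USAGE and the SpanAttributes constants resolve to:

# Sketch: capture token-usage histogram points in memory (test-style setup).
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

reader = InMemoryMetricReader()
meter = MeterProvider(metric_readers=[reader]).get_meter("test")

token_histogram = meter.create_histogram(
    name="gen_ai.client.token.usage",  # assumed value of Meters.LLM_TOKEN_USAGE
    unit="token",
    description="Measures number of input and output tokens used",
)
token_histogram.record(
    42,
    attributes={
        "gen_ai.system": "Ollama",          # assumed SpanAttributes.LLM_SYSTEM
        "gen_ai.token.type": "input",       # assumed SpanAttributes.LLM_TOKEN_TYPE
        "gen_ai.response.model": "llama3",  # assumed SpanAttributes.LLM_RESPONSE_MODEL
    },
)

metrics_data = reader.get_metrics_data()  # the histogram's data points carry sum=42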


def _accumulate_streaming_response(span, llm_request_type, response):
def _accumulate_streaming_response(span, token_histogram, llm_request_type, response):
if llm_request_type == LLMRequestTypeValues.CHAT:
accumulated_response = {"message": {"content": "", "role": ""}}
elif llm_request_type == LLMRequestTypeValues.COMPLETION:
@@ -206,11 +242,11 @@ def _accumulate_streaming_response(span, llm_request_type, response):
elif llm_request_type == LLMRequestTypeValues.COMPLETION:
accumulated_response["response"] += res["response"]

_set_response_attributes(span, llm_request_type, res | accumulated_response)
_set_response_attributes(span, token_histogram, llm_request_type, res | accumulated_response)
span.end()


async def _aaccumulate_streaming_response(span, llm_request_type, response):
async def _aaccumulate_streaming_response(span, token_histogram, llm_request_type, response):
if llm_request_type == LLMRequestTypeValues.CHAT:
accumulated_response = {"message": {"content": "", "role": ""}}
elif llm_request_type == LLMRequestTypeValues.COMPLETION:
@@ -225,16 +261,25 @@ async def _aaccumulate_streaming_response(span, llm_request_type, response):
elif llm_request_type == LLMRequestTypeValues.COMPLETION:
accumulated_response["response"] += res["response"]

_set_response_attributes(span, llm_request_type, res | accumulated_response)
_set_response_attributes(span, token_histogram, llm_request_type, res | accumulated_response)
span.end()


def _with_tracer_wrapper(func):
"""Helper for providing tracer for wrapper functions."""

def _with_tracer(tracer, to_wrap):
def _with_tracer(tracer, token_histogram, duration_histogram, to_wrap):
def wrapper(wrapped, instance, args, kwargs):
return func(tracer, to_wrap, wrapped, instance, args, kwargs)
return func(
tracer,
token_histogram,
duration_histogram,
to_wrap,
wrapped,
instance,
args,
kwargs,
)

return wrapper
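
The change above extends the helper from binding just a tracer to binding the tracer plus both histograms; the result is still a callable with the (wrapped, instance, args, kwargs) shape that wrapt expects. A stripped-down sketch of this partial-application pattern, with illustrative names:

# Sketch: bind shared context once, hand wrapt a (wrapped, instance, args, kwargs) callable.
def with_context(func):
    def bind(tracer, token_histogram, duration_histogram, to_wrap):
        def wrapper(wrapped, instance, args, kwargs):
            return func(tracer, token_histogram, duration_histogram,
                        to_wrap, wrapped, instance, args, kwargs)
        return wrapper
    return bind

@with_context
def instrumented_call(tracer, token_histogram, duration_histogram,
                      to_wrap, wrapped, instance, args, kwargs):
    return wrapped(*args, **kwargs)  # the real code records spans/metrics here

# instrumented_call(tracer, th, dh, method_cfg) is what wrap_function_wrapper receives.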

@@ -253,7 +298,16 @@ def _llm_request_type_by_method(method_name):


@_with_tracer_wrapper
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
def _wrap(
tracer: Tracer,
token_histogram: Histogram,
duration_histogram: Histogram,
to_wrap,
wrapped,
instance,
args,
kwargs,
):
"""Instruments and calls every function defined in TO_WRAP."""
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
@@ -273,22 +327,43 @@ def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
if span.is_recording():
_set_input_attributes(span, llm_request_type, kwargs)

start_time = time.perf_counter()
response = wrapped(*args, **kwargs)
end_time = time.perf_counter()

if response:
if duration_histogram:
duration = end_time - start_time
duration_histogram.record(
duration,
attributes={
SpanAttributes.LLM_SYSTEM: "Ollama",
SpanAttributes.LLM_RESPONSE_MODEL: kwargs.get("model"),
},
)

if span.is_recording():
if kwargs.get("stream"):
return _accumulate_streaming_response(span, llm_request_type, response)
return _accumulate_streaming_response(span, token_histogram, llm_request_type, response)

_set_response_attributes(span, llm_request_type, response)
_set_response_attributes(span, token_histogram, llm_request_type, response)
span.set_status(Status(StatusCode.OK))

span.end()
return response
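
A note on the timing window: for streaming requests the wrapped client likely returns a lazy generator, so end_time - start_time would cover only the initial call, not consumption of the stream; only non-streaming calls are timed end to end. The measurement pattern in isolation, as a sketch with illustrative names:

import time

def timed(fn, duration_histogram, model, *args, **kwargs):
    start = time.perf_counter()
    result = fn(*args, **kwargs)  # returns immediately if fn yields lazily
    elapsed = time.perf_counter() - start
    if duration_histogram is not None:
        duration_histogram.record(
            elapsed,
            attributes={"gen_ai.system": "Ollama", "gen_ai.response.model": model},
        )
    return result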


@_with_tracer_wrapper
async def _awrap(tracer, to_wrap, wrapped, instance, args, kwargs):
async def _awrap(
tracer: Tracer,
token_histogram: Histogram,
duration_histogram: Histogram,
to_wrap,
wrapped,
instance,
args,
kwargs,
):
"""Instruments and calls every function defined in TO_WRAP."""
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
@@ -309,20 +384,51 @@ async def _awrap(tracer, to_wrap, wrapped, instance, args, kwargs):
if span.is_recording():
_set_input_attributes(span, llm_request_type, kwargs)

start_time = time.perf_counter()
response = await wrapped(*args, **kwargs)

end_time = time.perf_counter()
if response:
if duration_histogram:
duration = end_time - start_time
duration_histogram.record(
duration,
attributes={
SpanAttributes.LLM_SYSTEM: "Ollama",
SpanAttributes.LLM_RESPONSE_MODEL: kwargs.get("model"),
},
)

if span.is_recording():
if kwargs.get("stream"):
return _aaccumulate_streaming_response(span, llm_request_type, response)
return _aaccumulate_streaming_response(span, token_histogram, llm_request_type, response)

_set_response_attributes(span, llm_request_type, response)
_set_response_attributes(span, token_histogram, llm_request_type, response)
span.set_status(Status(StatusCode.OK))

span.end()
return response


def _build_metrics(meter: Meter):
token_histogram = meter.create_histogram(
name=Meters.LLM_TOKEN_USAGE,
unit="token",
description="Measures number of input and output tokens used",
)

duration_histogram = meter.create_histogram(
name=Meters.LLM_OPERATION_DURATION,
unit="s",
description="GenAI operation duration",
)

return token_histogram, duration_histogram
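
A sketch of exporting these two histograms to the console for local inspection; the literal names are assumptions about what Meters.LLM_TOKEN_USAGE and Meters.LLM_OPERATION_DURATION resolve to (gen_ai.client.token.usage and gen_ai.client.operation.duration in recent GenAI semantic conventions):

# Sketch: print collected metrics every five seconds.
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)

reader = PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=5000)
set_meter_provider(MeterProvider(metric_readers=[reader]))
# The instrumentor's get_meter(...) call will now find this provider,
# and both histograms built above will be exported periodically.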


def is_metrics_collection_enabled() -> bool:
return (os.getenv("TRACELOOP_METRICS_ENABLED") or "true").lower() == "true"
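
Because of the `or` fallback, an unset or empty TRACELOOP_METRICS_ENABLED counts as "true"; any other value must compare equal to "true" case-insensitively. Metrics can therefore be switched off before instrumenting, for example:

import os
os.environ["TRACELOOP_METRICS_ENABLED"] = "false"  # spans still emitted, histograms skipped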


class OllamaInstrumentor(BaseInstrumentor):
"""An instrumentor for Ollama's client library."""

@@ -336,22 +442,37 @@ def instrumentation_dependencies(self) -> Collection[str]:
def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)

meter_provider = kwargs.get("meter_provider")
meter = get_meter(__name__, __version__, meter_provider)

if is_metrics_collection_enabled():
(
token_histogram,
duration_histogram,
) = _build_metrics(meter)
else:
(
token_histogram,
duration_histogram,
) = (None, None)

for wrapped_method in WRAPPED_METHODS:
wrap_method = wrapped_method.get("method")
wrap_function_wrapper(
"ollama._client",
f"Client.{wrap_method}",
_wrap(tracer, wrapped_method),
_wrap(tracer, token_histogram, duration_histogram, wrapped_method),
)
wrap_function_wrapper(
"ollama._client",
f"AsyncClient.{wrap_method}",
_awrap(tracer, wrapped_method),
_awrap(tracer, token_histogram, duration_histogram, wrapped_method),
)
wrap_function_wrapper(
"ollama",
f"{wrap_method}",
_wrap(tracer, wrapped_method),
_wrap(tracer, token_histogram, duration_histogram, wrapped_method),
)

def _uninstrument(self, **kwargs):
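
Putting it together, a minimal end-to-end sketch of the instrumented client with metrics enabled; the model name and chat payload are illustrative:

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)
from opentelemetry.instrumentation.ollama import OllamaInstrumentor

provider = MeterProvider(
    metric_readers=[PeriodicExportingMetricReader(ConsoleMetricExporter())]
)
OllamaInstrumentor().instrument(meter_provider=provider)

import ollama

ollama.chat(model="llama3", messages=[{"role": "user", "content": "Hello"}])
# gen_ai.client.token.usage and gen_ai.client.operation.duration (assumed names)
# are recorded for the call and exported by the console reader.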