Skip to content

Remove the deprecated StreamServerInterceptor function from otelgrpc #7107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
4 tasks
MrAlias opened this issue Apr 2, 2025 · 1 comment · May be fixed by #7112
Open
4 tasks

Remove the deprecated StreamServerInterceptor function from otelgrpc #7107

MrAlias opened this issue Apr 2, 2025 · 1 comment · May be fixed by #7112

Comments

@MrAlias
Copy link
Contributor

MrAlias commented Apr 2, 2025

This function was deprecated in the last year and should be removed.

  • Remove the deprecated UnaryServerInterceptor function:
    // StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
    // for use in a grpc.NewServer call.
    //
    // Deprecated: Use [NewServerHandler] instead.
    func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
    cfg := newConfig(opts, "server")
    tracer := cfg.TracerProvider.Tracer(
    ScopeName,
    trace.WithInstrumentationVersion(Version()),
    )
    return func(
    srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
    ) error {
    ctx := ss.Context()
    i := &InterceptorInfo{
    StreamServerInfo: info,
    Type: StreamServer,
    }
    if cfg.InterceptorFilter != nil && !cfg.InterceptorFilter(i) {
    return handler(srv, wrapServerStream(ctx, ss, cfg))
    }
    ctx = extract(ctx, cfg.Propagators)
    name, attr, _ := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
    startOpts := append([]trace.SpanStartOption{
    trace.WithSpanKind(trace.SpanKindServer),
    trace.WithAttributes(attr...),
    },
    cfg.SpanStartOptions...,
    )
    ctx, span := tracer.Start(
    trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
    name,
    startOpts...,
    )
    defer span.End()
    err := handler(srv, wrapServerStream(ctx, ss, cfg))
    if err != nil {
    s, _ := status.FromError(err)
    statusCode, msg := serverStatus(s)
    span.SetStatus(statusCode, msg)
    span.SetAttributes(statusCodeAttr(s.Code()))
    } else {
    span.SetAttributes(statusCodeAttr(grpc_codes.OK))
    }
    return err
    }
    }
  • Remove the benchmark test:
    func BenchmarkStreamServerInterceptor(b *testing.B) {
    benchmark(b, nil, []grpc.ServerOption{
    grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(
    otelgrpc.WithTracerProvider(tracerProvider),
    )),
    })
    }
  • Remove tests:
    • // TestStreamServerInterceptor tests the server interceptor for streaming RPCs.
      func TestStreamServerInterceptor(t *testing.T) {
      for _, check := range serverChecks {
      name := check.grpcCode.String()
      t.Run(name, func(t *testing.T) {
      sr := tracetest.NewSpanRecorder()
      tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
      //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
      usi := otelgrpc.StreamServerInterceptor(
      otelgrpc.WithTracerProvider(tp),
      )
      // call the stream interceptor
      grpcErr := status.Error(check.grpcCode, check.grpcCode.String())
      handler := func(_ interface{}, _ grpc.ServerStream) error {
      return grpcErr
      }
      err := usi(&grpc_testing.SimpleRequest{}, &mockServerStream{}, &grpc.StreamServerInfo{FullMethod: name}, handler)
      assert.Equal(t, grpcErr, err)
      // validate span
      span, ok := getSpanFromRecorder(sr, name)
      require.True(t, ok, "missing span %s", name)
      assertServerSpan(t, check.wantSpanCode, check.wantSpanStatusDescription, check.grpcCode, span)
      })
      }
      }
      func TestStreamServerInterceptorEvents(t *testing.T) {
      testCases := []struct {
      Name string
      Events []otelgrpc.Event
      }{
      {Name: "With events", Events: []otelgrpc.Event{otelgrpc.ReceivedEvents, otelgrpc.SentEvents}},
      {Name: "With only sent events", Events: []otelgrpc.Event{otelgrpc.SentEvents}},
      {Name: "With only received events", Events: []otelgrpc.Event{otelgrpc.ReceivedEvents}},
      {Name: "No events", Events: []otelgrpc.Event{}},
      }
      for _, testCase := range testCases {
      t.Run(testCase.Name, func(t *testing.T) {
      sr := tracetest.NewSpanRecorder()
      tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
      opts := []otelgrpc.Option{
      otelgrpc.WithTracerProvider(tp),
      }
      if len(testCase.Events) > 0 {
      opts = append(opts, otelgrpc.WithMessageEvents(testCase.Events...))
      }
      //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
      usi := otelgrpc.StreamServerInterceptor(opts...)
      stream := &mockServerStream{}
      grpcCode := grpc_codes.OK
      name := grpcCode.String()
      // call the stream interceptor
      grpcErr := status.Error(grpcCode, name)
      handler := func(_ interface{}, handlerStream grpc.ServerStream) error {
      var msg grpc_testing.SimpleRequest
      err := handlerStream.RecvMsg(&msg)
      require.NoError(t, err)
      err = handlerStream.SendMsg(&msg)
      require.NoError(t, err)
      return grpcErr
      }
      err := usi(&grpc_testing.SimpleRequest{}, stream, &grpc.StreamServerInfo{FullMethod: name}, handler)
      require.Equal(t, grpcErr, err)
      // validate span
      span, ok := getSpanFromRecorder(sr, name)
      require.True(t, ok, "missing span %s", name)
      if len(testCase.Events) == 0 {
      assert.Empty(t, span.Events())
      } else {
      var eventsAttr []map[attribute.Key]attribute.Value
      for _, event := range testCase.Events {
      switch event {
      case otelgrpc.SentEvents:
      eventsAttr = append(eventsAttr, map[attribute.Key]attribute.Value{
      otelgrpc.RPCMessageTypeKey: attribute.StringValue("SENT"),
      otelgrpc.RPCMessageIDKey: attribute.IntValue(1),
      })
      case otelgrpc.ReceivedEvents:
      eventsAttr = append(eventsAttr, map[attribute.Key]attribute.Value{
      otelgrpc.RPCMessageTypeKey: attribute.StringValue("RECEIVED"),
      otelgrpc.RPCMessageIDKey: attribute.IntValue(1),
      })
      }
      }
      assert.Len(t, span.Events(), len(eventsAttr))
      assert.Equal(t, eventsAttr, eventAttrMap(span.Events()))
      }
      })
      }
      }
      func assertServerMetrics(t *testing.T, reader metric.Reader, serviceName, name string, code grpc_codes.Code) {
      want := metricdata.ScopeMetrics{
      Scope: wantInstrumentationScope,
      Metrics: []metricdata.Metrics{
      {
      Name: "rpc.server.duration",
      Description: "Measures the duration of inbound RPC.",
      Unit: "ms",
      Data: metricdata.Histogram[float64]{
      Temporality: metricdata.CumulativeTemporality,
      DataPoints: []metricdata.HistogramDataPoint[float64]{
      {
      Attributes: attribute.NewSet(
      semconv.RPCMethod(name),
      semconv.RPCService(serviceName),
      otelgrpc.RPCSystemGRPC,
      otelgrpc.GRPCStatusCodeKey.Int64(int64(code)),
      ),
      },
      },
      },
      },
      },
      }
      rm := metricdata.ResourceMetrics{}
      err := reader.Collect(context.Background(), &rm)
      assert.NoError(t, err)
      require.Len(t, rm.ScopeMetrics, 1)
      metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
      }
    • //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
      grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(
      otelgrpc.WithTracerProvider(serverStreamTP),
      otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
      )),
    • t.Run("StreamServerSpans", func(t *testing.T) {
      checkStreamServerSpans(t, serverStreamSR.Ended())
      })
    • func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
      require.Len(t, spans, 3)
      streamInput := spans[0]
      assert.False(t, streamInput.EndTime().IsZero())
      assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name())
      // sizes from reqSizes in "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/test".
      assertEvents(t, []trace.Event{
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(2),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(3),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(4),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      }, streamInput.Events())
      port, ok := findAttribute(streamInput.Attributes(), semconv.NetSockPeerPortKey)
      assert.True(t, ok)
      assert.ElementsMatch(t, []attribute.KeyValue{
      semconv.RPCMethod("StreamingInputCall"),
      semconv.RPCService("grpc.testing.TestService"),
      otelgrpc.RPCSystemGRPC,
      otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
      semconv.NetSockPeerAddr("127.0.0.1"),
      port,
      }, streamInput.Attributes())
      streamOutput := spans[1]
      assert.False(t, streamOutput.EndTime().IsZero())
      assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name())
      // sizes from respSizes in "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/test".
      assertEvents(t, []trace.Event{
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(2),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(3),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(4),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      }, streamOutput.Events())
      port, ok = findAttribute(streamOutput.Attributes(), semconv.NetSockPeerPortKey)
      assert.True(t, ok)
      assert.ElementsMatch(t, []attribute.KeyValue{
      semconv.RPCMethod("StreamingOutputCall"),
      semconv.RPCService("grpc.testing.TestService"),
      otelgrpc.RPCSystemGRPC,
      otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
      semconv.NetSockPeerAddr("127.0.0.1"),
      port,
      }, streamOutput.Attributes())
      pingPong := spans[2]
      assert.False(t, pingPong.EndTime().IsZero())
      assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name())
      assertEvents(t, []trace.Event{
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(2),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(2),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(3),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(3),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(4),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(4),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      }, pingPong.Events())
      port, ok = findAttribute(pingPong.Attributes(), semconv.NetSockPeerPortKey)
      assert.True(t, ok)
      assert.ElementsMatch(t, []attribute.KeyValue{
      semconv.RPCMethod("FullDuplexCall"),
      semconv.RPCService("grpc.testing.TestService"),
      otelgrpc.RPCSystemGRPC,
      otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
      semconv.NetSockPeerAddr("127.0.0.1"),
      port,
      }, pingPong.Attributes())
      }
  • Add a changelog entry in the ### Removed section of the unreleased changelog for the removal.
@tasdikrahman
Copy link

Would like to work on this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants