Skip to content

Commit f3cd422

Browse files
authored
Add a notion of Request to otlp to decouple data from protocol specific metadata (open-telemetry#4050)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent bcbaeaf commit f3cd422

File tree

14 files changed

+131
-67
lines changed

14 files changed

+131
-67
lines changed

config/configgrpc/configgrpc_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
"go.opentelemetry.io/collector/config/confignet"
3535
"go.opentelemetry.io/collector/config/configtls"
3636
"go.opentelemetry.io/collector/model/otlpgrpc"
37-
"go.opentelemetry.io/collector/model/pdata"
3837
)
3938

4039
func TestDefaultGrpcClientSettings(t *testing.T) {
@@ -481,7 +480,7 @@ func TestHttpReception(t *testing.T) {
481480
assert.NoError(t, errDial)
482481
client := otlpgrpc.NewTracesClient(grpcClientConn)
483482
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
484-
resp, errResp := client.Export(ctx, pdata.NewTraces(), grpc.WaitForReady(true))
483+
resp, errResp := client.Export(ctx, otlpgrpc.NewTracesRequest(), grpc.WaitForReady(true))
485484
if tt.hasError {
486485
assert.Error(t, errResp)
487486
} else {
@@ -528,7 +527,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
528527
assert.NoError(t, errDial)
529528
client := otlpgrpc.NewTracesClient(grpcClientConn)
530529
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
531-
resp, errResp := client.Export(ctx, pdata.NewTraces(), grpc.WaitForReady(true))
530+
resp, errResp := client.Export(ctx, otlpgrpc.NewTracesRequest(), grpc.WaitForReady(true))
532531
assert.NoError(t, errResp)
533532
assert.NotNil(t, resp)
534533
cancelFunc()
@@ -537,7 +536,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
537536

538537
type grpcTraceServer struct{}
539538

540-
func (gts *grpcTraceServer) Export(context.Context, pdata.Traces) (otlpgrpc.TracesResponse, error) {
539+
func (gts *grpcTraceServer) Export(context.Context, otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
541540
return otlpgrpc.NewTracesResponse(), nil
542541
}
543542

exporter/otlpexporter/otlp.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,23 @@ func (gs *grpcSender) stop() error {
123123
}
124124

125125
func (gs *grpcSender) exportTrace(ctx context.Context, td pdata.Traces) error {
126-
_, err := gs.traceExporter.Export(gs.enhanceContext(ctx), td, gs.callOptions...)
126+
req := otlpgrpc.NewTracesRequest()
127+
req.SetTraces(td)
128+
_, err := gs.traceExporter.Export(gs.enhanceContext(ctx), req, gs.callOptions...)
127129
return processError(err)
128130
}
129131

130132
func (gs *grpcSender) exportMetrics(ctx context.Context, md pdata.Metrics) error {
131-
_, err := gs.metricExporter.Export(gs.enhanceContext(ctx), md, gs.callOptions...)
133+
req := otlpgrpc.NewMetricsRequest()
134+
req.SetMetrics(md)
135+
_, err := gs.metricExporter.Export(gs.enhanceContext(ctx), req, gs.callOptions...)
132136
return processError(err)
133137
}
134138

135139
func (gs *grpcSender) exportLogs(ctx context.Context, ld pdata.Logs) error {
136-
_, err := gs.logExporter.Export(gs.enhanceContext(ctx), ld, gs.callOptions...)
140+
req := otlpgrpc.NewLogsRequest()
141+
req.SetLogs(ld)
142+
_, err := gs.logExporter.Export(gs.enhanceContext(ctx), req, gs.callOptions...)
137143
return processError(err)
138144
}
139145

exporter/otlpexporter/otlp_test.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ type mockTracesReceiver struct {
5858
lastRequest pdata.Traces
5959
}
6060

61-
func (r *mockTracesReceiver) Export(ctx context.Context, td pdata.Traces) (otlpgrpc.TracesResponse, error) {
61+
func (r *mockTracesReceiver) Export(ctx context.Context, req otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
6262
atomic.AddInt32(&r.requestCount, 1)
63+
td := req.Traces()
6364
atomic.AddInt32(&r.totalItems, int32(td.SpanCount()))
6465
r.mux.Lock()
6566
defer r.mux.Unlock()
@@ -110,8 +111,9 @@ type mockLogsReceiver struct {
110111
lastRequest pdata.Logs
111112
}
112113

113-
func (r *mockLogsReceiver) Export(ctx context.Context, ld pdata.Logs) (otlpgrpc.LogsResponse, error) {
114+
func (r *mockLogsReceiver) Export(ctx context.Context, req otlpgrpc.LogsRequest) (otlpgrpc.LogsResponse, error) {
114115
atomic.AddInt32(&r.requestCount, 1)
116+
ld := req.Logs()
115117
atomic.AddInt32(&r.totalItems, int32(ld.LogRecordCount()))
116118
r.mux.Lock()
117119
defer r.mux.Unlock()
@@ -147,7 +149,8 @@ type mockMetricsReceiver struct {
147149
lastRequest pdata.Metrics
148150
}
149151

150-
func (r *mockMetricsReceiver) Export(ctx context.Context, md pdata.Metrics) (otlpgrpc.MetricsResponse, error) {
152+
func (r *mockMetricsReceiver) Export(ctx context.Context, req otlpgrpc.MetricsRequest) (otlpgrpc.MetricsResponse, error) {
153+
md := req.Metrics()
151154
atomic.AddInt32(&r.requestCount, 1)
152155
atomic.AddInt32(&r.totalItems, int32(md.DataPointCount()))
153156
r.mux.Lock()

model/otlpgrpc/logs.go

+23-5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,24 @@ func NewLogsResponse() LogsResponse {
3636
return LogsResponse{orig: &otlpcollectorlog.ExportLogsServiceResponse{}}
3737
}
3838

39+
// LogsRequest represents the response for gRPC client/server.
40+
type LogsRequest struct {
41+
orig *otlpcollectorlog.ExportLogsServiceRequest
42+
}
43+
44+
// NewLogsRequest returns an empty LogsRequest.
45+
func NewLogsRequest() LogsRequest {
46+
return LogsRequest{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
47+
}
48+
49+
func (lr LogsRequest) SetLogs(ld pdata.Logs) {
50+
lr.orig.ResourceLogs = internal.LogsToOtlp(ld.InternalRep()).ResourceLogs
51+
}
52+
53+
func (lr LogsRequest) Logs() pdata.Logs {
54+
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(lr.orig))
55+
}
56+
3957
// LogsClient is the client API for OTLP-GRPC Logs service.
4058
//
4159
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
@@ -44,7 +62,7 @@ type LogsClient interface {
4462
//
4563
// For performance reasons, it is recommended to keep this RPC
4664
// alive for the entire life of the application.
47-
Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (LogsResponse, error)
65+
Export(ctx context.Context, request LogsRequest, opts ...grpc.CallOption) (LogsResponse, error)
4866
}
4967

5068
type logsClient struct {
@@ -56,8 +74,8 @@ func NewLogsClient(cc *grpc.ClientConn) LogsClient {
5674
return &logsClient{rawClient: otlpcollectorlog.NewLogsServiceClient(cc)}
5775
}
5876

59-
func (c *logsClient) Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (LogsResponse, error) {
60-
rsp, err := c.rawClient.Export(ctx, internal.LogsToOtlp(in.InternalRep()), opts...)
77+
func (c *logsClient) Export(ctx context.Context, request LogsRequest, opts ...grpc.CallOption) (LogsResponse, error) {
78+
rsp, err := c.rawClient.Export(ctx, request.orig, opts...)
6179
return LogsResponse{orig: rsp}, err
6280
}
6381

@@ -67,7 +85,7 @@ type LogsServer interface {
6785
//
6886
// For performance reasons, it is recommended to keep this RPC
6987
// alive for the entire life of the application.
70-
Export(context.Context, pdata.Logs) (LogsResponse, error)
88+
Export(context.Context, LogsRequest) (LogsResponse, error)
7189
}
7290

7391
// RegisterLogsServer registers the LogsServer to the grpc.Server.
@@ -80,6 +98,6 @@ type rawLogsServer struct {
8098
}
8199

82100
func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlog.ExportLogsServiceRequest) (*otlpcollectorlog.ExportLogsServiceResponse, error) {
83-
rsp, err := s.srv.Export(ctx, pdata.LogsFromInternalRep(internal.LogsFromOtlp(request)))
101+
rsp, err := s.srv.Export(ctx, LogsRequest{orig: request})
84102
return rsp.orig, err
85103
}

model/otlpgrpc/metrics.go

+23-5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,24 @@ func NewMetricsResponse() MetricsResponse {
3636
return MetricsResponse{orig: &otlpcollectormetrics.ExportMetricsServiceResponse{}}
3737
}
3838

39+
// MetricsRequest represents the response for gRPC client/server.
40+
type MetricsRequest struct {
41+
orig *otlpcollectormetrics.ExportMetricsServiceRequest
42+
}
43+
44+
// NewMetricsRequest returns an empty MetricsRequest.
45+
func NewMetricsRequest() MetricsRequest {
46+
return MetricsRequest{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{}}
47+
}
48+
49+
func (lr MetricsRequest) SetMetrics(ld pdata.Metrics) {
50+
lr.orig.ResourceMetrics = internal.MetricsToOtlp(ld.InternalRep()).ResourceMetrics
51+
}
52+
53+
func (lr MetricsRequest) Metrics() pdata.Metrics {
54+
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(lr.orig))
55+
}
56+
3957
// MetricsClient is the client API for OTLP-GRPC Metrics service.
4058
//
4159
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
@@ -44,7 +62,7 @@ type MetricsClient interface {
4462
//
4563
// For performance reasons, it is recommended to keep this RPC
4664
// alive for the entire life of the application.
47-
Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (MetricsResponse, error)
65+
Export(ctx context.Context, request MetricsRequest, opts ...grpc.CallOption) (MetricsResponse, error)
4866
}
4967

5068
type metricsClient struct {
@@ -56,8 +74,8 @@ func NewMetricsClient(cc *grpc.ClientConn) MetricsClient {
5674
return &metricsClient{rawClient: otlpcollectormetrics.NewMetricsServiceClient(cc)}
5775
}
5876

59-
func (c *metricsClient) Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (MetricsResponse, error) {
60-
rsp, err := c.rawClient.Export(ctx, internal.MetricsToOtlp(in.InternalRep()), opts...)
77+
func (c *metricsClient) Export(ctx context.Context, request MetricsRequest, opts ...grpc.CallOption) (MetricsResponse, error) {
78+
rsp, err := c.rawClient.Export(ctx, request.orig, opts...)
6179
return MetricsResponse{orig: rsp}, err
6280
}
6381

@@ -67,7 +85,7 @@ type MetricsServer interface {
6785
//
6886
// For performance reasons, it is recommended to keep this RPC
6987
// alive for the entire life of the application.
70-
Export(context.Context, pdata.Metrics) (MetricsResponse, error)
88+
Export(context.Context, MetricsRequest) (MetricsResponse, error)
7189
}
7290

7391
// RegisterMetricsServer registers the MetricsServer to the grpc.Server.
@@ -80,6 +98,6 @@ type rawMetricsServer struct {
8098
}
8199

82100
func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) {
83-
rsp, err := s.srv.Export(ctx, pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(request)))
101+
rsp, err := s.srv.Export(ctx, MetricsRequest{orig: request})
84102
return rsp.orig, err
85103
}

model/otlpgrpc/traces.go

+23-5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,24 @@ func NewTracesResponse() TracesResponse {
3636
return TracesResponse{orig: &otlpcollectortrace.ExportTraceServiceResponse{}}
3737
}
3838

39+
// TracesRequest represents the response for gRPC client/server.
40+
type TracesRequest struct {
41+
orig *otlpcollectortrace.ExportTraceServiceRequest
42+
}
43+
44+
// NewTracesRequest returns an empty TracesRequest.
45+
func NewTracesRequest() TracesRequest {
46+
return TracesRequest{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
47+
}
48+
49+
func (lr TracesRequest) SetTraces(ld pdata.Traces) {
50+
lr.orig.ResourceSpans = internal.TracesToOtlp(ld.InternalRep()).ResourceSpans
51+
}
52+
53+
func (lr TracesRequest) Traces() pdata.Traces {
54+
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(lr.orig))
55+
}
56+
3957
// TracesClient is the client API for OTLP-GRPC Traces service.
4058
//
4159
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
@@ -44,7 +62,7 @@ type TracesClient interface {
4462
//
4563
// For performance reasons, it is recommended to keep this RPC
4664
// alive for the entire life of the application.
47-
Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (TracesResponse, error)
65+
Export(ctx context.Context, request TracesRequest, opts ...grpc.CallOption) (TracesResponse, error)
4866
}
4967

5068
type tracesClient struct {
@@ -57,8 +75,8 @@ func NewTracesClient(cc *grpc.ClientConn) TracesClient {
5775
}
5876

5977
// Export implements the TracesClient interface.
60-
func (c *tracesClient) Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (TracesResponse, error) {
61-
rsp, err := c.rawClient.Export(ctx, internal.TracesToOtlp(in.InternalRep()), opts...)
78+
func (c *tracesClient) Export(ctx context.Context, request TracesRequest, opts ...grpc.CallOption) (TracesResponse, error) {
79+
rsp, err := c.rawClient.Export(ctx, request.orig, opts...)
6280
return TracesResponse{orig: rsp}, err
6381
}
6482

@@ -68,7 +86,7 @@ type TracesServer interface {
6886
//
6987
// For performance reasons, it is recommended to keep this RPC
7088
// alive for the entire life of the application.
71-
Export(context.Context, pdata.Traces) (TracesResponse, error)
89+
Export(context.Context, TracesRequest) (TracesResponse, error)
7290
}
7391

7492
// RegisterTracesServer registers the TracesServer to the grpc.Server.
@@ -81,6 +99,6 @@ type rawTracesServer struct {
8199
}
82100

83101
func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortrace.ExportTraceServiceRequest) (*otlpcollectortrace.ExportTraceServiceResponse, error) {
84-
rsp, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request)))
102+
rsp, err := s.srv.Export(ctx, TracesRequest{orig: request})
85103
return rsp.orig, err
86104
}

receiver/otlpreceiver/internal/logs/otlp.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"go.opentelemetry.io/collector/config"
2222
"go.opentelemetry.io/collector/consumer"
2323
"go.opentelemetry.io/collector/model/otlpgrpc"
24-
"go.opentelemetry.io/collector/model/pdata"
2524
"go.opentelemetry.io/collector/obsreport"
2625
)
2726

@@ -45,7 +44,8 @@ func New(id config.ComponentID, nextConsumer consumer.Logs) *Receiver {
4544
}
4645

4746
// Export implements the service Export logs func.
48-
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (otlpgrpc.LogsResponse, error) {
47+
func (r *Receiver) Export(ctx context.Context, req otlpgrpc.LogsRequest) (otlpgrpc.LogsResponse, error) {
48+
ld := req.Logs()
4949
numSpans := ld.LogRecordCount()
5050
if numSpans == 0 {
5151
return otlpgrpc.NewLogsResponse(), nil

receiver/otlpreceiver/internal/logs/otlp_test.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,9 @@ import (
2929
"go.opentelemetry.io/collector/consumer/consumertest"
3030
"go.opentelemetry.io/collector/internal/testdata"
3131
"go.opentelemetry.io/collector/model/otlpgrpc"
32-
"go.opentelemetry.io/collector/model/pdata"
3332
)
3433

3534
func TestExport(t *testing.T) {
36-
// given
37-
3835
logSink := new(consumertest.LogsSink)
3936

4037
addr, doneFn := otlpReceiverOnGRPCServer(t, logSink)
@@ -44,10 +41,12 @@ func TestExport(t *testing.T) {
4441
require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err)
4542
defer traceClientDoneFn()
4643

47-
req := testdata.GenerateLogsOneLogRecord()
44+
ld := testdata.GenerateLogsOneLogRecord()
4845
// Keep log data to compare the test result against it
4946
// Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream
50-
logData := req.Clone()
47+
logData := ld.Clone()
48+
req := otlpgrpc.NewLogsRequest()
49+
req.SetLogs(ld)
5150

5251
resp, err := traceClient.Export(context.Background(), req)
5352
require.NoError(t, err, "Failed to export trace: %v", err)
@@ -68,7 +67,7 @@ func TestExport_EmptyRequest(t *testing.T) {
6867
require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err)
6968
defer logClientDoneFn()
7069

71-
resp, err := logClient.Export(context.Background(), pdata.NewLogs())
70+
resp, err := logClient.Export(context.Background(), otlpgrpc.NewLogsRequest())
7271
assert.NoError(t, err, "Failed to export trace: %v", err)
7372
assert.NotNil(t, resp, "The response is missing")
7473
}
@@ -81,7 +80,9 @@ func TestExport_ErrorConsumer(t *testing.T) {
8180
require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err)
8281
defer logClientDoneFn()
8382

84-
req := testdata.GenerateLogsOneLogRecord()
83+
ld := testdata.GenerateLogsOneLogRecord()
84+
req := otlpgrpc.NewLogsRequest()
85+
req.SetLogs(ld)
8586

8687
resp, err := logClient.Export(context.Background(), req)
8788
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")

receiver/otlpreceiver/internal/metrics/otlp.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"go.opentelemetry.io/collector/config"
2222
"go.opentelemetry.io/collector/consumer"
2323
"go.opentelemetry.io/collector/model/otlpgrpc"
24-
"go.opentelemetry.io/collector/model/pdata"
2524
"go.opentelemetry.io/collector/obsreport"
2625
)
2726

@@ -45,7 +44,8 @@ func New(id config.ComponentID, nextConsumer consumer.Metrics) *Receiver {
4544
}
4645

4746
// Export implements the service Export metrics func.
48-
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (otlpgrpc.MetricsResponse, error) {
47+
func (r *Receiver) Export(ctx context.Context, req otlpgrpc.MetricsRequest) (otlpgrpc.MetricsResponse, error) {
48+
md := req.Metrics()
4949
dataPointCount := md.DataPointCount()
5050
if dataPointCount == 0 {
5151
return otlpgrpc.NewMetricsResponse(), nil

0 commit comments

Comments
 (0)