Skip to content

Commit 55f569d

Browse files
authored
[chore] Create service/internal/obsconsumer package (#12817)
Subset of #12812 This internal package defines wrappers around consumers. These are useful for instrumenting the component graph, so that we can generate telemetry describing data as it is passed in between components. Currently, this supports only a single counter metric, but in the near future it can be enhanced to automatically capture multiple metrics (e.g. item count & size), and potentially spans and/or logs as well.
1 parent f01b1ea commit 55f569d

File tree

10 files changed

+1272
-0
lines changed

10 files changed

+1272
-0
lines changed

service/internal/obsconsumer/logs.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package obsconsumer // import "go.opentelemetry.io/collector/service/internal/obsconsumer"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/otel/metric"
10+
11+
"go.opentelemetry.io/collector/consumer"
12+
"go.opentelemetry.io/collector/pdata/plog"
13+
)
14+
15+
var _ consumer.Logs = logs{}
16+
17+
func NewLogs(consumer consumer.Logs, itemCounter metric.Int64Counter, opts ...Option) consumer.Logs {
18+
o := options{}
19+
for _, opt := range opts {
20+
opt.apply(&o)
21+
}
22+
return logs{
23+
consumer: consumer,
24+
itemCounter: itemCounter,
25+
compiledOptions: o.compile(),
26+
}
27+
}
28+
29+
type logs struct {
30+
consumer consumer.Logs
31+
itemCounter metric.Int64Counter
32+
compiledOptions
33+
}
34+
35+
func (c logs) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
36+
// Measure before calling ConsumeLogs because the data may be mutated downstream
37+
itemCount := ld.LogRecordCount()
38+
err := c.consumer.ConsumeLogs(ctx, ld)
39+
if err == nil {
40+
c.itemCounter.Add(ctx, int64(itemCount), c.withSuccessAttrs)
41+
} else {
42+
c.itemCounter.Add(ctx, int64(itemCount), c.withFailureAttrs)
43+
}
44+
return err
45+
}
46+
47+
func (c logs) Capabilities() consumer.Capabilities {
48+
return c.consumer.Capabilities()
49+
}
+251
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package obsconsumer_test
5+
6+
import (
7+
"context"
8+
"errors"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/otel/attribute"
14+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
15+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
16+
17+
"go.opentelemetry.io/collector/consumer"
18+
"go.opentelemetry.io/collector/pdata/plog"
19+
"go.opentelemetry.io/collector/service/internal/obsconsumer"
20+
)
21+
22+
type mockLogsConsumer struct {
23+
err error
24+
capabilities consumer.Capabilities
25+
}
26+
27+
func (m *mockLogsConsumer) ConsumeLogs(_ context.Context, _ plog.Logs) error {
28+
return m.err
29+
}
30+
31+
func (m *mockLogsConsumer) Capabilities() consumer.Capabilities {
32+
return m.capabilities
33+
}
34+
35+
func TestLogsConsumeSuccess(t *testing.T) {
36+
ctx := context.Background()
37+
mockConsumer := &mockLogsConsumer{}
38+
39+
reader := sdkmetric.NewManualReader()
40+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
41+
meter := mp.Meter("test")
42+
counter, err := meter.Int64Counter("test_counter")
43+
require.NoError(t, err)
44+
45+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
46+
47+
ld := plog.NewLogs()
48+
r := ld.ResourceLogs().AppendEmpty()
49+
sl := r.ScopeLogs().AppendEmpty()
50+
sl.LogRecords().AppendEmpty()
51+
52+
err = consumer.ConsumeLogs(ctx, ld)
53+
require.NoError(t, err)
54+
55+
var rm metricdata.ResourceMetrics
56+
err = reader.Collect(ctx, &rm)
57+
require.NoError(t, err)
58+
require.Len(t, rm.ScopeMetrics, 1)
59+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
60+
61+
metric := rm.ScopeMetrics[0].Metrics[0]
62+
require.Equal(t, "test_counter", metric.Name)
63+
64+
data := metric.Data.(metricdata.Sum[int64])
65+
require.Len(t, data.DataPoints, 1)
66+
require.Equal(t, int64(1), data.DataPoints[0].Value)
67+
68+
attrs := data.DataPoints[0].Attributes
69+
require.Equal(t, 1, attrs.Len())
70+
val, ok := attrs.Value(attribute.Key("outcome"))
71+
require.True(t, ok)
72+
require.Equal(t, "success", val.Emit())
73+
}
74+
75+
func TestLogsConsumeFailure(t *testing.T) {
76+
ctx := context.Background()
77+
expectedErr := errors.New("test error")
78+
mockConsumer := &mockLogsConsumer{err: expectedErr}
79+
80+
reader := sdkmetric.NewManualReader()
81+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
82+
meter := mp.Meter("test")
83+
counter, err := meter.Int64Counter("test_counter")
84+
require.NoError(t, err)
85+
86+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
87+
88+
ld := plog.NewLogs()
89+
r := ld.ResourceLogs().AppendEmpty()
90+
sl := r.ScopeLogs().AppendEmpty()
91+
sl.LogRecords().AppendEmpty()
92+
93+
err = consumer.ConsumeLogs(ctx, ld)
94+
assert.Equal(t, expectedErr, err)
95+
96+
var rm metricdata.ResourceMetrics
97+
err = reader.Collect(ctx, &rm)
98+
require.NoError(t, err)
99+
require.Len(t, rm.ScopeMetrics, 1)
100+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
101+
102+
metric := rm.ScopeMetrics[0].Metrics[0]
103+
require.Equal(t, "test_counter", metric.Name)
104+
105+
data := metric.Data.(metricdata.Sum[int64])
106+
require.Len(t, data.DataPoints, 1)
107+
require.Equal(t, int64(1), data.DataPoints[0].Value)
108+
109+
attrs := data.DataPoints[0].Attributes
110+
require.Equal(t, 1, attrs.Len())
111+
val, ok := attrs.Value(attribute.Key("outcome"))
112+
require.True(t, ok)
113+
require.Equal(t, "failure", val.Emit())
114+
}
115+
116+
func TestLogsWithStaticAttributes(t *testing.T) {
117+
ctx := context.Background()
118+
mockConsumer := &mockLogsConsumer{}
119+
120+
reader := sdkmetric.NewManualReader()
121+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
122+
meter := mp.Meter("test")
123+
counter, err := meter.Int64Counter("test_counter")
124+
require.NoError(t, err)
125+
126+
staticAttr := attribute.String("test", "value")
127+
consumer := obsconsumer.NewLogs(mockConsumer, counter, obsconsumer.WithStaticDataPointAttribute(staticAttr))
128+
129+
ld := plog.NewLogs()
130+
r := ld.ResourceLogs().AppendEmpty()
131+
sl := r.ScopeLogs().AppendEmpty()
132+
sl.LogRecords().AppendEmpty()
133+
134+
err = consumer.ConsumeLogs(ctx, ld)
135+
require.NoError(t, err)
136+
137+
var rm metricdata.ResourceMetrics
138+
err = reader.Collect(ctx, &rm)
139+
require.NoError(t, err)
140+
require.Len(t, rm.ScopeMetrics, 1)
141+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
142+
143+
metric := rm.ScopeMetrics[0].Metrics[0]
144+
require.Equal(t, "test_counter", metric.Name)
145+
146+
data := metric.Data.(metricdata.Sum[int64])
147+
require.Len(t, data.DataPoints, 1)
148+
require.Equal(t, int64(1), data.DataPoints[0].Value)
149+
150+
attrs := data.DataPoints[0].Attributes
151+
require.Equal(t, 2, attrs.Len())
152+
val, ok := attrs.Value(attribute.Key("test"))
153+
require.True(t, ok)
154+
require.Equal(t, "value", val.Emit())
155+
val, ok = attrs.Value(attribute.Key("outcome"))
156+
require.True(t, ok)
157+
require.Equal(t, "success", val.Emit())
158+
}
159+
160+
func TestLogsMultipleItemsMixedOutcomes(t *testing.T) {
161+
ctx := context.Background()
162+
expectedErr := errors.New("test error")
163+
mockConsumer := &mockLogsConsumer{}
164+
165+
reader := sdkmetric.NewManualReader()
166+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
167+
meter := mp.Meter("test")
168+
counter, err := meter.Int64Counter("test_counter")
169+
require.NoError(t, err)
170+
171+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
172+
173+
// First batch: 2 successful items
174+
ld1 := plog.NewLogs()
175+
for range 2 {
176+
r := ld1.ResourceLogs().AppendEmpty()
177+
sl := r.ScopeLogs().AppendEmpty()
178+
sl.LogRecords().AppendEmpty()
179+
}
180+
err = consumer.ConsumeLogs(ctx, ld1)
181+
require.NoError(t, err)
182+
183+
// Second batch: 1 failed item
184+
mockConsumer.err = expectedErr
185+
ld2 := plog.NewLogs()
186+
r := ld2.ResourceLogs().AppendEmpty()
187+
sl := r.ScopeLogs().AppendEmpty()
188+
sl.LogRecords().AppendEmpty()
189+
err = consumer.ConsumeLogs(ctx, ld2)
190+
assert.Equal(t, expectedErr, err)
191+
192+
// Third batch: 2 successful items
193+
mockConsumer.err = nil
194+
ld3 := plog.NewLogs()
195+
for range 2 {
196+
r = ld3.ResourceLogs().AppendEmpty()
197+
sl = r.ScopeLogs().AppendEmpty()
198+
sl.LogRecords().AppendEmpty()
199+
}
200+
err = consumer.ConsumeLogs(ctx, ld3)
201+
require.NoError(t, err)
202+
203+
// Fourth batch: 1 failed item
204+
mockConsumer.err = expectedErr
205+
ld4 := plog.NewLogs()
206+
r = ld4.ResourceLogs().AppendEmpty()
207+
sl = r.ScopeLogs().AppendEmpty()
208+
sl.LogRecords().AppendEmpty()
209+
err = consumer.ConsumeLogs(ctx, ld4)
210+
assert.Equal(t, expectedErr, err)
211+
212+
var rm metricdata.ResourceMetrics
213+
err = reader.Collect(ctx, &rm)
214+
require.NoError(t, err)
215+
require.Len(t, rm.ScopeMetrics, 1)
216+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
217+
218+
metric := rm.ScopeMetrics[0].Metrics[0]
219+
require.Equal(t, "test_counter", metric.Name)
220+
221+
data := metric.Data.(metricdata.Sum[int64])
222+
require.Len(t, data.DataPoints, 2)
223+
224+
// Find success and failure data points
225+
var successDP, failureDP metricdata.DataPoint[int64]
226+
for _, dp := range data.DataPoints {
227+
val, ok := dp.Attributes.Value(attribute.Key("outcome"))
228+
if ok && val.Emit() == "success" {
229+
successDP = dp
230+
} else {
231+
failureDP = dp
232+
}
233+
}
234+
235+
require.Equal(t, int64(4), successDP.Value)
236+
require.Equal(t, int64(2), failureDP.Value)
237+
}
238+
239+
func TestLogsCapabilities(t *testing.T) {
240+
mockConsumer := &mockLogsConsumer{
241+
capabilities: consumer.Capabilities{MutatesData: true},
242+
}
243+
reader := sdkmetric.NewManualReader()
244+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
245+
meter := mp.Meter("test")
246+
counter, err := meter.Int64Counter("test_counter")
247+
require.NoError(t, err)
248+
249+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
250+
require.Equal(t, consumer.Capabilities(), mockConsumer.capabilities)
251+
}
+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package obsconsumer // import "go.opentelemetry.io/collector/service/internal/obsconsumer"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/otel/metric"
10+
11+
"go.opentelemetry.io/collector/consumer"
12+
"go.opentelemetry.io/collector/pdata/pmetric"
13+
)
14+
15+
var _ consumer.Metrics = metrics{}
16+
17+
func NewMetrics(consumer consumer.Metrics, itemCounter metric.Int64Counter, opts ...Option) consumer.Metrics {
18+
o := options{}
19+
for _, opt := range opts {
20+
opt.apply(&o)
21+
}
22+
return metrics{
23+
consumer: consumer,
24+
itemCounter: itemCounter,
25+
compiledOptions: o.compile(),
26+
}
27+
}
28+
29+
type metrics struct {
30+
consumer consumer.Metrics
31+
itemCounter metric.Int64Counter
32+
compiledOptions
33+
}
34+
35+
func (c metrics) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
36+
// Measure before calling ConsumeMetrics because the data may be mutated downstream
37+
itemCount := md.DataPointCount()
38+
err := c.consumer.ConsumeMetrics(ctx, md)
39+
if err == nil {
40+
c.itemCounter.Add(ctx, int64(itemCount), c.withSuccessAttrs)
41+
} else {
42+
c.itemCounter.Add(ctx, int64(itemCount), c.withFailureAttrs)
43+
}
44+
return err
45+
}
46+
47+
func (c metrics) Capabilities() consumer.Capabilities {
48+
return c.consumer.Capabilities()
49+
}

0 commit comments

Comments
 (0)