Skip to content

Commit 17df302

Browse files
committed
[receiver/prometheusremotewrite] Parse labels into resource attributes
Signed-off-by: Arthur Silva Sens <[email protected]>
1 parent 64def58 commit 17df302

File tree

5 files changed

+189
-3
lines changed

5 files changed

+189
-3
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/prometheusremotewrite
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Parse labels from Prometheus Remote Write requests into Resource Attributes
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35656]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api, user]

receiver/prometheusremotewritereceiver/go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.22.0
55
require (
66
github.com/gogo/protobuf v1.3.2
77
github.com/golang/snappy v0.0.4
8+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0
89
github.com/prometheus/prometheus v0.54.1
910
github.com/stretchr/testify v1.9.0
1011
go.opentelemetry.io/collector/component v0.111.0
@@ -54,6 +55,7 @@ require (
5455
github.com/modern-go/reflect2 v1.0.2 // indirect
5556
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
5657
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
58+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 // indirect
5759
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
5860
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
5961
github.com/prometheus/client_golang v1.19.1 // indirect

receiver/prometheusremotewritereceiver/go.sum

+6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/prometheusremotewritereceiver/receiver.go

+30-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/gogo/protobuf/proto"
1616
promconfig "github.com/prometheus/prometheus/config"
17+
"github.com/prometheus/prometheus/model/labels"
1718
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1819
promremote "github.com/prometheus/prometheus/storage/remote"
1920
"go.opentelemetry.io/collector/component"
@@ -152,6 +153,33 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco
152153
// translateV2 translates a v2 remote-write request into OTLP metrics.
153154
// For now translateV2 is not implemented and returns an empty metrics.
154155
// nolint
155-
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
156-
return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil
156+
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
157+
var (
158+
badRequestErrors []error
159+
otelMetrics = pmetric.NewMetrics()
160+
b = labels.NewScratchBuilder(0)
161+
stats = promremote.WriteResponseStats{}
162+
)
163+
164+
resourceMetrics := otelMetrics.ResourceMetrics().AppendEmpty()
165+
166+
for _, ts := range req.Timeseries {
167+
ls := ts.ToLabels(&b, req.Symbols)
168+
169+
if !ls.Has(labels.MetricName) {
170+
badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels"))
171+
continue
172+
} else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate {
173+
badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel))
174+
continue
175+
}
176+
177+
ls = ls.DropMetricName()
178+
for _, label := range ls {
179+
resourceMetrics.Resource().Attributes().PutStr(label.Name, label.Value)
180+
}
181+
182+
}
183+
184+
return otelMetrics, stats, errors.Join(badRequestErrors...)
157185
}

receiver/prometheusremotewritereceiver/receiver_test.go

+124-1
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,63 @@ import (
1313
"github.com/gogo/protobuf/proto"
1414
"github.com/golang/snappy"
1515
promconfig "github.com/prometheus/prometheus/config"
16+
"github.com/prometheus/prometheus/model/histogram"
1617
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
18+
"github.com/prometheus/prometheus/storage/remote"
1719
"github.com/stretchr/testify/assert"
1820
"go.opentelemetry.io/collector/component/componenttest"
1921
"go.opentelemetry.io/collector/consumer/consumertest"
22+
"go.opentelemetry.io/collector/pdata/pmetric"
2023
"go.opentelemetry.io/collector/receiver/receivertest"
24+
25+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
2126
)
2227

23-
func setupServer(t *testing.T) {
28+
var (
29+
testHistogram = histogram.Histogram{
30+
Schema: 2,
31+
ZeroThreshold: 1e-128,
32+
ZeroCount: 0,
33+
Count: 0,
34+
Sum: 20,
35+
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
36+
PositiveBuckets: []int64{1},
37+
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
38+
NegativeBuckets: []int64{-1},
39+
}
40+
41+
writeV2RequestFixture = &writev2.Request{
42+
Symbols: []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
43+
Timeseries: []writev2.TimeSeries{
44+
{
45+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
46+
Metadata: writev2.Metadata{
47+
Type: writev2.Metadata_METRIC_TYPE_GAUGE, // writeV2RequestSeries1Metadata.Type.
48+
49+
HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
50+
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
51+
},
52+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
53+
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
54+
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
55+
},
56+
{
57+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first.
58+
Metadata: writev2.Metadata{
59+
Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries2Metadata.Type.
60+
61+
HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help.
62+
// No unit.
63+
},
64+
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
65+
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}},
66+
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
67+
},
68+
},
69+
}
70+
)
71+
72+
func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
2473
t.Helper()
2574

2675
factory := NewFactory()
@@ -30,6 +79,13 @@ func setupServer(t *testing.T) {
3079
assert.NoError(t, err)
3180
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")
3281

82+
return prwReceiver.(*prometheusRemoteWriteReceiver)
83+
}
84+
85+
func setupServer(t *testing.T) {
86+
t.Helper()
87+
88+
prwReceiver := setupMetricsReceiver(t)
3389
ctx, cancel := context.WithCancel(context.Background())
3490
t.Cleanup(cancel)
3591

@@ -98,3 +154,70 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
98154
})
99155
}
100156
}
157+
158+
func TestTranslateV2(t *testing.T) {
159+
prwReceiver := setupMetricsReceiver(t)
160+
ctx, cancel := context.WithCancel(context.Background())
161+
t.Cleanup(cancel)
162+
163+
for _, tc := range []struct {
164+
name string
165+
request *writev2.Request
166+
expectError string
167+
expectedMetrics pmetric.Metrics
168+
expectedStats remote.WriteResponseStats
169+
}{
170+
{
171+
name: "missing metric name",
172+
request: &writev2.Request{
173+
Symbols: []string{"", "foo", "bar"},
174+
Timeseries: []writev2.TimeSeries{
175+
{
176+
LabelsRefs: []uint32{1, 2},
177+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
178+
},
179+
},
180+
},
181+
expectError: "missing metric name in labels",
182+
},
183+
{
184+
name: "duplicate label",
185+
request: &writev2.Request{
186+
Symbols: []string{"", "__name__", "test"},
187+
Timeseries: []writev2.TimeSeries{
188+
{
189+
LabelsRefs: []uint32{1, 2, 1, 2},
190+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
191+
},
192+
},
193+
},
194+
expectError: `duplicate label "__name__" in labels`,
195+
},
196+
{
197+
name: "valid request",
198+
request: writeV2RequestFixture,
199+
expectedMetrics: func() pmetric.Metrics {
200+
expected := pmetric.NewMetrics()
201+
rmAttributes := expected.ResourceMetrics().AppendEmpty().Resource().Attributes()
202+
rmAttributes.PutStr("b", "c")
203+
rmAttributes.PutStr("baz", "qux")
204+
rmAttributes.PutStr("d", "e")
205+
rmAttributes.PutStr("foo", "bar")
206+
return expected
207+
}(),
208+
expectedStats: remote.WriteResponseStats{},
209+
},
210+
} {
211+
t.Run(tc.name, func(t *testing.T) {
212+
metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)
213+
if tc.expectError != "" {
214+
assert.ErrorContains(t, err, tc.expectError)
215+
return
216+
}
217+
218+
assert.NoError(t, err)
219+
assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics))
220+
assert.Equal(t, tc.expectedStats, stats)
221+
})
222+
}
223+
}

0 commit comments

Comments
 (0)