Skip to content

Commit eaa3fd4

Browse files
ArthurSens authored and sbylica-splunk committed
[receiver/prometheusremotewrite] Parse labels (open-telemetry#35656)
#### Description This PR builds on top of open-telemetry#35535, open-telemetry#35565 and open-telemetry#35624. Here we're parsing labels into resource/metric attributes. It's still not great because resource attributes (with the exception of `service.namespace`, `service.name` and `service.instance.id`) are encoded into a special metric called `target_info`. Metrics related to specific target infos may arrive in separate write requests, so it may be impossible to build the full OTLP metric in a stateless way. In this PR I'm ignoring this problem 😛, and transforming `job` and `instance` labels into resource attributes, while all other labels become datapoint (metric) attributes. Please focus on the latest commit when reviewing this PR :) 1c9ff80 --------- Signed-off-by: Arthur Silva Sens <[email protected]>
1 parent 2bf3d6f commit eaa3fd4

File tree

4 files changed

+266
-5
lines changed

4 files changed

+266
-5
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/prometheusremotewrite
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Parse labels from Prometheus Remote Write requests into Resource and Metric Attributes.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35656]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/prometheusremotewritereceiver/go.mod

+9-1
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet
33
go 1.22.0
44

55
require (
6+
github.com/cespare/xxhash/v2 v2.3.0
67
github.com/gogo/protobuf v1.3.2
78
github.com/golang/snappy v0.0.4
9+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.114.0
810
github.com/prometheus/prometheus v0.54.1
911
github.com/stretchr/testify v1.10.0
1012
go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8
@@ -29,7 +31,6 @@ require (
2931
github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect
3032
github.com/aws/aws-sdk-go v1.54.19 // indirect
3133
github.com/beorn7/perks v1.0.1 // indirect
32-
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3334
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3435
github.com/dennwc/varint v1.0.0 // indirect
3536
github.com/felixge/httpsnoop v1.0.4 // indirect
@@ -56,6 +57,7 @@ require (
5657
github.com/modern-go/reflect2 v1.0.2 // indirect
5758
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
5859
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
60+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.115.0 // indirect
5961
github.com/pierrec/lz4/v4 v4.1.21 // indirect
6062
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
6163
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
@@ -105,3 +107,9 @@ require (
105107
k8s.io/klog/v2 v2.130.1 // indirect
106108
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
107109
)
110+
111+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
112+
113+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
114+
115+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

receiver/prometheusremotewritereceiver/receiver.go

+111-3
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@ import (
1212
"strings"
1313
"time"
1414

15+
"github.com/cespare/xxhash/v2"
1516
"github.com/gogo/protobuf/proto"
1617
promconfig "github.com/prometheus/prometheus/config"
18+
"github.com/prometheus/prometheus/model/labels"
1719
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1820
promremote "github.com/prometheus/prometheus/storage/remote"
1921
"go.opentelemetry.io/collector/component"
2022
"go.opentelemetry.io/collector/component/componentstatus"
2123
"go.opentelemetry.io/collector/consumer"
24+
"go.opentelemetry.io/collector/pdata/pcommon"
2225
"go.opentelemetry.io/collector/pdata/pmetric"
2326
"go.opentelemetry.io/collector/receiver"
2427
"go.uber.org/zap/zapcore"
@@ -150,8 +153,113 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco
150153
}
151154

152155
// translateV2 translates a v2 remote-write request into OTLP metrics.
153-
// For now translateV2 is not implemented and returns an empty metrics.
156+
// translate is not feature complete.
154157
// nolint
155-
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
156-
return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil
158+
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
159+
var (
160+
badRequestErrors error
161+
otelMetrics = pmetric.NewMetrics()
162+
labelsBuilder = labels.NewScratchBuilder(0)
163+
stats = promremote.WriteResponseStats{}
164+
// Prometheus Remote-Write can send multiple time series with the same labels in the same request.
165+
// Instead of creating a whole new OTLP metric, we just append the new sample to the existing OTLP metric.
166+
// This cache is called "intra" because in the future we'll have a "interRequestCache" to cache resourceAttributes
167+
// between requests based on the metric "target_info".
168+
intraRequestCache = make(map[uint64]pmetric.ResourceMetrics)
169+
)
170+
171+
for _, ts := range req.Timeseries {
172+
ls := ts.ToLabels(&labelsBuilder, req.Symbols)
173+
174+
if !ls.Has(labels.MetricName) {
175+
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("missing metric name in labels"))
176+
continue
177+
} else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate {
178+
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel))
179+
continue
180+
}
181+
182+
var rm pmetric.ResourceMetrics
183+
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
184+
intraCacheEntry, ok := intraRequestCache[hashedLabels]
185+
if ok {
186+
// We found the same time series in the same request, so we should append to the same OTLP metric.
187+
rm = intraCacheEntry
188+
} else {
189+
rm = otelMetrics.ResourceMetrics().AppendEmpty()
190+
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
191+
intraRequestCache[hashedLabels] = rm
192+
}
193+
194+
switch ts.Metadata.Type {
195+
case writev2.Metadata_METRIC_TYPE_COUNTER:
196+
addCounterDatapoints(rm, ls, ts)
197+
case writev2.Metadata_METRIC_TYPE_GAUGE:
198+
addGaugeDatapoints(rm, ls, ts)
199+
case writev2.Metadata_METRIC_TYPE_SUMMARY:
200+
addSummaryDatapoints(rm, ls, ts)
201+
case writev2.Metadata_METRIC_TYPE_HISTOGRAM:
202+
addHistogramDatapoints(rm, ls, ts)
203+
default:
204+
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName)))
205+
}
206+
}
207+
208+
return otelMetrics, stats, badRequestErrors
209+
}
210+
211+
// parseJobAndInstance turns the job and instance labels service resource attributes.
212+
// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/
213+
func parseJobAndInstance(dest pcommon.Map, job, instance string) {
214+
if instance != "" {
215+
dest.PutStr("service.instance.id", instance)
216+
}
217+
if job != "" {
218+
parts := strings.Split(job, "/")
219+
if len(parts) == 2 {
220+
dest.PutStr("service.namespace", parts[0])
221+
dest.PutStr("service.name", parts[1])
222+
return
223+
}
224+
dest.PutStr("service.name", job)
225+
}
226+
}
227+
228+
func addCounterDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
229+
// TODO: Implement this function
230+
}
231+
232+
func addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2.TimeSeries) {
233+
// TODO: Cache metric name+type+unit and look up cache before creating new empty metric.
234+
// In OTel name+type+unit is the unique identifier of a metric and we should not create
235+
// a new metric if it already exists.
236+
237+
// TODO: Check if Scope is already present by comparing labels "otel_scope_name" and "otel_scope_version"
238+
// with Scope.Name and Scope.Version. If it is present, we should append to the existing Scope.
239+
m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge()
240+
addDatapoints(m.DataPoints(), ls, ts)
241+
}
242+
243+
func addSummaryDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
244+
// TODO: Implement this function
245+
}
246+
247+
func addHistogramDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
248+
// TODO: Implement this function
249+
}
250+
251+
// addDatapoints adds the labels to the datapoints attributes.
252+
// TODO: We're still not handling several fields that make a datapoint complete, e.g. StartTimestamp,
253+
// Timestamp, Value, etc.
254+
func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, _ writev2.TimeSeries) {
255+
attributes := datapoints.AppendEmpty().Attributes()
256+
257+
for _, l := range ls {
258+
if l.Name == "instance" || l.Name == "job" || // Become resource attributes "service.name", "service.instance.id" and "service.namespace"
259+
l.Name == labels.MetricName || // Becomes metric name
260+
l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version
261+
continue
262+
}
263+
attributes.PutStr(l.Name, l.Value)
264+
}
157265
}

receiver/prometheusremotewritereceiver/receiver_test.go

+119-1
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,38 @@ import (
1414
"github.com/golang/snappy"
1515
promconfig "github.com/prometheus/prometheus/config"
1616
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
17+
"github.com/prometheus/prometheus/storage/remote"
1718
"github.com/stretchr/testify/assert"
1819
"go.opentelemetry.io/collector/component/componenttest"
1920
"go.opentelemetry.io/collector/consumer/consumertest"
21+
"go.opentelemetry.io/collector/pdata/pmetric"
2022
"go.opentelemetry.io/collector/receiver/receivertest"
23+
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
2125
)
2226

23-
func setupServer(t *testing.T) {
27+
var writeV2RequestFixture = &writev2.Request{
28+
Symbols: []string{"", "__name__", "test_metric1", "job", "service-x/test", "instance", "107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
29+
Timeseries: []writev2.TimeSeries{
30+
{
31+
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
32+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
33+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
34+
},
35+
{
36+
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
37+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics.
38+
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
39+
},
40+
{
41+
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
42+
LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance.
43+
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
44+
},
45+
},
46+
}
47+
48+
func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
2449
t.Helper()
2550

2651
factory := NewFactory()
@@ -30,6 +55,13 @@ func setupServer(t *testing.T) {
3055
assert.NoError(t, err)
3156
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")
3257

58+
return prwReceiver.(*prometheusRemoteWriteReceiver)
59+
}
60+
61+
func setupServer(t *testing.T) {
62+
t.Helper()
63+
64+
prwReceiver := setupMetricsReceiver(t)
3365
ctx, cancel := context.WithCancel(context.Background())
3466
t.Cleanup(cancel)
3567

@@ -98,3 +130,89 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
98130
})
99131
}
100132
}
133+
134+
func TestTranslateV2(t *testing.T) {
135+
prwReceiver := setupMetricsReceiver(t)
136+
ctx, cancel := context.WithCancel(context.Background())
137+
t.Cleanup(cancel)
138+
139+
for _, tc := range []struct {
140+
name string
141+
request *writev2.Request
142+
expectError string
143+
expectedMetrics pmetric.Metrics
144+
expectedStats remote.WriteResponseStats
145+
}{
146+
{
147+
name: "missing metric name",
148+
request: &writev2.Request{
149+
Symbols: []string{"", "foo", "bar"},
150+
Timeseries: []writev2.TimeSeries{
151+
{
152+
LabelsRefs: []uint32{1, 2},
153+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
154+
},
155+
},
156+
},
157+
expectError: "missing metric name in labels",
158+
},
159+
{
160+
name: "duplicate label",
161+
request: &writev2.Request{
162+
Symbols: []string{"", "__name__", "test"},
163+
Timeseries: []writev2.TimeSeries{
164+
{
165+
LabelsRefs: []uint32{1, 2, 1, 2},
166+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
167+
},
168+
},
169+
},
170+
expectError: `duplicate label "__name__" in labels`,
171+
},
172+
{
173+
name: "valid request",
174+
request: writeV2RequestFixture,
175+
expectedMetrics: func() pmetric.Metrics {
176+
expected := pmetric.NewMetrics()
177+
rm1 := expected.ResourceMetrics().AppendEmpty()
178+
rmAttributes1 := rm1.Resource().Attributes()
179+
rmAttributes1.PutStr("service.namespace", "service-x")
180+
rmAttributes1.PutStr("service.name", "test")
181+
rmAttributes1.PutStr("service.instance.id", "107cn001")
182+
sm1 := rm1.ScopeMetrics().AppendEmpty()
183+
sm1Attributes := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes()
184+
sm1Attributes.PutStr("d", "e")
185+
sm1Attributes.PutStr("foo", "bar")
186+
// Since we don't check "scope_name" and "scope_version", we end up with duplicated scope metrics for repeated series.
187+
// TODO: Properly handle scope metrics.
188+
sm2 := rm1.ScopeMetrics().AppendEmpty()
189+
sm2Attributes := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes()
190+
sm2Attributes.PutStr("d", "e")
191+
sm2Attributes.PutStr("foo", "bar")
192+
193+
rm2 := expected.ResourceMetrics().AppendEmpty()
194+
rmAttributes2 := rm2.Resource().Attributes()
195+
rmAttributes2.PutStr("service.name", "foo")
196+
rmAttributes2.PutStr("service.instance.id", "bar")
197+
mAttributes2 := rm2.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes()
198+
mAttributes2.PutStr("d", "e")
199+
mAttributes2.PutStr("foo", "bar")
200+
201+
return expected
202+
}(),
203+
expectedStats: remote.WriteResponseStats{},
204+
},
205+
} {
206+
t.Run(tc.name, func(t *testing.T) {
207+
metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)
208+
if tc.expectError != "" {
209+
assert.ErrorContains(t, err, tc.expectError)
210+
return
211+
}
212+
213+
assert.NoError(t, err)
214+
assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics))
215+
assert.Equal(t, tc.expectedStats, stats)
216+
})
217+
}
218+
}

0 commit comments

Comments
 (0)