Skip to content

Commit 1c9ff80

Browse files
committed
[receiver/prometheusremotewrite] Parse labels into resource and scope attributes
When translating Prometheus metrics to OTLP, we rely on the metric called 'target_info' that may come in different remote write requests. The target_info metric is used to populate resource attributes of different OTel metrics. Currently, we don't support the correct usage of 'target_info'. Signed-off-by: Arthur Silva Sens <[email protected]>
1 parent 48cd5d6 commit 1c9ff80

File tree

5 files changed

+220
-7
lines changed

5 files changed

+220
-7
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/prometheusremotewrite
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Parse labels from Prometheus Remote Write requests into Resource and Scope Attributes
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35656]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api, user]

receiver/prometheusremotewritereceiver/go.mod

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.22.0
55
require (
66
github.com/gogo/protobuf v1.3.2
77
github.com/golang/snappy v0.0.4
8+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0
89
github.com/prometheus/prometheus v0.54.1
910
github.com/stretchr/testify v1.9.0
1011
go.opentelemetry.io/collector/component v0.112.0
@@ -54,6 +55,7 @@ require (
5455
github.com/modern-go/reflect2 v1.0.2 // indirect
5556
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
5657
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
58+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 // indirect
5759
github.com/pierrec/lz4/v4 v4.1.21 // indirect
5860
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
5961
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
@@ -77,7 +79,7 @@ require (
7779
go.opentelemetry.io/collector/pdata/pprofile v0.112.0 // indirect
7880
go.opentelemetry.io/collector/pipeline v0.112.0 // indirect
7981
go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 // indirect
80-
go.opentelemetry.io/collector/semconv v0.105.0 // indirect
82+
go.opentelemetry.io/collector/semconv v0.112.0 // indirect
8183
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
8284
go.opentelemetry.io/otel v1.31.0 // indirect
8385
go.opentelemetry.io/otel/metric v1.31.0 // indirect

receiver/prometheusremotewritereceiver/go.sum

+8-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/prometheusremotewritereceiver/receiver.go

+71-3
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ import (
1414

1515
"github.com/gogo/protobuf/proto"
1616
promconfig "github.com/prometheus/prometheus/config"
17+
"github.com/prometheus/prometheus/model/labels"
1718
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1819
promremote "github.com/prometheus/prometheus/storage/remote"
1920
"go.opentelemetry.io/collector/component"
2021
"go.opentelemetry.io/collector/component/componentstatus"
2122
"go.opentelemetry.io/collector/consumer"
23+
"go.opentelemetry.io/collector/pdata/pcommon"
2224
"go.opentelemetry.io/collector/pdata/pmetric"
2325
"go.opentelemetry.io/collector/receiver"
2426
"go.uber.org/zap/zapcore"
@@ -28,6 +30,7 @@ func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsume
2830
return &prometheusRemoteWriteReceiver{
2931
settings: settings,
3032
nextConsumer: nextConsumer,
33+
jobInstanceCache: make(map[string]pmetric.ResourceMetrics),
3134
config: cfg,
3235
server: &http.Server{
3336
ReadTimeout: 60 * time.Second,
@@ -39,6 +42,8 @@ type prometheusRemoteWriteReceiver struct {
3942
settings receiver.Settings
4043
nextConsumer consumer.Metrics
4144

45+
jobInstanceCache map[string]pmetric.ResourceMetrics
46+
4247
config *Config
4348
server *http.Server
4449
}
@@ -150,8 +155,71 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco
150155
}
151156

152157
// translateV2 translates a v2 remote-write request into OTLP metrics.
153-
// For now translateV2 is not implemented and returns an empty metrics.
158+
// translateV2 is not feature complete yet.
154159
// nolint
155-
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
156-
return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil
160+
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
161+
var (
162+
badRequestErrors []error
163+
otelMetrics = pmetric.NewMetrics()
164+
b = labels.NewScratchBuilder(0)
165+
stats = promremote.WriteResponseStats{}
166+
)
167+
168+
169+
for _, ts := range req.Timeseries {
170+
ls := ts.ToLabels(&b, req.Symbols)
171+
172+
if !ls.Has(labels.MetricName) {
173+
badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels"))
174+
continue
175+
} else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate {
176+
badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel))
177+
continue
178+
}
179+
180+
var rm pmetric.ResourceMetrics
181+
// This cache should be populated by the metric 'target_info', but we're not handling it yet.
182+
cacheEntry, ok := prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")]
183+
if ok {
184+
rm = pmetric.NewResourceMetrics()
185+
cacheEntry.CopyTo(rm)
186+
} else {
187+
// A remote-write request can have multiple timeseries with the same instance and job labels.
188+
// While they are different timeseries in Prometheus, we're handling it as the same OTLP metric
189+
// until we support 'target_info'.
190+
rm = otelMetrics.ResourceMetrics().AppendEmpty()
191+
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("instance"), ls.Get("job"))
192+
prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] = rm
193+
194+
195+
scopeAttributes := rm.ScopeMetrics().AppendEmpty().Scope().Attributes()
196+
for _, l := range ls {
197+
if l.Name == "instance" || l.Name == "job" || l.Name == labels.MetricName {
198+
continue
199+
}
200+
scopeAttributes.PutStr(l.Name, l.Value)
201+
}
202+
}
203+
204+
// Next step is to process metadata and samples.
205+
}
206+
207+
return otelMetrics, stats, errors.Join(badRequestErrors...)
208+
}
209+
210+
// parseJobAndInstance turns the job and instance labels service resource attributes.
211+
// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/
212+
func parseJobAndInstance(dest pcommon.Map, instance, job string) {
213+
if job != "" {
214+
dest.PutStr("service.namespace", job)
215+
}
216+
if instance != "" {
217+
parts := strings.Split(instance, "/")
218+
if len(parts) == 2 {
219+
dest.PutStr("service.name", parts[0])
220+
dest.PutStr("service.instance.id", parts[1])
221+
return
222+
}
223+
dest.PutStr("service.name", instance)
224+
}
157225
}

receiver/prometheusremotewritereceiver/receiver_test.go

+111-1
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,37 @@ import (
1414
"github.com/golang/snappy"
1515
promconfig "github.com/prometheus/prometheus/config"
1616
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
17+
"github.com/prometheus/prometheus/storage/remote"
1718
"github.com/stretchr/testify/assert"
1819
"go.opentelemetry.io/collector/component/componenttest"
1920
"go.opentelemetry.io/collector/consumer/consumertest"
21+
"go.opentelemetry.io/collector/pdata/pmetric"
2022
"go.opentelemetry.io/collector/receiver/receivertest"
23+
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
2125
)
2226

23-
func setupServer(t *testing.T) {
27+
var (
28+
writeV2RequestFixture = &writev2.Request{
29+
Symbols: []string{"", "__name__", "test_metric1", "job", "test", "instance", "service-x/107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
30+
Timeseries: []writev2.TimeSeries{
31+
{
32+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
33+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
34+
},
35+
{
36+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics.
37+
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
38+
},
39+
{
40+
LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance.
41+
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
42+
},
43+
},
44+
}
45+
)
46+
47+
func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
2448
t.Helper()
2549

2650
factory := NewFactory()
@@ -30,6 +54,13 @@ func setupServer(t *testing.T) {
3054
assert.NoError(t, err)
3155
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")
3256

57+
return prwReceiver.(*prometheusRemoteWriteReceiver)
58+
}
59+
60+
func setupServer(t *testing.T) {
61+
t.Helper()
62+
63+
prwReceiver := setupMetricsReceiver(t)
3364
ctx, cancel := context.WithCancel(context.Background())
3465
t.Cleanup(cancel)
3566

@@ -98,3 +129,82 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
98129
})
99130
}
100131
}
132+
133+
func TestTranslateV2(t *testing.T) {
134+
prwReceiver := setupMetricsReceiver(t)
135+
ctx, cancel := context.WithCancel(context.Background())
136+
t.Cleanup(cancel)
137+
138+
for _, tc := range []struct {
139+
name string
140+
request *writev2.Request
141+
expectError string
142+
expectedMetrics pmetric.Metrics
143+
expectedStats remote.WriteResponseStats
144+
}{
145+
{
146+
name: "missing metric name",
147+
request: &writev2.Request{
148+
Symbols: []string{"", "foo", "bar"},
149+
Timeseries: []writev2.TimeSeries{
150+
{
151+
LabelsRefs: []uint32{1, 2},
152+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
153+
},
154+
},
155+
},
156+
expectError: "missing metric name in labels",
157+
},
158+
{
159+
name: "duplicate label",
160+
request: &writev2.Request{
161+
Symbols: []string{"", "__name__", "test"},
162+
Timeseries: []writev2.TimeSeries{
163+
{
164+
LabelsRefs: []uint32{1, 2, 1, 2},
165+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
166+
},
167+
},
168+
},
169+
expectError: `duplicate label "__name__" in labels`,
170+
},
171+
{
172+
name: "valid request",
173+
request: writeV2RequestFixture,
174+
expectedMetrics: func() pmetric.Metrics {
175+
expected := pmetric.NewMetrics()
176+
rm1 := expected.ResourceMetrics().AppendEmpty()
177+
rmAttributes1 := rm1.Resource().Attributes()
178+
rmAttributes1.PutStr("service.namespace", "test")
179+
rmAttributes1.PutStr("service.name", "service-x")
180+
rmAttributes1.PutStr("service.instance.id", "107cn001")
181+
smAttributes1 := rm1.ScopeMetrics().AppendEmpty().Scope().Attributes()
182+
smAttributes1.PutStr("d", "e")
183+
smAttributes1.PutStr("foo", "bar")
184+
185+
rm2 := expected.ResourceMetrics().AppendEmpty()
186+
rmAttributes2 := rm2.Resource().Attributes()
187+
rmAttributes2.PutStr("service.namespace", "foo")
188+
rmAttributes2.PutStr("service.name", "bar")
189+
smAttributes2 := rm2.ScopeMetrics().AppendEmpty().Scope().Attributes()
190+
smAttributes2.PutStr("d", "e")
191+
smAttributes2.PutStr("foo", "bar")
192+
193+
return expected
194+
}(),
195+
expectedStats: remote.WriteResponseStats{},
196+
},
197+
} {
198+
t.Run(tc.name, func(t *testing.T) {
199+
metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)
200+
if tc.expectError != "" {
201+
assert.ErrorContains(t, err, tc.expectError)
202+
return
203+
}
204+
205+
assert.NoError(t, err)
206+
assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics))
207+
assert.Equal(t, tc.expectedStats, stats)
208+
})
209+
}
210+
}

0 commit comments

Comments
 (0)