Skip to content

Commit 1af5afa

Browse files
axwAneurysm9
andauthored
[receiver/awsfirehose] Refactor to use pdata unmarshaler interfaces (#37361)
#### Description Refactor unmarshallers to fit into the encoding framework. The internal unmarshallers now implement `plog.Unmarshaler` and `pmetric.Unmarshaler`. This will enable us to use encoding extensions in a followup, and will enable us to extract the unmarshallers later as encoding extensions. As a result of the interface change, the unmarshallers now unmarshal a single record at a time, which means we cannot merge resources/metrics as we go, and only within each record. This impacts performance, so to offset that we implement various optimisations: - Use json-iterator for decoding JSON - Use klauspost/compress for decompressing gzip - Pool gzip readers - Remove pointer type from cwMetricValue to avoid allocation - Don't read the whole request body into memory - Reuse buffer for decoding base64; decode as we go There are more optimisations we can make to reduce memory allocations, e.g. avoid reflection when decoding JSON. There's a fix for a subtle bug in the cwmetrics unmarshaller where the unit of a metric was not considered part of its identity, and so two metrics that differed only by unit would be merged. #### Link to tracking issue Preparation for #37113 #### Testing - Added tests for consuming requests containing multiple records. - Added benchmarks for the full request/response flow, for cwlogs and cwmetrics record types. There's an increase in memory usage for cwmetrics unmarshalling, and decrease for cwlogs unmarshalling. CPU usage is better across the board. ``` goos: linux goarch: amd64 pkg: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver cpu: AMD Ryzen 7 PRO 5850U with Radeon Graphics │ /tmp/old.txt │ /tmp/new.txt │ │ sec/op │ sec/op vs base │ LogsConsumer_cwlogs/10resources_10records_1logs-16 142.03µ ± 8% 75.85µ ± 16% -46.60% (p=0.002 n=6) LogsConsumer_cwlogs/10resources_10records_10logs-16 413.1µ ± 5% 208.0µ ± 23% -49.65% (p=0.002 n=6) LogsConsumer_cwlogs/10resources_100records_1logs-16 1435.8µ ± 8% 728.0µ ± 11% -49.30% (p=0.002 n=6) LogsConsumer_cwlogs/10resources_100records_10logs-16 4.200m ± 2% 1.875m ± 4% -55.34% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_10records_1metrics-16 104.30µ ± 16% 70.26µ ± 12% -32.64% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_10records_10metrics-16 821.7µ ± 7% 505.5µ ± 5% -38.48% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_100records_1metrics-16 780.2µ ± 5% 647.0µ ± 8% -17.07% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_100records_10metrics-16 7.823m ± 5% 5.297m ± 11% -32.29% (p=0.002 n=6) geomean 809.9µ 475.7µ -41.26% │ /tmp/old.txt │ /tmp/new.txt │ │ B/op │ B/op vs base │ LogsConsumer_cwlogs/10resources_10records_1logs-16 437.78Ki ± 0% 61.70Ki ± 1% -85.91% (p=0.002 n=6) LogsConsumer_cwlogs/10resources_10records_10logs-16 533.7Ki ± 0% 124.9Ki ± 2% -76.60% (p=0.002 n=6) LogsConsumer_cwlogs/10resources_100records_1logs-16 4319.1Ki ± 0% 550.1Ki ± 0% -87.26% (p=0.002 n=6) LogsConsumer_cwlogs/10resources_100records_10logs-16 5.167Mi ± 0% 1.172Mi ± 1% -77.31% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_10records_1metrics-16 47.88Ki ± 5% 83.71Ki ± 0% +74.84% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_10records_10metrics-16 390.0Ki ± 1% 415.4Ki ± 1% +6.51% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_100records_1metrics-16 358.9Ki ± 0% 772.9Ki ± 0% +115.37% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_100records_10metrics-16 3.730Mi ± 0% 3.962Mi ± 0% +6.23% (p=0.002 n=6) geomean 779.7Ki 391.8Ki -49.76% │ /tmp/old.txt │ /tmp/new.txt │ │ allocs/op │ allocs/op vs base │ LogsConsumer_cwlogs/10resources_10records_1logs-16 406.0 ± 2% 348.0 ± 2% -14.29% (p=0.002 n=6) LogsConsumer_cwlogs/10resources_10records_10logs-16 2.025k ± 0% 1.971k ± 1% -2.64% (p=0.002 n=6) LogsConsumer_cwlogs/10resources_100records_1logs-16 2.922k ± 0% 3.094k ± 0% +5.89% (p=0.002 n=6) LogsConsumer_cwlogs/10resources_100records_10logs-16 18.46k ± 0% 19.46k ± 1% +5.45% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_10records_1metrics-16 548.5 ± 8% 653.0 ± 0% +19.05% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_10records_10metrics-16 3.795k ± 3% 4.729k ± 1% +24.63% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_100records_1metrics-16 3.281k ± 0% 6.161k ± 0% +87.78% (p=0.002 n=6) MetricsConsumer_cwmetrics/10resources_100records_10metrics-16 28.02k ± 0% 46.90k ± 0% +67.39% (p=0.002 n=6) geomean 3.098k 3.722k +20.16% ``` #### Documentation N/A --------- Co-authored-by: Anthony Mirabella <[email protected]>
1 parent 1f8c1ee commit 1af5afa

27 files changed

+994
-862
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awsfirehosereceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Refactor unmarshallers to implement pdata unmarshaler interfaces
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37361]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package awsfirehosereceiver
5+
6+
import (
7+
"bytes"
8+
"compress/gzip"
9+
"context"
10+
"encoding/base64"
11+
"encoding/json"
12+
"fmt"
13+
"math/rand/v2"
14+
"net/http"
15+
"net/http/httptest"
16+
"testing"
17+
18+
"github.com/stretchr/testify/assert"
19+
"github.com/stretchr/testify/require"
20+
"go.opentelemetry.io/collector/component/componenttest"
21+
"go.opentelemetry.io/collector/consumer/consumertest"
22+
"go.opentelemetry.io/collector/receiver/receivertest"
23+
)
24+
25+
func BenchmarkLogsConsumer_cwlogs(b *testing.B) {
26+
// numLogGroups is the maximum number of unique log groups
27+
// to use across the generated logs, using a random generator
28+
// with fixed seeds for repeatability.
29+
const numLogGroups = 10
30+
rng := rand.New(rand.NewPCG(1, 2))
31+
32+
// numRecords is the number of records in the Firehose envelope.
33+
for _, numRecords := range []int{10, 100} {
34+
// numLogs is the number of CoudWatch log records within a Firehose record.
35+
for _, numLogs := range []int{1, 10} {
36+
b.Run(fmt.Sprintf("%dresources_%drecords_%dlogs", numLogGroups, numRecords, numLogs), func(b *testing.B) {
37+
config := createDefaultConfig().(*Config)
38+
config.Endpoint = "localhost:0"
39+
r, err := createLogsReceiver(
40+
context.Background(),
41+
receivertest.NewNopSettings(),
42+
config,
43+
consumertest.NewNop(),
44+
)
45+
require.NoError(b, err)
46+
47+
err = r.Start(context.Background(), componenttest.NewNopHost())
48+
require.NoError(b, err)
49+
b.Cleanup(func() {
50+
err = r.Shutdown(context.Background())
51+
assert.NoError(b, err)
52+
})
53+
54+
records := make([]firehoseRecord, numRecords)
55+
for i := range records {
56+
records[i] = firehoseRecord{
57+
Data: base64.StdEncoding.EncodeToString(
58+
makeCloudWatchLogRecord(rng, numLogs, numLogGroups),
59+
),
60+
}
61+
}
62+
fr := testFirehoseRequest(testFirehoseRequestID, records)
63+
body, err := json.Marshal(fr)
64+
require.NoError(b, err)
65+
66+
b.ResetTimer()
67+
for i := 0; i < b.N; i++ {
68+
req := newTestRequest(body)
69+
recorder := httptest.NewRecorder()
70+
r.(http.Handler).ServeHTTP(recorder, req)
71+
if recorder.Code != http.StatusOK {
72+
b.Fatalf("expected status code 200, got %d", recorder.Code)
73+
}
74+
}
75+
})
76+
}
77+
}
78+
}
79+
80+
func BenchmarkMetricsConsumer_cwmetrics(b *testing.B) {
81+
// numStreams is the maximum number of unique metric streams
82+
// to use across the generated metrics, using a random generator
83+
// with fixed seeds for repeatability.
84+
const numStreams = 10
85+
rng := rand.New(rand.NewPCG(1, 2))
86+
87+
// numRecords is the number of records in the Firehose envelope.
88+
for _, numRecords := range []int{10, 100} {
89+
// numMetrics is the number of CoudWatch metrics within a Firehose record.
90+
for _, numMetrics := range []int{1, 10} {
91+
b.Run(fmt.Sprintf("%dresources_%drecords_%dmetrics", numStreams, numRecords, numMetrics), func(b *testing.B) {
92+
config := createDefaultConfig().(*Config)
93+
config.Endpoint = "localhost:0"
94+
r, err := createMetricsReceiver(
95+
context.Background(),
96+
receivertest.NewNopSettings(),
97+
config,
98+
consumertest.NewNop(),
99+
)
100+
require.NoError(b, err)
101+
102+
err = r.Start(context.Background(), componenttest.NewNopHost())
103+
require.NoError(b, err)
104+
b.Cleanup(func() {
105+
err = r.Shutdown(context.Background())
106+
assert.NoError(b, err)
107+
})
108+
109+
records := make([]firehoseRecord, numRecords)
110+
for i := range records {
111+
records[i] = firehoseRecord{
112+
Data: base64.StdEncoding.EncodeToString(
113+
makeCloudWatchMetricRecord(rng, numMetrics, numStreams),
114+
),
115+
}
116+
}
117+
118+
fr := testFirehoseRequest(testFirehoseRequestID, records)
119+
body, err := json.Marshal(fr)
120+
require.NoError(b, err)
121+
122+
b.ResetTimer()
123+
for i := 0; i < b.N; i++ {
124+
req := newTestRequest(body)
125+
recorder := httptest.NewRecorder()
126+
r.(http.Handler).ServeHTTP(recorder, req)
127+
if recorder.Code != http.StatusOK {
128+
b.Fatalf("expected status code 200, got %d", recorder.Code)
129+
}
130+
}
131+
})
132+
}
133+
}
134+
}
135+
136+
func makeCloudWatchLogRecord(rng *rand.Rand, numLogs, numLogGroups int) []byte {
137+
var buf bytes.Buffer
138+
w := gzip.NewWriter(&buf)
139+
for i := 0; i < numLogs; i++ {
140+
group := rng.IntN(numLogGroups)
141+
fmt.Fprintf(w,
142+
`{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"group_%d","logStream":"stream","logEvents":[{"id":"the_id","timestamp":1725594035523,"message":"message %d"}]}`,
143+
group, i,
144+
)
145+
fmt.Fprintln(w)
146+
}
147+
if err := w.Close(); err != nil {
148+
panic(err)
149+
}
150+
return buf.Bytes()
151+
}
152+
153+
func makeCloudWatchMetricRecord(rng *rand.Rand, numMetrics, numStreams int) []byte {
154+
var buf bytes.Buffer
155+
for i := 0; i < numMetrics; i++ {
156+
stream := rng.IntN(numStreams)
157+
fmt.Fprintf(&buf,
158+
`{"metric_stream_name":"stream_%d","account_id":"1234567890","region":"us-east-1","namespace":"AWS/NATGateway","metric_name":"metric_%d","dimensions":{"NatGatewayId":"nat-01a4160dfb995b990"},"timestamp":1643916720000,"value":{"max":0.0,"min":0.0,"sum":0.0,"count":2.0},"unit":"Count"}`,
159+
stream, i,
160+
)
161+
fmt.Fprintln(&buf)
162+
}
163+
return buf.Bytes()
164+
}

receiver/awsfirehosereceiver/factory.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import (
1010
"go.opentelemetry.io/collector/component"
1111
"go.opentelemetry.io/collector/config/confighttp"
1212
"go.opentelemetry.io/collector/consumer"
13+
"go.opentelemetry.io/collector/pdata/plog"
14+
"go.opentelemetry.io/collector/pdata/pmetric"
1315
"go.opentelemetry.io/collector/receiver"
1416
"go.uber.org/zap"
1517

1618
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
17-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
1819
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"
1920
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
2021
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
@@ -54,19 +55,19 @@ func validateRecordType(recordType string) error {
5455

5556
// defaultMetricsUnmarshalers creates a map of the available metrics
5657
// unmarshalers.
57-
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler {
58+
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]pmetric.Unmarshaler {
5859
cwmsu := cwmetricstream.NewUnmarshaler(logger)
5960
otlpv1msu := otlpmetricstream.NewUnmarshaler(logger)
60-
return map[string]unmarshaler.MetricsUnmarshaler{
61+
return map[string]pmetric.Unmarshaler{
6162
cwmsu.Type(): cwmsu,
6263
otlpv1msu.Type(): otlpv1msu,
6364
}
6465
}
6566

6667
// defaultLogsUnmarshalers creates a map of the available logs unmarshalers.
67-
func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler {
68+
func defaultLogsUnmarshalers(logger *zap.Logger) map[string]plog.Unmarshaler {
6869
u := cwlog.NewUnmarshaler(logger)
69-
return map[string]unmarshaler.LogsUnmarshaler{
70+
return map[string]plog.Unmarshaler{
7071
u.Type(): u,
7172
}
7273
}

receiver/awsfirehosereceiver/go.mod

+11-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ go 1.22.0
44

55
require (
66
github.com/gogo/protobuf v1.3.2
7+
github.com/json-iterator/go v1.1.12
8+
github.com/klauspost/compress v1.17.11
9+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.119.0
710
github.com/stretchr/testify v1.10.0
811
go.opentelemetry.io/collector/component v0.119.0
912
go.opentelemetry.io/collector/component/componentstatus v0.119.0
@@ -24,6 +27,7 @@ require (
2427
)
2528

2629
require (
30+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2731
github.com/davecgh/go-spew v1.1.1 // indirect
2832
github.com/felixge/httpsnoop v1.0.4 // indirect
2933
github.com/fsnotify/fsnotify v1.8.0 // indirect
@@ -32,15 +36,14 @@ require (
3236
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
3337
github.com/golang/snappy v0.0.4 // indirect
3438
github.com/google/uuid v1.6.0 // indirect
35-
github.com/json-iterator/go v1.1.12 // indirect
36-
github.com/klauspost/compress v1.17.11 // indirect
3739
github.com/knadh/koanf/maps v0.1.1 // indirect
3840
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
3941
github.com/knadh/koanf/v2 v2.1.2 // indirect
4042
github.com/mitchellh/copystructure v1.2.0 // indirect
4143
github.com/mitchellh/reflectwalk v1.0.2 // indirect
4244
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
4345
github.com/modern-go/reflect2 v1.0.2 // indirect
46+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.119.0 // indirect
4447
github.com/pierrec/lz4/v4 v4.1.22 // indirect
4548
github.com/pmezard/go-difflib v1.0.0 // indirect
4649
github.com/rs/cors v1.11.1 // indirect
@@ -76,3 +79,9 @@ retract (
7679
v0.76.1
7780
v0.65.0
7881
)
82+
83+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
84+
85+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
86+
87+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

receiver/awsfirehosereceiver/go.sum

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go

-48
This file was deleted.

receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go

-54
This file was deleted.

0 commit comments

Comments
 (0)