Skip to content

Commit 305087b

Browse files
Merge branch 'main' into s3exporter-canned-acl
2 parents d635048 + bc7f155 commit 305087b

File tree

12 files changed

+256
-38
lines changed

12 files changed

+256
-38
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awsfirehosereceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add `cloud.provider`, `aws.log.group.names` and `aws.log.stream.names` resource attributes to logs and set scope name and version of logs and metrics ingested by awsfirehosereceiver.
11+
12+
13+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
14+
issues: [37968]
15+
16+
# (Optional) One or more lines of additional information to render under the primary note.
17+
# These lines will be padded with 2 spaces and then inserted directly into the document.
18+
# Use pipe (|) for multiline entries.
19+
subtext:
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: [user]
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: testbed
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix batch interval calculation to avoid possible division by zero
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38084]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@ import (
1313

1414
jsoniter "github.com/json-iterator/go"
1515
"github.com/klauspost/compress/gzip"
16+
"go.opentelemetry.io/collector/component"
1617
"go.opentelemetry.io/collector/pdata/pcommon"
1718
"go.opentelemetry.io/collector/pdata/plog"
1819
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
1920
"go.uber.org/zap"
21+
22+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
2023
)
2124

2225
const (
@@ -30,15 +33,16 @@ var errInvalidRecords = errors.New("record format invalid")
3033

3134
// Unmarshaler for the CloudWatch Log JSON record format.
3235
type Unmarshaler struct {
33-
logger *zap.Logger
34-
gzipPool sync.Pool
36+
logger *zap.Logger
37+
buildInfo component.BuildInfo
38+
gzipPool sync.Pool
3539
}
3640

3741
var _ plog.Unmarshaler = (*Unmarshaler)(nil)
3842

3943
// NewUnmarshaler creates a new instance of the Unmarshaler.
40-
func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
41-
return &Unmarshaler{logger: logger}
44+
func NewUnmarshaler(logger *zap.Logger, buildInfo component.BuildInfo) *Unmarshaler {
45+
return &Unmarshaler{logger: logger, buildInfo: buildInfo}
4246
}
4347

4448
// UnmarshalLogs deserializes the given record as CloudWatch Logs events
@@ -120,10 +124,19 @@ func (u *Unmarshaler) UnmarshalLogs(compressedRecord []byte) (plog.Logs, error)
120124
for resourceKey, logRecords := range byResource {
121125
rl := logs.ResourceLogs().AppendEmpty()
122126
resourceAttrs := rl.Resource().Attributes()
127+
resourceAttrs.PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS)
123128
resourceAttrs.PutStr(conventions.AttributeCloudAccountID, resourceKey.owner)
129+
resourceAttrs.PutEmptySlice(conventions.AttributeAWSLogGroupNames).AppendEmpty().SetStr(resourceKey.logGroup)
130+
resourceAttrs.PutEmptySlice(conventions.AttributeAWSLogStreamNames).AppendEmpty().SetStr(resourceKey.logStream)
131+
// Deprecated: [v0.121.0] Use `conventions.AttributeAWSLogGroupNames` instead
124132
resourceAttrs.PutStr(attributeAWSCloudWatchLogGroupName, resourceKey.logGroup)
133+
// Deprecated: [v0.121.0] Use `conventions.AttributeAWSLogStreamNames` instead
125134
resourceAttrs.PutStr(attributeAWSCloudWatchLogStreamName, resourceKey.logStream)
126-
logRecords.MoveAndAppendTo(rl.ScopeLogs().AppendEmpty().LogRecords())
135+
136+
sl := rl.ScopeLogs().AppendEmpty()
137+
sl.Scope().SetName(metadata.ScopeName)
138+
sl.Scope().SetVersion(u.buildInfo.Version)
139+
logRecords.MoveAndAppendTo(sl.LogRecords())
127140
}
128141
return logs, nil
129142
}

receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go

+67-19
Original file line numberDiff line numberDiff line change
@@ -10,46 +10,62 @@ import (
1010
"path/filepath"
1111
"testing"
1212

13+
"github.com/stretchr/testify/assert"
1314
"github.com/stretchr/testify/require"
15+
"go.opentelemetry.io/collector/component"
16+
"go.opentelemetry.io/collector/pdata/pcommon"
17+
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
1418
"go.uber.org/zap"
19+
20+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
1521
)
1622

1723
func TestType(t *testing.T) {
18-
unmarshaler := NewUnmarshaler(zap.NewNop())
24+
unmarshaler := NewUnmarshaler(zap.NewNop(), component.NewDefaultBuildInfo())
1925
require.Equal(t, TypeStr, unmarshaler.Type())
2026
}
2127

2228
func TestUnmarshal(t *testing.T) {
23-
unmarshaler := NewUnmarshaler(zap.NewNop())
29+
unmarshaler := NewUnmarshaler(zap.NewNop(), component.NewDefaultBuildInfo())
2430
testCases := map[string]struct {
25-
filename string
26-
wantResourceCount int
27-
wantLogCount int
28-
wantErr error
31+
filename string
32+
wantResourceCount int
33+
wantLogCount int
34+
wantErr error
35+
wantResourceLogGroups [][]string
36+
wantResourceLogStreams [][]string
2937
}{
3038
"WithMultipleRecords": {
31-
filename: "multiple_records",
32-
wantResourceCount: 1,
33-
wantLogCount: 2,
39+
filename: "multiple_records",
40+
wantResourceCount: 1,
41+
wantLogCount: 2,
42+
wantResourceLogGroups: [][]string{{"test"}},
43+
wantResourceLogStreams: [][]string{{"test"}},
3444
},
3545
"WithSingleRecord": {
36-
filename: "single_record",
37-
wantResourceCount: 1,
38-
wantLogCount: 1,
46+
filename: "single_record",
47+
wantResourceCount: 1,
48+
wantLogCount: 1,
49+
wantResourceLogGroups: [][]string{{"test"}},
50+
wantResourceLogStreams: [][]string{{"test"}},
3951
},
4052
"WithInvalidRecords": {
4153
filename: "invalid_records",
4254
wantErr: errInvalidRecords,
4355
},
4456
"WithSomeInvalidRecords": {
45-
filename: "some_invalid_records",
46-
wantResourceCount: 1,
47-
wantLogCount: 2,
57+
filename: "some_invalid_records",
58+
wantResourceCount: 1,
59+
wantLogCount: 2,
60+
wantResourceLogGroups: [][]string{{"test"}},
61+
wantResourceLogStreams: [][]string{{"test"}},
4862
},
4963
"WithMultipleResources": {
50-
filename: "multiple_resources",
51-
wantResourceCount: 3,
52-
wantLogCount: 6,
64+
filename: "multiple_resources",
65+
wantResourceCount: 3,
66+
wantLogCount: 6,
67+
wantResourceLogGroups: nil, // not checking log group names because logs are unordered
68+
wantResourceLogStreams: nil, // not checking log stream names because logs are unordered
5369
},
5470
}
5571
for name, testCase := range testCases {
@@ -72,7 +88,18 @@ func TestUnmarshal(t *testing.T) {
7288
for i := 0; i < got.ResourceLogs().Len(); i++ {
7389
rm := got.ResourceLogs().At(i)
7490
require.Equal(t, 1, rm.ScopeLogs().Len())
91+
attrs := rm.Resource().Attributes()
92+
assertString(t, attrs, conventions.AttributeCloudProvider, "aws")
93+
assertString(t, attrs, conventions.AttributeCloudAccountID, "123")
94+
if testCase.wantResourceLogGroups != nil {
95+
assertStringArray(t, attrs, conventions.AttributeAWSLogGroupNames, testCase.wantResourceLogGroups[i])
96+
}
97+
if testCase.wantResourceLogStreams != nil {
98+
assertStringArray(t, attrs, conventions.AttributeAWSLogStreamNames, testCase.wantResourceLogStreams[i])
99+
}
75100
ilm := rm.ScopeLogs().At(0)
101+
assert.Equal(t, metadata.ScopeName, ilm.Scope().Name())
102+
assert.Equal(t, component.NewDefaultBuildInfo().Version, ilm.Scope().Version())
76103
gotLogCount += ilm.LogRecords().Len()
77104
}
78105
require.Equal(t, testCase.wantLogCount, gotLogCount)
@@ -82,7 +109,7 @@ func TestUnmarshal(t *testing.T) {
82109
}
83110

84111
func TestLogTimestamp(t *testing.T) {
85-
unmarshaler := NewUnmarshaler(zap.NewNop())
112+
unmarshaler := NewUnmarshaler(zap.NewNop(), component.NewDefaultBuildInfo())
86113
record, err := os.ReadFile(filepath.Join(".", "testdata", "single_record"))
87114
require.NoError(t, err)
88115

@@ -114,3 +141,24 @@ func gzipData(data []byte) ([]byte, error) {
114141
}
115142
return b.Bytes(), nil
116143
}
144+
145+
func assertString(t *testing.T, m pcommon.Map, key, expected string) {
146+
t.Helper()
147+
148+
v, ok := m.Get(key)
149+
require.True(t, ok)
150+
assert.Equal(t, expected, v.AsRaw())
151+
}
152+
153+
func assertStringArray(t *testing.T, m pcommon.Map, key string, expected []string) {
154+
t.Helper()
155+
156+
v, ok := m.Get(key)
157+
require.True(t, ok)
158+
s := v.Slice().AsRaw()
159+
vAsStrings := make([]string, len(s))
160+
for i, v := range s {
161+
vAsStrings[i] = v.(string)
162+
}
163+
assert.ElementsMatch(t, expected, vAsStrings)
164+
}

receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ import (
1111
"time"
1212

1313
jsoniter "github.com/json-iterator/go"
14+
"go.opentelemetry.io/collector/component"
1415
"go.opentelemetry.io/collector/pdata/pcommon"
1516
"go.opentelemetry.io/collector/pdata/pmetric"
1617
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
1718
"go.uber.org/zap"
19+
20+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
1821
)
1922

2023
const (
@@ -33,13 +36,15 @@ var errInvalidRecords = errors.New("record format invalid")
3336
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-json.html
3437
type Unmarshaler struct {
3538
logger *zap.Logger
39+
40+
buildInfo component.BuildInfo
3641
}
3742

3843
var _ pmetric.Unmarshaler = (*Unmarshaler)(nil)
3944

4045
// NewUnmarshaler creates a new instance of the Unmarshaler.
41-
func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
42-
return &Unmarshaler{logger}
46+
func NewUnmarshaler(logger *zap.Logger, buildInfo component.BuildInfo) *Unmarshaler {
47+
return &Unmarshaler{logger, buildInfo}
4348
}
4449

4550
// UnmarshalMetrics deserializes the record in CloudWatch Metric Stream JSON
@@ -122,6 +127,8 @@ func (u Unmarshaler) UnmarshalMetrics(record []byte) (pmetric.Metrics, error) {
122127
rm := metrics.ResourceMetrics().AppendEmpty()
123128
setResourceAttributes(resourceKey, rm.Resource())
124129
scopeMetrics := rm.ScopeMetrics().AppendEmpty()
130+
scopeMetrics.Scope().SetName(metadata.ScopeName)
131+
scopeMetrics.Scope().SetVersion(u.buildInfo.Version)
125132
for _, metric := range metricsMap {
126133
metric.MoveTo(scopeMetrics.Metrics().AppendEmpty())
127134
}

receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@ import (
1010

1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/component"
1314
"go.opentelemetry.io/collector/pdata/pcommon"
1415
"go.opentelemetry.io/collector/pdata/pmetric"
1516
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
1617
"go.uber.org/zap"
18+
19+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
1720
)
1821

1922
const (
@@ -24,12 +27,12 @@ const (
2427
)
2528

2629
func TestType(t *testing.T) {
27-
unmarshaler := NewUnmarshaler(zap.NewNop())
30+
unmarshaler := NewUnmarshaler(zap.NewNop(), component.NewDefaultBuildInfo())
2831
require.Equal(t, TypeStr, unmarshaler.Type())
2932
}
3033

3134
func TestUnmarshal(t *testing.T) {
32-
unmarshaler := NewUnmarshaler(zap.NewNop())
35+
unmarshaler := NewUnmarshaler(zap.NewNop(), component.NewDefaultBuildInfo())
3336
testCases := map[string]struct {
3437
filename string
3538
wantResourceCount int
@@ -93,7 +96,7 @@ func TestUnmarshal(t *testing.T) {
9396
}
9497

9598
func TestUnmarshal_SingleRecord(t *testing.T) {
96-
unmarshaler := NewUnmarshaler(zap.NewNop())
99+
unmarshaler := NewUnmarshaler(zap.NewNop(), component.NewDefaultBuildInfo())
97100

98101
record, err := os.ReadFile(filepath.Join("testdata", "single_record"))
99102
require.NoError(t, err)
@@ -112,6 +115,8 @@ func TestUnmarshal_SingleRecord(t *testing.T) {
112115
assert.Equal(t, conventions.AttributeCloudProviderAWS, cloudProvider.Str())
113116
require.Equal(t, 1, rm.ScopeMetrics().Len())
114117
sm := rm.ScopeMetrics().At(0)
118+
assert.Equal(t, metadata.ScopeName, sm.Scope().Name())
119+
assert.Equal(t, component.NewDefaultBuildInfo().Version, sm.Scope().Version())
115120

116121
require.Equal(t, 1, sm.Metrics().Len())
117122
metric := sm.Metrics().At(0)

receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ import (
88
"fmt"
99

1010
"github.com/gogo/protobuf/proto"
11+
"go.opentelemetry.io/collector/component"
1112
"go.opentelemetry.io/collector/pdata/pmetric"
1213
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
1314
"go.uber.org/zap"
15+
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
1417
)
1518

1619
const (
@@ -25,14 +28,15 @@ var errInvalidOTLPFormatStart = errors.New("unable to decode data length from me
2528
// More details can be found at:
2629
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html
2730
type Unmarshaler struct {
28-
logger *zap.Logger
31+
logger *zap.Logger
32+
buildInfo component.BuildInfo
2933
}
3034

3135
var _ pmetric.Unmarshaler = (*Unmarshaler)(nil)
3236

3337
// NewUnmarshaler creates a new instance of the Unmarshaler.
34-
func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
35-
return &Unmarshaler{logger}
38+
func NewUnmarshaler(logger *zap.Logger, buildInfo component.BuildInfo) *Unmarshaler {
39+
return &Unmarshaler{logger, buildInfo}
3640
}
3741

3842
// UnmarshalMetrics deserializes the recordsas a length-delimited sequence of
@@ -52,6 +56,14 @@ func (u Unmarshaler) UnmarshalMetrics(record []byte) (pmetric.Metrics, error) {
5256
if err != nil {
5357
return pmetric.Metrics{}, fmt.Errorf("unable to unmarshal input: %w", err)
5458
}
59+
for i := 0; i < req.Metrics().ResourceMetrics().Len(); i++ {
60+
rm := req.Metrics().ResourceMetrics().At(i)
61+
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
62+
sm := rm.ScopeMetrics().At(j)
63+
sm.Scope().SetName(metadata.ScopeName)
64+
sm.Scope().SetVersion(u.buildInfo.Version)
65+
}
66+
}
5567
req.Metrics().ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
5668
}
5769

0 commit comments

Comments
 (0)