Skip to content

Commit f9f40d8

Browse files
yshenglivincentfree
authored andcommitted
feat: add support for mapping s3 bucket prefix to OTel resource attributes (open-telemetry#39634)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Introduced the `resource_attrs_to_s3/s3_prefix` configuration field to enable mapping the S3 bucket prefix to an OpenTelemetry resource attribute. When this field is set, the S3 prefix is determined by the specified resource attribute, with `s3uploader/s3_prefix` used as a fallback if the attribute is missing or empty. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#37858 <!--Describe what testing was performed and which tests were added.--> #### Testing - Added unit tests to verify the functionality of the related code changes - Built the OpenTelemetry Collector locally to ensure the component behaves as intended <!--Describe the documentation added.--> #### Documentation Updated the README to document the new configuration field: - Described its impact on the component’s behavior - Clarified how it interacts with the `s3uploader/s3_prefix` setting - Included an example demonstrating how to use it <!--Please delete paragraphs that you did not use before submitting.-->
1 parent d1d5f8a commit f9f40d8

14 files changed

+392
-45
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awss3exporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: add configuration field `resource_attrs_to_s3/s3_prefix` to support mapping s3 bucket prefix to OTel resource attributes
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37858]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: If `resource_attrs_to_s3/s3_prefix` is configured, s3 prefix will be determined based on the specified resource attribute and `s3uploader/s3_prefix` will serve as a fallback.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user, api]

exporter/awss3exporter/README.md

+32-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ The following exporter configuration parameters are supported.
3838
| `compression` | should the file be compressed | none |
3939
| `sending_queue` | [exporters common queuing](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) | disabled |
4040
| `timeout` | [exporters common timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) | 5s |
41-
41+
| `resource_attrs_to_s3` | determines the mapping of S3 configuration values to resource attribute values for uploading operations. | |
4242

4343
### Marshaler
4444

@@ -61,6 +61,12 @@ See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/
6161
- `none` (default): No compression will be applied
6262
- `gzip`: Files will be compressed with gzip. **This does not support `sumo_ic`marshaler.**
6363

64+
### resource_attrs_to_s3
65+
- `s3_prefix`: Defines which resource attribute's value should be used as the S3 prefix.
66+
When this option is set, it dynamically overrides `s3uploader/s3_prefix`.
67+
If the specified resource attribute exists in the data,
68+
its value will be used as the prefix; otherwise, `s3uploader/s3_prefix` will serve as the fallback.
69+
6470
# Example Configurations
6571

6672
Following example configuration defines to store output in 'eu-central' region and bucket named 'databucket'.
@@ -110,6 +116,31 @@ In this case, logs and traces would be stored in the following path format.
110116
metric/YYYY/MM/DD/HH/mm
111117
```
112118

119+
## Data routing based on resource attributes
120+
When `resource_attrs_to_s3/s3_prefix` is configured, the S3 prefix is dynamically derived from a specified resource attribute in your data.
121+
If the attribute value is unavailable, the prefix will fall back to the value defined in `s3uploader/s3_prefix`.
122+
```yaml
123+
exporters:
124+
awss3:
125+
s3uploader:
126+
region: 'eu-central-1'
127+
s3_bucket: 'databucket'
128+
s3_prefix: 'metric'
129+
s3_partition_format: '%Y/%m/%d/%H/%M'
130+
resource_attrs_to_s3:
131+
s3_prefix: "com.awss3.prefix"
132+
```
133+
In this case, metrics, logs and traces would be stored in the following path format examples:
134+
135+
```console
136+
prefix1/YYYY/MM/DD/HH/mm
137+
foo-prefix/YYYY/MM/DD/HH/mm
138+
prefix-bar/YYYY/MM/DD/HH/mm
139+
metric/YYYY/MM/DD/HH/mm
140+
...
141+
```
142+
143+
113144
## AWS Credential Configuration
114145

115146
This exporter follows default credential resolution for the

exporter/awss3exporter/config.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ const (
5151
Body MarshalerType = "body"
5252
)
5353

54+
// ResourceAttrsToS3 defines the mapping of S3 uploading configuration values to resource attribute values.
55+
type ResourceAttrsToS3 struct {
56+
// S3Prefix indicates the mapping of the key (directory) prefix used for writing into the bucket to a specific resource attribute value.
57+
S3Prefix string `mapstructure:"s3_prefix"`
58+
}
59+
5460
// Config contains the main configuration options for the s3 exporter
5561
type Config struct {
5662
QueueSettings exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`
@@ -59,8 +65,9 @@ type Config struct {
5965
MarshalerName MarshalerType `mapstructure:"marshaler"`
6066

6167
// Encoding to apply. If present, overrides the marshaler configuration option.
62-
Encoding *component.ID `mapstructure:"encoding"`
63-
EncodingFileExtension string `mapstructure:"encoding_file_extension"`
68+
Encoding *component.ID `mapstructure:"encoding"`
69+
EncodingFileExtension string `mapstructure:"encoding_file_extension"`
70+
ResourceAttrsToS3 ResourceAttrsToS3 `mapstructure:"resource_attrs_to_s3"`
6471
}
6572

6673
func (c *Config) Validate() error {

exporter/awss3exporter/config_test.go

+40-2
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func TestConfigS3ACL(t *testing.T) {
135135
factories.Exporters[factory.Type()] = factory
136136
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594
137137
cfg, err := otelcoltest.LoadConfigAndValidate(
138-
filepath.Join("testdata", "config-s3_storage_class.yaml"), factories)
138+
filepath.Join("testdata", "config-s3_acl.yaml"), factories)
139139

140140
require.NoError(t, err)
141141
require.NotNil(t, cfg)
@@ -152,7 +152,8 @@ func TestConfigS3ACL(t *testing.T) {
152152
S3Prefix: "bar",
153153
S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M",
154154
Endpoint: "http://endpoint.com",
155-
StorageClass: "STANDARD_IA",
155+
StorageClass: "STANDARD",
156+
ACL: "bucket-owner-read",
156157
},
157158
QueueSettings: queueCfg,
158159
TimeoutSettings: timeoutCfg,
@@ -408,3 +409,40 @@ func TestCompressionName(t *testing.T) {
408409
}, e,
409410
)
410411
}
412+
413+
func TestResourceAttrsToS3(t *testing.T) {
414+
factories, err := otelcoltest.NopFactories()
415+
assert.NoError(t, err)
416+
417+
factory := NewFactory()
418+
factories.Exporters[factory.Type()] = factory
419+
cfg, err := otelcoltest.LoadConfigAndValidate(
420+
filepath.Join("testdata", "config-s3_resource-attrs-to-s3.yaml"), factories)
421+
422+
require.NoError(t, err)
423+
require.NotNil(t, cfg)
424+
425+
queueCfg := exporterhelper.NewDefaultQueueConfig()
426+
queueCfg.Enabled = false
427+
timeoutCfg := exporterhelper.NewDefaultTimeoutConfig()
428+
429+
e := cfg.Exporters[component.MustNewID("awss3")].(*Config)
430+
431+
assert.Equal(t, &Config{
432+
QueueSettings: queueCfg,
433+
TimeoutSettings: timeoutCfg,
434+
S3Uploader: S3UploaderConfig{
435+
Region: "us-east-1",
436+
S3Bucket: "foo",
437+
S3Prefix: "bar",
438+
S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M",
439+
Endpoint: "http://endpoint.com",
440+
StorageClass: "STANDARD",
441+
},
442+
MarshalerName: "otlp_json",
443+
ResourceAttrsToS3: ResourceAttrsToS3{
444+
S3Prefix: "com.awss3.prefix",
445+
},
446+
}, e,
447+
)
448+
}

exporter/awss3exporter/exporter.go

+22-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"go.opentelemetry.io/collector/component"
1111
"go.opentelemetry.io/collector/consumer"
1212
"go.opentelemetry.io/collector/exporter"
13+
"go.opentelemetry.io/collector/pdata/pcommon"
1314
"go.opentelemetry.io/collector/pdata/plog"
1415
"go.opentelemetry.io/collector/pdata/pmetric"
1516
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -39,6 +40,19 @@ func newS3Exporter(
3940
return s3Exporter
4041
}
4142

43+
func (e *s3Exporter) getUploadOpts(res pcommon.Resource) *upload.UploadOptions {
44+
s3Prefix := ""
45+
if s3PrefixKey := e.config.ResourceAttrsToS3.S3Prefix; s3PrefixKey != "" {
46+
if value, ok := res.Attributes().Get(s3PrefixKey); ok {
47+
s3Prefix = value.AsString()
48+
}
49+
}
50+
uploadOpts := &upload.UploadOptions{
51+
OverridePrefix: s3Prefix,
52+
}
53+
return uploadOpts
54+
}
55+
4256
func (e *s3Exporter) start(ctx context.Context, host component.Host) error {
4357
var m marshaler
4458
var err error
@@ -72,7 +86,8 @@ func (e *s3Exporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) err
7286
return err
7387
}
7488

75-
return e.uploader.Upload(ctx, buf)
89+
uploadOpts := e.getUploadOpts(md.ResourceMetrics().At(0).Resource())
90+
return e.uploader.Upload(ctx, buf, uploadOpts)
7691
}
7792

7893
func (e *s3Exporter) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
@@ -81,7 +96,9 @@ func (e *s3Exporter) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
8196
return err
8297
}
8398

84-
return e.uploader.Upload(ctx, buf)
99+
uploadOpts := e.getUploadOpts(logs.ResourceLogs().At(0).Resource())
100+
101+
return e.uploader.Upload(ctx, buf, uploadOpts)
85102
}
86103

87104
func (e *s3Exporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
@@ -90,5 +107,7 @@ func (e *s3Exporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces) er
90107
return err
91108
}
92109

93-
return e.uploader.Upload(ctx, buf)
110+
uploadOpts := e.getUploadOpts(traces.ResourceSpans().At(0).Resource())
111+
112+
return e.uploader.Upload(ctx, buf, uploadOpts)
94113
}

exporter/awss3exporter/exporter_test.go

+39-2
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,29 @@ package awss3exporter
55

66
import (
77
"context"
8+
"fmt"
89
"testing"
910

1011
"github.com/stretchr/testify/assert"
1112
"go.opentelemetry.io/collector/pdata/plog"
1213
"go.uber.org/zap"
14+
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter/internal/upload"
1316
)
1417

15-
var testLogs = []byte(`{"resourceLogs":[{"resource":{"attributes":[{"key":"_sourceCategory","value":{"stringValue":"logfile"}},{"key":"_sourceHost","value":{"stringValue":"host"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"observedTimeUnixNano":"1654257420681895000","body":{"stringValue":"2022-06-03 13:57:00.62739 +0200 CEST m=+14.018296742 log entry14"},"attributes":[{"key":"log.file.path_resolved","value":{"stringValue":"logwriter/data.log"}}],"traceId":"","spanId":""}]}],"schemaUrl":"https://opentelemetry.io/schemas/1.6.1"}]}`)
18+
var (
19+
s3PrefixKey = "_sourceHost"
20+
overridePrefix = "host"
21+
testLogs = []byte(fmt.Sprintf(`{"resourceLogs":[{"resource":{"attributes":[{"key":"_sourceCategory","value":{"stringValue":"logfile"}},{"key":"%s","value":{"stringValue":"%s"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"observedTimeUnixNano":"1654257420681895000","body":{"stringValue":"2022-06-03 13:57:00.62739 +0200 CEST m=+14.018296742 log entry14"},"attributes":[{"key":"log.file.path_resolved","value":{"stringValue":"logwriter/data.log"}}],"traceId":"","spanId":""}]}],"schemaUrl":"https://opentelemetry.io/schemas/1.6.1"}]}`, s3PrefixKey, overridePrefix))
22+
)
1623

1724
type TestWriter struct {
1825
t *testing.T
1926
}
2027

21-
func (testWriter *TestWriter) Upload(_ context.Context, buf []byte) error {
28+
func (testWriter *TestWriter) Upload(_ context.Context, buf []byte, uploadOpts *upload.UploadOptions) error {
2229
assert.Equal(testWriter.t, testLogs, buf)
30+
assert.Equal(testWriter.t, &upload.UploadOptions{OverridePrefix: ""}, uploadOpts)
2331
return nil
2432
}
2533

@@ -46,3 +54,32 @@ func TestLog(t *testing.T) {
4654
exporter := getLogExporter(t)
4755
assert.NoError(t, exporter.ConsumeLogs(context.Background(), logs))
4856
}
57+
58+
type TestWriterWithResourceAttrs struct {
59+
t *testing.T
60+
}
61+
62+
func (testWriterWO *TestWriterWithResourceAttrs) Upload(_ context.Context, buf []byte, uploadOpts *upload.UploadOptions) error {
63+
assert.Equal(testWriterWO.t, testLogs, buf)
64+
assert.Equal(testWriterWO.t, &upload.UploadOptions{OverridePrefix: overridePrefix}, uploadOpts)
65+
return nil
66+
}
67+
68+
func getLogExporterWithResourceAttrs(t *testing.T) *s3Exporter {
69+
marshaler, _ := newMarshaler("otlp_json", zap.NewNop())
70+
config := createDefaultConfig().(*Config)
71+
config.ResourceAttrsToS3.S3Prefix = s3PrefixKey
72+
exporter := &s3Exporter{
73+
config: config,
74+
uploader: &TestWriterWithResourceAttrs{t},
75+
logger: zap.NewNop(),
76+
marshaler: marshaler,
77+
}
78+
return exporter
79+
}
80+
81+
func TestLogWithResourceAttrs(t *testing.T) {
82+
logs := getTestLogs(t)
83+
exporter := getLogExporterWithResourceAttrs(t)
84+
assert.NoError(t, exporter.ConsumeLogs(context.Background(), logs))
85+
}

0 commit comments

Comments
 (0)