Skip to content

Commit 6fa4ddc

Browse files
hgaolFiery-Fenix
authored andcommitted
[exporter/azureblob] support append blob for azure blob exporter (open-telemetry#39076)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Support append blob for azure blob exporter. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#39075 <!--Describe what testing was performed and which tests were added.--> #### Testing 1. unit test 2. e2e test using real azure blob account <!--Describe the documentation added.--> #### Documentation See README.md <!--Please delete paragraphs that you did not use before submitting.-->
1 parent e958b1e commit 6fa4ddc

File tree

8 files changed

+254
-12
lines changed

8 files changed

+254
-12
lines changed

.chloggen/39075.yaml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: azureblobexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: support for append blob in azure blob storage exporter
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39075]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/azureblobexporter/README.md

+12-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ The following settings can be optionally configured and have default values:
4040
- logs (default `nil`): encoding component id.
4141
- metrics (default `nil`): encoding component id.
4242
- traces (default `nil`): encoding component id.
43+
- append_blob: configures append blob behavior. When enabled, telemetry data is appended to a single blob instead of creating new blobs. This can be useful for aggregating data or reducing the number of blobs created.
44+
- enabled (default `false`): determines whether to use append blob mode.
45+
- separator (default `\n`): string to insert between appended data blocks.
4346
- `retry_on_failure`
4447
- `enabled` (default = true)
4548
- `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `enabled` is `false`
@@ -67,7 +70,15 @@ exporter:
6770
auth:
6871
type: "connection_string"
6972
connection_string: "DefaultEndpointsProtocol=https;AccountName=<your-acount>;AccountKey=<account-key>;EndpointSuffix=core.windows.net"
70-
traces: "test"
7173
encodings:
7274
logs: text_encoding
75+
append_blob:
76+
enabled: true
77+
separator: "\n"
7378
```
79+
80+
When `append_blob` is enabled:
81+
- The exporter will create append blobs instead of block blobs
82+
- New data will be appended to existing blobs rather than creating new ones
83+
- The configured separator will be inserted between data blocks
84+
- If the blob doesn't exist, it will be created automatically

exporter/azureblobexporter/config.go

+8
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ type BlobNameFormat struct {
3434
Params map[string]string `mapstructure:"params"`
3535
}
3636

37+
type AppendBlob struct {
38+
Enabled bool `mapstructure:"enabled"`
39+
Separator string `mapstructure:"separator"`
40+
}
41+
3742
type Authentication struct {
3843
// Type is the authentication type. supported values are connection_string, service_principal, system_managed_identity and user_managed_identity
3944
Type AuthType `mapstructure:"type"`
@@ -75,6 +80,9 @@ type Config struct {
7580
// FormatType is the format of encoded telemetry data. Supported values are json and proto.
7681
FormatType string `mapstructure:"format"`
7782

83+
// AppendBlob configures append blob behavior
84+
AppendBlob *AppendBlob `mapstructure:"append_blob"`
85+
7886
// Encoding extension to apply for logs/metrics/traces. If present, overrides the marshaler configuration option and format.
7987
Encodings *Encodings `mapstructure:"encodings"`
8088

exporter/azureblobexporter/config_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func TestLoadConfig(t *testing.T) {
5353
FormatType: "json",
5454
Encodings: &Encodings{},
5555
BackOffConfig: configretry.NewDefaultBackOffConfig(),
56+
AppendBlob: &AppendBlob{
57+
Enabled: false,
58+
Separator: "\n",
59+
},
5660
},
5761
},
5862
{
@@ -77,6 +81,10 @@ func TestLoadConfig(t *testing.T) {
7781
FormatType: "proto",
7882
Encodings: &Encodings{},
7983
BackOffConfig: configretry.NewDefaultBackOffConfig(),
84+
AppendBlob: &AppendBlob{
85+
Enabled: false,
86+
Separator: "\n",
87+
},
8088
},
8189
},
8290
{
@@ -102,6 +110,10 @@ func TestLoadConfig(t *testing.T) {
102110
FormatType: "json",
103111
Encodings: &Encodings{},
104112
BackOffConfig: configretry.NewDefaultBackOffConfig(),
113+
AppendBlob: &AppendBlob{
114+
Enabled: false,
115+
Separator: "\n",
116+
},
105117
},
106118
},
107119
{
@@ -126,6 +138,10 @@ func TestLoadConfig(t *testing.T) {
126138
FormatType: "json",
127139
Encodings: &Encodings{},
128140
BackOffConfig: configretry.NewDefaultBackOffConfig(),
141+
AppendBlob: &AppendBlob{
142+
Enabled: false,
143+
Separator: "\n",
144+
},
129145
},
130146
},
131147
{

exporter/azureblobexporter/exporter.go

+87-9
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ package azureblobexporter // import "github.com/open-telemetry/opentelemetry-col
66
import (
77
"bytes"
88
"context"
9+
"errors"
910
"fmt"
1011
"io"
1112
"math/rand/v2"
1213
"time"
1314

15+
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
1416
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
1517
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
18+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
1619
"go.opentelemetry.io/collector/component"
1720
"go.opentelemetry.io/collector/consumer"
1821
"go.opentelemetry.io/collector/pdata/plog"
@@ -33,6 +36,45 @@ type azureBlobExporter struct {
3336
type azblobClient interface {
3437
UploadStream(ctx context.Context, containerName string, blobName string, body io.Reader, o *azblob.UploadStreamOptions) (azblob.UploadStreamResponse, error)
3538
URL() string
39+
AppendBlock(ctx context.Context, containerName string, blobName string, data []byte, o *appendblob.AppendBlockOptions) error
40+
}
41+
42+
type azblobClientImpl struct {
43+
client *azblob.Client
44+
}
45+
46+
func (c *azblobClientImpl) UploadStream(ctx context.Context, containerName string, blobName string, body io.Reader, o *azblob.UploadStreamOptions) (azblob.UploadStreamResponse, error) {
47+
return c.client.UploadStream(ctx, containerName, blobName, body, o)
48+
}
49+
50+
func (c *azblobClientImpl) URL() string {
51+
return c.client.URL()
52+
}
53+
54+
func (c *azblobClientImpl) AppendBlock(ctx context.Context, containerName string, blobName string, data []byte, o *appendblob.AppendBlockOptions) error {
55+
containerClient := c.client.ServiceClient().NewContainerClient(containerName)
56+
appendBlobClient := containerClient.NewAppendBlobClient(blobName)
57+
58+
_, err := appendBlobClient.AppendBlock(ctx, newReadSeekCloserWrapper(data), o)
59+
if err == nil {
60+
return nil
61+
}
62+
63+
// Handle BlobNotFound error by creating the blob and retrying
64+
var cerr *azcore.ResponseError
65+
if errors.As(err, &cerr) && cerr.ErrorCode == "BlobNotFound" {
66+
if _, err = appendBlobClient.Create(ctx, nil); err != nil {
67+
return fmt.Errorf("failed to create append blob: %w", err)
68+
}
69+
70+
_, err = appendBlobClient.AppendBlock(ctx, newReadSeekCloserWrapper(data), o)
71+
if err != nil {
72+
return fmt.Errorf("failed to append block after creation: %w", err)
73+
}
74+
return nil
75+
}
76+
77+
return fmt.Errorf("failed to append block: %w", err)
3678
}
3779

3880
func newAzureBlobExporter(config *Config, logger *zap.Logger, signal pipeline.Signal) *azureBlobExporter {
@@ -58,9 +100,11 @@ func (e *azureBlobExporter) start(_ context.Context, host component.Host) error
58100

59101
// create client based on auth type
60102
authType := e.config.Auth.Type
103+
azblobClient := &azblobClientImpl{}
104+
e.client = azblobClient
61105
switch authType {
62106
case ConnectionString:
63-
e.client, err = azblob.NewClientFromConnectionString(e.config.Auth.ConnectionString, nil)
107+
azblobClient.client, err = azblob.NewClientFromConnectionString(e.config.Auth.ConnectionString, nil)
64108
if err != nil {
65109
return fmt.Errorf("failed to create client from connection string: %w", err)
66110
}
@@ -73,7 +117,7 @@ func (e *azureBlobExporter) start(_ context.Context, host component.Host) error
73117
if err != nil {
74118
return fmt.Errorf("failed to create service principal credential: %w", err)
75119
}
76-
e.client, err = azblob.NewClient(e.config.URL, cred, nil)
120+
azblobClient.client, err = azblob.NewClient(e.config.URL, cred, nil)
77121
if err != nil {
78122
return fmt.Errorf("failed to create client with service principal: %w", err)
79123
}
@@ -82,7 +126,7 @@ func (e *azureBlobExporter) start(_ context.Context, host component.Host) error
82126
if err != nil {
83127
return fmt.Errorf("failed to create system managed identity credential: %w", err)
84128
}
85-
e.client, err = azblob.NewClient(e.config.URL, cred, nil)
129+
azblobClient.client, err = azblob.NewClient(e.config.URL, cred, nil)
86130
if err != nil {
87131
return fmt.Errorf("failed to create client with system managed identity: %w", err)
88132
}
@@ -93,7 +137,7 @@ func (e *azureBlobExporter) start(_ context.Context, host component.Host) error
93137
if err != nil {
94138
return fmt.Errorf("failed to create user managed identity credential: %w", err)
95139
}
96-
e.client, err = azblob.NewClient(e.config.URL, cred, nil)
140+
azblobClient.client, err = azblob.NewClient(e.config.URL, cred, nil)
97141
if err != nil {
98142
return fmt.Errorf("failed to create client with user managed identity: %w", err)
99143
}
@@ -118,7 +162,7 @@ func (e *azureBlobExporter) generateBlobName(signal pipeline.Signal) (string, er
118162
default:
119163
return "", fmt.Errorf("unsupported signal type: %v", signal)
120164
}
121-
return fmt.Sprintf("%s_%d", now.Format(format), randomInRange(1, int(e.config.BlobNameFormat.SerialNumRange))), nil
165+
return fmt.Sprintf("%s_%d", now.Format(format), randomInRange(0, int(e.config.BlobNameFormat.SerialNumRange))), nil
122166
}
123167

124168
func (e *azureBlobExporter) Capabilities() consumer.Capabilities {
@@ -162,17 +206,51 @@ func (e *azureBlobExporter) consumeData(ctx context.Context, data []byte, signal
162206
return fmt.Errorf("failed to generate blobname: %w", err)
163207
}
164208

165-
blobContentReader := bytes.NewReader(data)
166-
_, err = e.client.UploadStream(ctx, e.config.Container.Traces, blobName, blobContentReader, nil)
209+
var containerName string
210+
switch signal {
211+
case pipeline.SignalMetrics:
212+
containerName = e.config.Container.Metrics
213+
case pipeline.SignalLogs:
214+
containerName = e.config.Container.Logs
215+
case pipeline.SignalTraces:
216+
containerName = e.config.Container.Traces
217+
default:
218+
return fmt.Errorf("unsupported signal type: %v", signal)
219+
}
220+
221+
if e.config.AppendBlob != nil && e.config.AppendBlob.Enabled {
222+
// Add separator if configured
223+
if e.config.AppendBlob.Separator != "" {
224+
data = append(data, []byte(e.config.AppendBlob.Separator)...)
225+
}
226+
err = e.client.AppendBlock(ctx, containerName, blobName, data, nil)
227+
} else {
228+
blobContentReader := bytes.NewReader(data)
229+
_, err = e.client.UploadStream(ctx, containerName, blobName, blobContentReader, nil)
230+
}
231+
167232
if err != nil {
168-
return fmt.Errorf("failed to upload traces data: %w", err)
233+
return fmt.Errorf("failed to upload data: %w", err)
169234
}
170235

171236
e.logger.Debug("Successfully exported data to Azure Blob Storage",
172237
zap.String("account", e.client.URL()),
173-
zap.String("container", e.config.Container.Traces),
238+
zap.String("container", containerName),
174239
zap.String("blob", blobName),
175240
zap.Int("size", len(data)))
176241

177242
return nil
178243
}
244+
245+
func newReadSeekCloserWrapper(data []byte) *readSeekCloserWrapper {
246+
return &readSeekCloserWrapper{bytes.NewReader(data)}
247+
}
248+
249+
// readSeekCloserWrapper wraps a bytes.Reader to implement io.ReadSeekCloser
250+
type readSeekCloserWrapper struct {
251+
*bytes.Reader
252+
}
253+
254+
func (r readSeekCloserWrapper) Close() error {
255+
return nil
256+
}

0 commit comments

Comments
 (0)