Skip to content

Commit 2493b6e

Browse files
authored
[chore] Move code from exporter/internal to exporterhelper/internal (#12342)
This PR is a preparation to simplify dependency between "exporter" module and the new "exporterhelper" module that will be created next. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent fc644ed commit 2493b6e

30 files changed

+167
-149
lines changed

exporter/exporterhelper/exporterhelper.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,19 @@
33

44
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
55

6-
import "go.opentelemetry.io/collector/exporter/internal"
6+
import (
7+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
8+
)
79

810
// Request represents a single request that can be sent to an external endpoint.
911
// Experimental: This API is at the early stage of development and may change without backward compatibility
1012
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
11-
type Request = internal.Request
13+
type Request = request.Request
1214

1315
// RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial
1416
// temporary failures. For example, if some items failed to process and can be retried, this interface allows to
1517
// return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned.
1618
// If not implemented, the original Request will be returned assuming the error is applied to the whole Request.
1719
// Experimental: This API is at the early stage of development and may change without backward compatibility
1820
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
19-
type RequestErrorHandler = internal.RequestErrorHandler
21+
type RequestErrorHandler = request.RequestErrorHandler

exporter/exporterhelper/internal/base_exporter.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ import (
1919
"go.opentelemetry.io/collector/consumer"
2020
"go.opentelemetry.io/collector/exporter"
2121
"go.opentelemetry.io/collector/exporter/exporterbatcher"
22+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2223
"go.opentelemetry.io/collector/exporter/exporterqueue" // BaseExporter contains common fields between different exporter types.
23-
"go.opentelemetry.io/collector/exporter/internal"
2424
"go.opentelemetry.io/collector/pipeline"
2525
)
2626

@@ -31,8 +31,8 @@ type BaseExporter struct {
3131
component.StartFunc
3232
component.ShutdownFunc
3333

34-
Marshaler exporterqueue.Marshaler[internal.Request]
35-
Unmarshaler exporterqueue.Unmarshaler[internal.Request]
34+
Marshaler exporterqueue.Marshaler[request.Request]
35+
Unmarshaler exporterqueue.Unmarshaler[request.Request]
3636

3737
Set exporter.Settings
3838

@@ -42,17 +42,17 @@ type BaseExporter struct {
4242
// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
4343
// The data is handled by each sender in the respective order starting from the queueSender.
4444
// Most of the senders are optional, and initialized with a no-op path-through sender.
45-
QueueSender Sender[internal.Request]
46-
ObsrepSender Sender[internal.Request]
47-
RetrySender Sender[internal.Request]
45+
QueueSender Sender[request.Request]
46+
ObsrepSender Sender[request.Request]
47+
RetrySender Sender[request.Request]
4848

49-
firstSender Sender[internal.Request]
49+
firstSender Sender[request.Request]
5050

5151
ConsumerOptions []consumer.Option
5252

5353
timeoutCfg TimeoutConfig
5454
retryCfg configretry.BackOffConfig
55-
queueFactory exporterqueue.Factory[internal.Request]
55+
queueFactory exporterqueue.Factory[request.Request]
5656
queueCfg exporterqueue.Config
5757
batcherCfg exporterbatcher.Config
5858
}
@@ -61,7 +61,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
6161
be := &BaseExporter{
6262
Set: set,
6363
timeoutCfg: NewDefaultTimeoutConfig(),
64-
queueFactory: exporterqueue.NewMemoryQueueFactory[internal.Request](),
64+
queueFactory: exporterqueue.NewMemoryQueueFactory[request.Request](),
6565
}
6666

6767
for _, op := range options {
@@ -105,7 +105,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
105105
}
106106

107107
// Send sends the request using the first sender in the chain.
108-
func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
108+
func (be *BaseExporter) Send(ctx context.Context, req request.Request) error {
109109
// Have to read the number of items before sending the request since the request can
110110
// be modified by the downstream components like the batcher.
111111
itemsCount := req.ItemsCount()
@@ -206,7 +206,7 @@ func WithQueue(config QueueConfig) Option {
206206
QueueSize: config.QueueSize,
207207
Blocking: config.Blocking,
208208
}
209-
o.queueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{
209+
o.queueFactory = exporterqueue.NewPersistentQueueFactory[request.Request](config.StorageID, exporterqueue.PersistentQueueSettings[request.Request]{
210210
Marshaler: o.Marshaler,
211211
Unmarshaler: o.Unmarshaler,
212212
})
@@ -218,7 +218,7 @@ func WithQueue(config QueueConfig) Option {
218218
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
219219
// Experimental: This API is at the early stage of development and may change without backward compatibility
220220
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
221-
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[internal.Request]) Option {
221+
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[request.Request]) Option {
222222
return func(o *BaseExporter) error {
223223
if o.Marshaler != nil || o.Unmarshaler != nil {
224224
return errors.New("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
@@ -257,7 +257,7 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
257257

258258
// WithMarshaler is used to set the request marshaler for the new exporter helper.
259259
// It must be provided as the first option when creating a new exporter helper.
260-
func WithMarshaler(marshaler exporterqueue.Marshaler[internal.Request]) Option {
260+
func WithMarshaler(marshaler exporterqueue.Marshaler[request.Request]) Option {
261261
return func(o *BaseExporter) error {
262262
o.Marshaler = marshaler
263263
return nil
@@ -266,7 +266,7 @@ func WithMarshaler(marshaler exporterqueue.Marshaler[internal.Request]) Option {
266266

267267
// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper.
268268
// It must be provided as the first option when creating a new exporter helper.
269-
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Option {
269+
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[request.Request]) Option {
270270
return func(o *BaseExporter) error {
271271
o.Unmarshaler = unmarshaler
272272
return nil

exporter/exporterhelper/internal/base_exporter_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ import (
1818
"go.opentelemetry.io/collector/config/configretry"
1919
"go.opentelemetry.io/collector/exporter"
2020
"go.opentelemetry.io/collector/exporter/exporterbatcher"
21+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
22+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
2123
"go.opentelemetry.io/collector/exporter/exporterqueue"
2224
"go.opentelemetry.io/collector/exporter/exportertest"
23-
"go.opentelemetry.io/collector/exporter/internal"
24-
"go.opentelemetry.io/collector/exporter/internal/requesttest"
2525
"go.opentelemetry.io/collector/pipeline"
2626
)
2727

@@ -39,11 +39,11 @@ var (
3939
type noopSender struct {
4040
component.StartFunc
4141
component.ShutdownFunc
42-
SendFunc[internal.Request]
42+
SendFunc[request.Request]
4343
}
4444

45-
func newNoopExportSender() Sender[internal.Request] {
46-
return &noopSender{SendFunc: func(ctx context.Context, req internal.Request) error {
45+
func newNoopExportSender() Sender[request.Request] {
46+
return &noopSender{SendFunc: func(ctx context.Context, req request.Request) error {
4747
select {
4848
case <-ctx.Done():
4949
return ctx.Err() // Returns the cancellation error
@@ -103,7 +103,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
103103
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal,
104104
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
105105
WithRetry(configretry.NewDefaultBackOffConfig()),
106-
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
106+
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[request.Request]()))
107107
require.Error(t, err)
108108
})
109109
}
@@ -123,7 +123,7 @@ func TestBaseExporterLogging(t *testing.T) {
123123
qCfg := exporterqueue.NewDefaultConfig()
124124
qCfg.Enabled = false
125125
bs, err := NewBaseExporter(set, defaultSignal,
126-
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
126+
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[request.Request]()),
127127
WithBatcher(exporterbatcher.NewDefaultConfig()),
128128
WithRetry(rCfg))
129129
require.NoError(t, err)
@@ -172,7 +172,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
172172
func() Option {
173173
qs := exporterqueue.NewDefaultConfig()
174174
qs.Enabled = false
175-
return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[internal.Request]())
175+
return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[request.Request]())
176176
}(),
177177
func() Option {
178178
bs := exporterbatcher.NewDefaultConfig()

exporter/exporterhelper/internal/batch_sender.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/component"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterbatcher"
17-
"go.opentelemetry.io/collector/exporter/internal"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1818
)
1919

2020
// BatchSender is a component that places requests into batches before passing them to the downstream senders.
@@ -24,7 +24,7 @@ import (
2424
// - concurrencyLimit is reached.
2525
type BatchSender struct {
2626
cfg exporterbatcher.Config
27-
next Sender[internal.Request]
27+
next Sender[request.Request]
2828

2929
// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
3030
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
@@ -44,7 +44,7 @@ type BatchSender struct {
4444
}
4545

4646
// NewBatchSender returns a new batch consumer component.
47-
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings, concurrencyLimit int64, next Sender[internal.Request]) *BatchSender {
47+
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings, concurrencyLimit int64, next Sender[request.Request]) *BatchSender {
4848
bs := &BatchSender{
4949
cfg: cfg,
5050
next: next,
@@ -101,7 +101,7 @@ func (bs *BatchSender) Start(_ context.Context, _ component.Host) error {
101101

102102
type batch struct {
103103
ctx context.Context
104-
request internal.Request
104+
request request.Request
105105
done chan struct{}
106106
err error
107107

@@ -137,7 +137,7 @@ func (bs *BatchSender) isActiveBatchReady() bool {
137137
(bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit)
138138
}
139139

140-
func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
140+
func (bs *BatchSender) Send(ctx context.Context, req request.Request) error {
141141
// Stopped batch sender should act as pass-through to allow the queue to be drained.
142142
if bs.stopped.Load() {
143143
return bs.next.Send(ctx, req)
@@ -150,10 +150,10 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
150150
}
151151

152152
// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests.
153-
func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Request) error {
153+
func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req request.Request) error {
154154
bs.mu.Lock()
155155

156-
var reqs []internal.Request
156+
var reqs []request.Request
157157
var mergeSplitErr error
158158
if bs.activeBatch.request == nil {
159159
reqs, mergeSplitErr = req.MergeSplit(ctx, bs.cfg.MaxSizeConfig, nil)
@@ -200,7 +200,7 @@ func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Req
200200
}
201201

202202
// sendMergeBatch sends the request to the batch and waits for the batch to be exported.
203-
func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request) error {
203+
func (bs *BatchSender) sendMergeBatch(ctx context.Context, req request.Request) error {
204204
bs.mu.Lock()
205205

206206
if bs.activeBatch.request != nil {
@@ -228,7 +228,7 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request)
228228
// The context is only set once and is not updated after the first call.
229229
// Merging the context would be complex and require an additional goroutine to handle the context cancellation.
230230
// We take the approach of using the context from the first request since it's likely to have the shortest timeout.
231-
func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.Request) {
231+
func (bs *BatchSender) updateActiveBatch(ctx context.Context, req request.Request) {
232232
if bs.activeBatch.request == nil {
233233
bs.activeBatch.ctx = ctx
234234
}

exporter/exporterhelper/internal/batch_sender_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import (
1616

1717
"go.opentelemetry.io/collector/component/componenttest"
1818
"go.opentelemetry.io/collector/exporter/exporterbatcher"
19+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
1920
"go.opentelemetry.io/collector/exporter/exporterqueue"
20-
"go.opentelemetry.io/collector/exporter/internal/requesttest"
2121
)
2222

2323
func TestBatchSender_Merge(t *testing.T) {

exporter/internal/queue/batcher.go renamed to exporter/exporterhelper/internal/batcher/batcher.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
4+
package batcher // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
55

66
import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
1010
"go.opentelemetry.io/collector/exporter/exporterbatcher"
11+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1112
"go.opentelemetry.io/collector/exporter/exporterqueue"
12-
"go.opentelemetry.io/collector/exporter/internal"
1313
)
1414

1515
// Batcher is in charge of reading items from the queue and send them out asynchronously.
1616
type Batcher interface {
1717
component.Component
18-
Consume(context.Context, internal.Request, exporterqueue.Done)
18+
Consume(context.Context, request.Request, exporterqueue.Done)
1919
}
2020

2121
func NewBatcher(batchCfg exporterbatcher.Config,
22-
exportFunc func(ctx context.Context, req internal.Request) error,
22+
exportFunc func(ctx context.Context, req request.Request) error,
2323
maxWorkers int,
2424
) (Batcher, error) {
2525
if !batchCfg.Enabled {

exporter/internal/queue/default_batcher.go renamed to exporter/exporterhelper/internal/batcher/default_batcher.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
4+
package batcher // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
55

66
import (
77
"context"
@@ -12,21 +12,21 @@ import (
1212

1313
"go.opentelemetry.io/collector/component"
1414
"go.opentelemetry.io/collector/exporter/exporterbatcher"
15+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1516
"go.opentelemetry.io/collector/exporter/exporterqueue"
16-
"go.opentelemetry.io/collector/exporter/internal"
1717
)
1818

1919
type batch struct {
2020
ctx context.Context
21-
req internal.Request
21+
req request.Request
2222
done multiDone
2323
}
2424

2525
// defaultBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
2626
type defaultBatcher struct {
2727
batchCfg exporterbatcher.Config
2828
workerPool chan struct{}
29-
exportFunc func(ctx context.Context, req internal.Request) error
29+
exportFunc func(ctx context.Context, req request.Request) error
3030
stopWG sync.WaitGroup
3131
currentBatchMu sync.Mutex
3232
currentBatch *batch
@@ -35,7 +35,7 @@ type defaultBatcher struct {
3535
}
3636

3737
func newDefaultBatcher(batchCfg exporterbatcher.Config,
38-
exportFunc func(ctx context.Context, req internal.Request) error,
38+
exportFunc func(ctx context.Context, req request.Request) error,
3939
maxWorkers int,
4040
) *defaultBatcher {
4141
// TODO: Determine what is the right behavior for this in combination with async queue.
@@ -61,10 +61,10 @@ func (qb *defaultBatcher) resetTimer() {
6161
}
6262
}
6363

64-
func (qb *defaultBatcher) Consume(ctx context.Context, req internal.Request, done exporterqueue.Done) {
64+
func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done exporterqueue.Done) {
6565
qb.currentBatchMu.Lock()
6666

67-
var reqList []internal.Request
67+
var reqList []request.Request
6868
var mergeSplitErr error
6969
if qb.currentBatch == nil {
7070
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
@@ -201,7 +201,7 @@ func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
201201
}
202202

203203
// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary.
204-
func (qb *defaultBatcher) flush(ctx context.Context, req internal.Request, done exporterqueue.Done) {
204+
func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done exporterqueue.Done) {
205205
qb.stopWG.Add(1)
206206
if qb.workerPool != nil {
207207
<-qb.workerPool

0 commit comments

Comments
 (0)