Skip to content

Commit f278630

Browse files
committed
Cleanup initialization order for exporterhelper
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 5f95f72 commit f278630

11 files changed

+170
-127
lines changed

.chloggen/clenup-initialization.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix bug that the exporter with new batcher may have been marked as non mutation.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12239]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Only affects users that manually turned on `exporter.UsePullingBasedExporterQueueBatcher` featuregate.
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: []

exporter/exporterhelper/internal/base_exporter.go

+73-61
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegis
3232
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"),
3333
)
3434

35-
type ObsrepSenderFactory = func(obsrep *ObsReport) Sender[internal.Request]
35+
type ObsrepSenderFactory = func(obsrep *ObsReport, next Sender[internal.Request]) Sender[internal.Request]
3636

3737
// Option apply changes to BaseExporter.
3838
type Option func(*BaseExporter) error
@@ -52,17 +52,20 @@ type BaseExporter struct {
5252
// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
5353
// The data is handled by each sender in the respective order starting from the queueSender.
5454
// Most of the senders are optional, and initialized with a no-op path-through sender.
55-
BatchSender Sender[internal.Request]
56-
QueueSender Sender[internal.Request]
57-
ObsrepSender Sender[internal.Request]
58-
RetrySender Sender[internal.Request]
59-
TimeoutSender *TimeoutSender // TimeoutSender is always initialized.
55+
BatchSender Sender[internal.Request]
56+
QueueSender Sender[internal.Request]
57+
ObsrepSender Sender[internal.Request]
58+
RetrySender Sender[internal.Request]
59+
60+
firstSender Sender[internal.Request]
6061

6162
ConsumerOptions []consumer.Option
6263

63-
queueCfg exporterqueue.Config
64+
timeoutCfg TimeoutConfig
65+
retryCfg configretry.BackOffConfig
6466
queueFactory exporterqueue.Factory[internal.Request]
65-
BatcherCfg exporterbatcher.Config
67+
queueCfg exporterqueue.Config
68+
batcherCfg exporterbatcher.Config
6669
}
6770

6871
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
@@ -72,49 +75,50 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
7275
}
7376

7477
be := &BaseExporter{
75-
BatchSender: &BaseSender[internal.Request]{},
76-
QueueSender: &BaseSender[internal.Request]{},
77-
ObsrepSender: osf(obsReport),
78-
RetrySender: &BaseSender[internal.Request]{},
79-
TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()},
80-
81-
Set: set,
78+
timeoutCfg: NewDefaultTimeoutConfig(),
79+
Set: set,
8280
}
8381

8482
for _, op := range options {
85-
err = multierr.Append(err, op(be))
83+
if err = op(be); err != nil {
84+
return nil, err
85+
}
8686
}
87-
if err != nil {
88-
return nil, err
87+
88+
// TimeoutSender is always initialized.
89+
be.firstSender = &TimeoutSender{cfg: be.timeoutCfg}
90+
if be.retryCfg.Enabled {
91+
be.RetrySender = newRetrySender(be.retryCfg, set, be.firstSender)
92+
be.firstSender = be.RetrySender
93+
}
94+
95+
be.ObsrepSender = osf(obsReport, be.firstSender)
96+
be.firstSender = be.ObsrepSender
97+
98+
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled ||
99+
usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled && !be.queueCfg.Enabled {
100+
bs := NewBatchSender(be.batcherCfg, set, be.firstSender)
101+
if be.queueCfg.Enabled {
102+
bs.concurrencyLimit = int64(be.queueCfg.NumConsumers)
103+
}
104+
be.BatchSender = bs
105+
be.firstSender = be.BatchSender
89106
}
90107

91108
if be.queueCfg.Enabled {
92109
qSet := exporterqueue.Settings{
93110
Signal: signal,
94-
ExporterSettings: be.Set,
111+
ExporterSettings: set,
95112
}
96-
q := be.queueFactory(context.Background(), qSet, be.queueCfg)
97-
q, err = newObsQueue(qSet, q)
113+
be.QueueSender, err = NewQueueSender(be.queueFactory, qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
98114
if err != nil {
99115
return nil, err
100116
}
101-
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.BatcherCfg)
102-
}
103-
104-
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
105-
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
106-
bs := NewBatchSender(be.BatcherCfg, be.Set)
107-
be.BatchSender = bs
117+
be.firstSender = be.QueueSender
108118
}
109119

110-
be.connectSenders()
111-
112-
if bs, ok := be.BatchSender.(*BatchSender); ok {
113-
// If queue sender is enabled assign to the batch sender the same number of workers.
114-
if qs, ok := be.QueueSender.(*QueueSender); ok {
115-
bs.concurrencyLimit = int64(qs.numConsumers)
116-
}
117-
// Batcher sender mutates the data.
120+
if be.batcherCfg.Enabled {
121+
// Batcher mutates the data.
118122
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
119123
}
120124

@@ -123,47 +127,55 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
123127

124128
// Send sends the request using the first sender in the chain.
125129
func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
126-
err := be.QueueSender.Send(ctx, req)
130+
err := be.firstSender.Send(ctx, req)
127131
if err != nil {
128132
be.Set.Logger.Error("Exporting failed. Rejecting data."+be.ExportFailureMessage,
129133
zap.Error(err), zap.Int("rejected_items", req.ItemsCount()))
130134
}
131135
return err
132136
}
133137

134-
// connectSenders connects the senders in the predefined order.
135-
func (be *BaseExporter) connectSenders() {
136-
be.QueueSender.SetNextSender(be.BatchSender)
137-
be.BatchSender.SetNextSender(be.ObsrepSender)
138-
be.ObsrepSender.SetNextSender(be.RetrySender)
139-
be.RetrySender.SetNextSender(be.TimeoutSender)
140-
}
141-
142138
func (be *BaseExporter) Start(ctx context.Context, host component.Host) error {
143139
// First start the wrapped exporter.
144140
if err := be.StartFunc.Start(ctx, host); err != nil {
145141
return err
146142
}
147143

148-
// If no error then start the BatchSender.
149-
if err := be.BatchSender.Start(ctx, host); err != nil {
150-
return err
144+
if be.BatchSender != nil {
145+
// If no error then start the BatchSender.
146+
if err := be.BatchSender.Start(ctx, host); err != nil {
147+
return err
148+
}
151149
}
152150

153151
// Last start the queueSender.
154-
return be.QueueSender.Start(ctx, host)
152+
if be.QueueSender != nil {
153+
return be.QueueSender.Start(ctx, host)
154+
}
155+
156+
return nil
155157
}
156158

157159
func (be *BaseExporter) Shutdown(ctx context.Context) error {
158-
return multierr.Combine(
159-
// First shutdown the retry sender, so the queue sender can flush the queue without retries.
160-
be.RetrySender.Shutdown(ctx),
161-
// Then shutdown the batch sender
162-
be.BatchSender.Shutdown(ctx),
163-
// Then shutdown the queue sender.
164-
be.QueueSender.Shutdown(ctx),
165-
// Last shutdown the wrapped exporter itself.
166-
be.ShutdownFunc.Shutdown(ctx))
160+
var err error
161+
162+
// First shutdown the retry sender, so the queue sender can flush the queue without retries.
163+
if be.RetrySender != nil {
164+
err = multierr.Append(err, be.RetrySender.Shutdown(ctx))
165+
}
166+
167+
// Then shutdown the batch sender
168+
if be.BatchSender != nil {
169+
err = multierr.Append(err, be.BatchSender.Shutdown(ctx))
170+
}
171+
172+
// Then shutdown the queue sender.
173+
if be.QueueSender != nil {
174+
err = multierr.Append(err, be.QueueSender.Shutdown(ctx))
175+
}
176+
177+
// Last shutdown the wrapped exporter itself.
178+
return multierr.Append(err, be.ShutdownFunc.Shutdown(ctx))
167179
}
168180

169181
// WithStart overrides the default Start function for an exporter.
@@ -188,7 +200,7 @@ func WithShutdown(shutdown component.ShutdownFunc) Option {
188200
// The default TimeoutConfig is 5 seconds.
189201
func WithTimeout(timeoutConfig TimeoutConfig) Option {
190202
return func(o *BaseExporter) error {
191-
o.TimeoutSender.cfg = timeoutConfig
203+
o.timeoutCfg = timeoutConfig
192204
return nil
193205
}
194206
}
@@ -201,7 +213,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
201213
o.ExportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors."
202214
return nil
203215
}
204-
o.RetrySender = newRetrySender(config, o.Set)
216+
o.retryCfg = config
205217
return nil
206218
}
207219
}
@@ -268,7 +280,7 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
268280
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
269281
func WithBatcher(cfg exporterbatcher.Config) Option {
270282
return func(o *BaseExporter) error {
271-
o.BatcherCfg = cfg
283+
o.batcherCfg = cfg
272284
return nil
273285
}
274286
}

exporter/exporterhelper/internal/base_exporter_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,14 @@ var (
3333
}()
3434
)
3535

36-
func newNoopObsrepSender(*ObsReport) Sender[internal.Request] {
37-
return &BaseSender[internal.Request]{}
36+
type noopSender struct {
37+
component.StartFunc
38+
component.ShutdownFunc
39+
SendFunc[internal.Request]
40+
}
41+
42+
func newNoopObsrepSender(_ *ObsReport, next Sender[internal.Request]) Sender[internal.Request] {
43+
return &noopSender{SendFunc: next.Send}
3844
}
3945

4046
func TestBaseExporter(t *testing.T) {

exporter/exporterhelper/internal/batch_sender.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import (
2323
// - cfg.FlushTimeout is elapsed since the timestamp when the previous batch was sent out.
2424
// - concurrencyLimit is reached.
2525
type BatchSender struct {
26-
BaseSender[internal.Request]
27-
cfg exporterbatcher.Config
26+
cfg exporterbatcher.Config
27+
next Sender[internal.Request]
2828

2929
// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
3030
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
@@ -43,11 +43,12 @@ type BatchSender struct {
4343
stopped *atomic.Bool
4444
}
4545

46-
// newBatchSender returns a new batch consumer component.
47-
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender {
46+
// NewBatchSender returns a new batch consumer component.
47+
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings, next Sender[internal.Request]) *BatchSender {
4848
bs := &BatchSender{
49-
activeBatch: newEmptyBatch(),
5049
cfg: cfg,
50+
next: next,
51+
activeBatch: newEmptyBatch(),
5152
logger: set.Logger,
5253
shutdownCh: nil,
5354
shutdownCompleteCh: make(chan struct{}),
@@ -119,7 +120,7 @@ func newEmptyBatch() *batch {
119120
// Caller must hold the lock.
120121
func (bs *BatchSender) exportActiveBatch() {
121122
go func(b *batch) {
122-
b.err = bs.NextSender.Send(b.ctx, b.request)
123+
b.err = bs.next.Send(b.ctx, b.request)
123124
close(b.done)
124125
bs.activeRequests.Add(-b.requestsBlocked)
125126
}(bs.activeBatch)
@@ -138,7 +139,7 @@ func (bs *BatchSender) isActiveBatchReady() bool {
138139
func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
139140
// Stopped batch sender should act as pass-through to allow the queue to be drained.
140141
if bs.stopped.Load() {
141-
return bs.NextSender.Send(ctx, req)
142+
return bs.next.Send(ctx, req)
142143
}
143144

144145
if bs.cfg.MaxSizeItems > 0 {
@@ -190,7 +191,7 @@ func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Req
190191
// Intentionally do not put the last request in the active batch to not block it.
191192
// TODO: Consider including the partial request in the error to avoid double publishing.
192193
for _, r := range reqs {
193-
if err := bs.NextSender.Send(ctx, r); err != nil {
194+
if err := bs.next.Send(ctx, r); err != nil {
194195
return err
195196
}
196197
}

exporter/exporterhelper/internal/obs_report_sender.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,26 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
66
import (
77
"context"
88

9+
"go.opentelemetry.io/collector/component"
910
"go.opentelemetry.io/collector/exporter/internal"
1011
)
1112

1213
type obsReportSender[K internal.Request] struct {
13-
BaseSender[K]
14+
component.StartFunc
15+
component.ShutdownFunc
1416
obsrep *ObsReport
17+
next Sender[K]
1518
}
1619

17-
func NewObsReportSender[K internal.Request](obsrep *ObsReport) Sender[K] {
18-
return &obsReportSender[K]{obsrep: obsrep}
20+
func NewObsReportSender[K internal.Request](obsrep *ObsReport, next Sender[K]) Sender[K] {
21+
return &obsReportSender[K]{obsrep: obsrep, next: next}
1922
}
2023

2124
func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
2225
c := ors.obsrep.StartOp(ctx)
2326
items := req.ItemsCount()
2427
// Forward the data to the next consumer (this pusher is the next).
25-
err := ors.NextSender.Send(c, req)
28+
err := ors.next.Send(c, req)
2629
ors.obsrep.EndOp(c, items, err)
2730
return err
2831
}

0 commit comments

Comments
 (0)