Commit 0d2f646

Update queue size after the element is done exported (#12399)
This PR changes when the queue size is updated. Previously, the size was updated when an element was removed from the queue, but before it was fully consumed (i.e., before the done callback was called); after this PR, the size is updated when the done callback is called. This change ensures that the queue size limit applies to the whole exporter and allows better memory control for users. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 41f3e69 commit 0d2f646
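
The mechanics, in brief: every queue element is paired with a Done callback, and the size counter is now decremented in OnDone instead of on the read path. A minimal sketch of that accounting (hypothetical names, not the collector's actual API):

package main

import (
	"fmt"
	"sync/atomic"
)

// Done is invoked by the consumer once an element is fully exported.
type Done func()

// queue counts everything offered but not yet done, so elements that
// are currently being exported still occupy queue size.
type queue struct {
	size atomic.Int64
}

// Offer grows the size immediately and returns the callback that
// shrinks it; the caller hands the callback to the consumer.
func (q *queue) Offer(elSize int64) Done {
	q.size.Add(elSize)
	return func() { q.size.Add(-elSize) }
}

func main() {
	q := &queue{}
	done := q.Offer(5)
	fmt.Println(q.size.Load()) // 5: element is in flight
	done()                     // consumer finished exporting
	fmt.Println(q.size.Load()) // 0: size released only now
}

Because the decrement waits for done(), a capacity check against size now bounds queued plus in-flight data, which is what gives users tighter memory control.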

7 files changed: +132 -51 lines changed
+25 -0

@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Update queue size after the element is done exported
+
+# One or more tracking issues or pull requests related to the change
+issues: [12399]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: After this change the active queue size will include elements in the process of being exported.
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

exporter/exporterhelper/internal/batch_sender_test.go

+12 -12

@@ -90,7 +90,7 @@ func TestBatchSender_Merge(t *testing.T) {
 			assert.Equal(t, int64(1), sink.RequestsCount())
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 2 && sink.ItemsCount() == 15
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 		})
 	}
 	for _, tt := range tests {
@@ -160,12 +160,12 @@ func TestBatchSender_BatchExportError(t *testing.T) {
 			errReq := &requesttest.FakeRequest{Items: 20, ExportErr: errors.New("transient error"), Sink: sink}
 			require.NoError(t, be.Send(context.Background(), errReq))

-			// the batch should be dropped since the queue doesn't have requeuing enabled.
+			// the batch should be dropped since the queue doesn't have re-queuing enabled.
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == tt.expectedRequests &&
 					sink.ItemsCount() == tt.expectedItems &&
 					be.queue.Size() == 0
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			require.NoError(t, be.Shutdown(context.Background()))
 		})
@@ -194,13 +194,13 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 1 && sink.ItemsCount() == 8
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// big request should be broken down into two requests, both are sent right away.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 3 && sink.ItemsCount() == 25
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// request that cannot be split should be dropped.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{
@@ -212,7 +212,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 5 && sink.ItemsCount() == 38
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 			require.NoError(t, be.Shutdown(context.Background()))
 		})
 	}
@@ -370,20 +370,20 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {

 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 1 && sink.ItemsCount() == 4
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// the 3rd request should be flushed by itself due to flush interval
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 2 && sink.ItemsCount() == 6
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// the 4th and 5th request should be flushed in the same batched request by max concurrency limit.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 3 && sink.ItemsCount() == 10
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink}))
@@ -392,15 +392,15 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
 				// in case of MaxSizeItems=10, wait for the leftover request to send
 				assert.Eventually(t, func() bool {
 					return sink.RequestsCount() == 5 && sink.ItemsCount() == 21
-				}, 50*time.Millisecond, 10*time.Millisecond)
+				}, 1*time.Second, 10*time.Millisecond)
 			}

 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 6, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 20, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == tt.expectedRequests && sink.ItemsCount() == tt.expectedItems
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 		})
 	}
 }
@@ -648,7 +648,7 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
 	assert.EventuallyWithT(t, func(c *assert.CollectT) {
 		assert.LessOrEqual(c, int64(1), sink.RequestsCount())
 		assert.EqualValues(c, 8, sink.ItemsCount())
-	}, 500*time.Millisecond, 10*time.Millisecond)
+	}, 1*time.Second, 10*time.Millisecond)

 	require.NoError(t, be.Shutdown(context.Background()))
 }
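
All edits in this file are mechanical timeout bumps: each assert.Eventually / assert.EventuallyWithT deadline rises to 1*time.Second, presumably because the queue size now stays non-zero until the done callback fires, so conditions such as be.queue.Size() == 0 can take longer to hold. For reference, a self-contained illustration of the testify polling pattern these tests use:

package example

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestEventuallyPolling(t *testing.T) {
	var count atomic.Int64
	go count.Store(2) // simulates the exporter finishing asynchronously

	// Eventually polls the condition every tick until it returns true
	// or waitFor elapses; a bigger waitFor only slows down failing runs.
	assert.Eventually(t,
		func() bool { return count.Load() == 2 },
		1*time.Second,       // waitFor
		10*time.Millisecond, // tick
	)
}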

exporter/exporterqueue/async_queue_test.go

+1 -1

@@ -70,7 +70,7 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		for j := 0; j < 11; j++ {
+		for j := 0; j < 10; j++ {
 			assert.NoError(t, ac.Offer(ctx, 1))
 		}
 		assert.ErrorIs(t, ac.Offer(ctx, 3), context.Canceled)

exporter/exporterqueue/disabled_queue.go

+17 -3

@@ -19,6 +19,7 @@ var donePool = sync.Pool{

 func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
 	return &disabledQueue[T]{
+		sizer:       &requestSizer[T]{},
 		consumeFunc: consumeFunc,
 		size:        &atomic.Int64{},
 	}
@@ -28,14 +29,18 @@ type disabledQueue[T any] struct {
 	component.StartFunc
 	component.ShutdownFunc
 	consumeFunc ConsumeFunc[T]
+	sizer       sizer[T]
 	size        *atomic.Int64
 }

 func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
+	elSize := d.sizer.Sizeof(req)
+	d.size.Add(elSize)
+
 	done := donePool.Get().(*blockingDone)
-	d.size.Add(1)
+	done.queue = d
+	done.elSize = elSize
 	d.consumeFunc(ctx, req, done)
-	defer d.size.Add(-1)
 	// Only re-add the blockingDone instance back to the pool if successfully received the
 	// message from the consumer which guarantees consumer will not use that anymore,
 	// otherwise no guarantee about when the consumer will add the message to the channel so cannot reuse or close.
@@ -48,6 +53,10 @@ func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
 	}
 }

+func (d *disabledQueue[T]) onDone(elSize int64) {
+	d.size.Add(-elSize)
+}
+
 // Size returns the current number of blocked requests waiting to be processed.
 func (d *disabledQueue[T]) Size() int64 {
 	return d.size.Load()
@@ -59,9 +68,14 @@ func (d *disabledQueue[T]) Capacity() int64 {
 }

 type blockingDone struct {
-	ch chan error
+	queue interface {
+		onDone(int64)
+	}
+	elSize int64
+	ch     chan error
 }

 func (d *blockingDone) OnDone(err error) {
+	d.queue.onDone(d.elSize)
 	d.ch <- err
 }
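
blockingDone now carries the element size and a reference back to its queue, and OnDone returns the size before signaling the producer. The pooling rule in the comment above is the classic sync.Pool handshake: recycle the token only once the consumer provably holds no reference to it. A minimal sketch of that handshake (hypothetical token type, not the collector's code):

package main

import (
	"fmt"
	"sync"
)

// token mimics blockingDone: a reusable carrier for one completion signal.
type token struct {
	elSize int64
	ch     chan error
}

var pool = sync.Pool{
	New: func() any { return &token{ch: make(chan error, 1)} },
}

func main() {
	tk := pool.Get().(*token)
	tk.elSize = 42 // per-use state is set after Get, not in New

	// The consumer reports completion over the channel.
	go func() { tk.ch <- nil }()

	// Put the token back only after the signal is received, which
	// guarantees the consumer holds no reference to it anymore.
	err := <-tk.ch
	pool.Put(tk)
	fmt.Println("export finished with err =", err)
}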

exporter/exporterqueue/memory_queue.go

+37 -7

@@ -11,6 +11,12 @@ import (
 	"go.opentelemetry.io/collector/component"
 )

+var sizeDonePool = sync.Pool{
+	New: func() any {
+		return &sizeDone{}
+	},
+}
+
 var errInvalidSize = errors.New("invalid element size")

 // memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
@@ -91,11 +97,11 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
 	defer sq.mu.Unlock()

 	for {
-		if sq.size > 0 {
+		if sq.items.hasElements() {
 			elCtx, el, elSize := sq.items.pop()
-			sq.size -= elSize
-			sq.hasMoreSpace.Signal()
-			return elCtx, el, noopDoneInst, true
+			sd := sizeDonePool.Get().(*sizeDone)
+			sd.reset(elSize, sq)
+			return elCtx, el, sd, true
 		}

 		if sq.stopped {
@@ -109,6 +115,13 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
 	}
 }

+func (sq *memoryQueue[T]) onDone(elSize int64) {
+	sq.mu.Lock()
+	defer sq.mu.Unlock()
+	sq.size -= elSize
+	sq.hasMoreSpace.Signal()
+}
+
 // Shutdown closes the queue channel to initiate draining of the queue.
 func (sq *memoryQueue[T]) Shutdown(context.Context) error {
 	sq.mu.Lock()
@@ -142,6 +155,7 @@ type linkedQueue[T any] struct {

 func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
 	n := &node[T]{ctx: ctx, data: data, size: size}
+	// If tail is nil means list is empty so update both head and tail to point to same element.
 	if l.tail == nil {
 		l.head = n
 		l.tail = n
@@ -151,18 +165,34 @@ func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
 	l.tail = n
 }

+func (l *linkedQueue[T]) hasElements() bool {
+	return l.head != nil
+}
+
 func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
 	n := l.head
 	l.head = n.next
+	// If it gets to the last element, then update tail as well.
 	if l.head == nil {
 		l.tail = nil
 	}
 	n.next = nil
 	return n.ctx, n.data, n.size
 }

-type noopDone struct{}
+type sizeDone struct {
+	size  int64
+	queue interface {
+		onDone(int64)
+	}
+}

-func (*noopDone) OnDone(error) {}
+func (sd *sizeDone) reset(size int64, queue interface{ onDone(int64) }) {
+	sd.size = size
+	sd.queue = queue
+}

-var noopDoneInst = &noopDone{}
+func (sd *sizeDone) OnDone(error) {
+	defer sizeDonePool.Put(sd)
+	sd.queue.onDone(sd.size)
+}
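
The behavioral core of this file is that onDone, not Read, now shrinks sq.size and signals hasMoreSpace, so producers blocked on capacity stay blocked while elements are in flight. A condensed, runnable sketch of that sync.Cond coordination (simplified; not the actual memoryQueue):

package main

import (
	"fmt"
	"sync"
)

type memQueue struct {
	mu           sync.Mutex
	hasMoreSpace *sync.Cond
	size, cap    int64
}

func newMemQueue(capacity int64) *memQueue {
	q := &memQueue{cap: capacity}
	q.hasMoreSpace = sync.NewCond(&q.mu)
	return q
}

// offer blocks until the element fits; in-flight elements still count.
func (q *memQueue) offer(elSize int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.size+elSize > q.cap {
		q.hasMoreSpace.Wait()
	}
	q.size += elSize
}

// onDone releases capacity only after the element is fully exported
// and wakes one blocked producer.
func (q *memQueue) onDone(elSize int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.size -= elSize
	q.hasMoreSpace.Signal()
}

func main() {
	q := newMemQueue(10)
	q.offer(4)
	go q.onDone(4) // consumer finishes asynchronously
	q.offer(10)    // blocks until onDone frees the 4 units
	fmt.Println("second offer fit after onDone")
}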

exporter/exporterqueue/persistent_queue.go

+32 -15

@@ -38,6 +38,12 @@ var (
 	errWrongExtensionType = errors.New("requested extension is not a storage extension")
 )

+var indexDonePool = sync.Pool{
+	New: func() any {
+		return &indexDone{}
+	},
+}
+
 type persistentQueueSettings[T any] struct {
 	sizer    sizer[T]
 	capacity int64
@@ -292,16 +298,9 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
 			pq.hasMoreSpace.Signal()
 		}
 		if consumed {
-			pq.queueSize -= pq.set.sizer.Sizeof(req)
-			// The size might be not in sync with the queue in case it's restored from the disk
-			// because we don't flush the current queue size on the disk on every read/write.
-			// In that case we need to make sure it doesn't go below 0.
-			if pq.queueSize < 0 {
-				pq.queueSize = 0
-			}
-			pq.hasMoreSpace.Signal()
-
-			return context.Background(), req, indexDone[T]{index: index, pq: pq}, true
+			id := indexDonePool.Get().(*indexDone)
+			id.reset(index, pq.set.sizer.Sizeof(req), pq)
+			return context.Background(), req, id, true
 		}
 	}

@@ -348,7 +347,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
 }

 // onDone should be called to remove the item of the given index from the queue once processing is finished.
-func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
+func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr error) {
 	// Delete the item from the persistent storage after it was processed.
 	pq.mu.Lock()
 	// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
@@ -359,6 +358,15 @@
 		pq.mu.Unlock()
 	}()

+	pq.queueSize -= elSize
+	// The size might be not in sync with the queue in case it's restored from the disk
+	// because we don't flush the current queue size on the disk on every read/write.
+	// In that case we need to make sure it doesn't go below 0.
+	if pq.queueSize < 0 {
+		pq.queueSize = 0
+	}
+	pq.hasMoreSpace.Signal()
+
 	if experr.IsShutdownErr(consumeErr) {
 		// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
 		// TODO: Handle partially delivered requests by updating their values in the storage.
@@ -555,11 +563,20 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) {
 	return val, nil
 }

-type indexDone[T any] struct {
+type indexDone struct {
 	index uint64
-	pq    *persistentQueue[T]
+	size  int64
+	queue interface {
+		onDone(uint64, int64, error)
+	}
+}
+
+func (id *indexDone) reset(index uint64, size int64, queue interface{ onDone(uint64, int64, error) }) {
+	id.index = index
+	id.size = size
+	id.queue = queue
 }

-func (id indexDone[T]) OnDone(err error) {
-	id.pq.onDone(id.index, err)
+func (id *indexDone) OnDone(err error) {
+	id.queue.onDone(id.index, id.size, err)
 }
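
A design note: indexDone loses its type parameter and holds the queue behind a narrow interface because a sync.Pool cannot be generic over T; the non-generic token is what allows the single package-level indexDonePool. A sketch of the pattern (illustrative names, not the collector's code):

package main

import (
	"fmt"
	"sync"
)

// doneTarget is the narrow, non-generic view of the queue that the
// pooled token needs; a persistentQueue[T] can satisfy it for any T.
type doneTarget interface {
	onDone(index uint64, size int64, err error)
}

type indexDone struct {
	index uint64
	size  int64
	queue doneTarget
}

// A single package-level pool works because indexDone itself carries
// no type parameter.
var pool = sync.Pool{New: func() any { return &indexDone{} }}

type fakeQueue struct{}

func (fakeQueue) onDone(index uint64, size int64, err error) {
	fmt.Println("done:", index, size, err)
}

func main() {
	id := pool.Get().(*indexDone)
	id.index, id.size, id.queue = 7, 128, fakeQueue{}
	id.queue.onDone(id.index, id.size, nil)
	pool.Put(id) // safe: nothing else references id at this point
}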
