Commit 63a413c

Update queue size after the element is done exported

Signed-off-by: Bogdan Drutu <[email protected]>

1 parent 0831231

7 files changed: +132 -51
Changelog entry (new file)  +25

```yaml
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update queue size after the element is done exported

# One or more tracking issues or pull requests related to the change
issues: [12399]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: After this change the active queue size will include elements in the process of being exported.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
```
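
The subtext captures the behavioral change: queue size is now decremented when the exporter finishes a request (inside its `Done` callback), not when the request is read off the queue, so in-flight requests keep counting toward the size. Below is a minimal sketch of that accounting pattern; `inFlightSize` and `done` are illustrative names, not the collector's API:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// inFlightSize is a toy stand-in for the queue's size counter.
type inFlightSize struct{ size atomic.Int64 }

// done follows the commit's pattern: it remembers the element size and
// releases it only when the export has finished.
type done struct {
	q      *inFlightSize
	elSize int64
}

func (d *done) OnDone(err error) { d.q.size.Add(-d.elSize) }

func main() {
	q := &inFlightSize{}

	// Offer: account for the element as soon as it enters the queue.
	q.size.Add(1)
	d := &done{q: q, elSize: 1}

	// Before this commit the size dropped at read time; now it stays
	// at 1 while the element is being exported.
	fmt.Println("size during export:", q.size.Load()) // 1

	d.OnDone(nil) // export finished
	fmt.Println("size after export:", q.size.Load()) // 0
}
```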

exporter/exporterhelper/internal/batch_sender_test.go  +12 -12
```diff
@@ -90,7 +90,7 @@ func TestBatchSender_Merge(t *testing.T) {
 			assert.Equal(t, int64(1), sink.RequestsCount())
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 2 && sink.ItemsCount() == 15
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 		})
 	}
 	for _, tt := range tests {
@@ -160,12 +160,12 @@ func TestBatchSender_BatchExportError(t *testing.T) {
 			errReq := &requesttest.FakeRequest{Items: 20, ExportErr: errors.New("transient error"), Sink: sink}
 			require.NoError(t, be.Send(context.Background(), errReq))

-			// the batch should be dropped since the queue doesn't have requeuing enabled.
+			// the batch should be dropped since the queue doesn't have re-queuing enabled.
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == tt.expectedRequests &&
 					sink.ItemsCount() == tt.expectedItems &&
 					be.queue.Size() == 0
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			require.NoError(t, be.Shutdown(context.Background()))
 		})
@@ -194,13 +194,13 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 1 && sink.ItemsCount() == 8
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// big request should be broken down into two requests, both are sent right away.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 3 && sink.ItemsCount() == 25
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// request that cannot be split should be dropped.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{
@@ -212,7 +212,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 5 && sink.ItemsCount() == 38
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 			require.NoError(t, be.Shutdown(context.Background()))
 		})
 	}
@@ -370,20 +370,20 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {

 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 1 && sink.ItemsCount() == 4
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// the 3rd request should be flushed by itself due to flush interval
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 2 && sink.ItemsCount() == 6
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// the 4th and 5th request should be flushed in the same batched request by max concurrency limit.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 3 && sink.ItemsCount() == 10
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink}))
@@ -392,15 +392,15 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
 				// in case of MaxSizeItems=10, wait for the leftover request to send
 				assert.Eventually(t, func() bool {
 					return sink.RequestsCount() == 5 && sink.ItemsCount() == 21
-				}, 50*time.Millisecond, 10*time.Millisecond)
+				}, 1*time.Second, 10*time.Millisecond)
 			}

 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 6, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 20, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == tt.expectedRequests && sink.ItemsCount() == tt.expectedItems
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 		})
 	}
 }
@@ -648,7 +648,7 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
 	assert.EventuallyWithT(t, func(c *assert.CollectT) {
 		assert.LessOrEqual(c, int64(1), sink.RequestsCount())
 		assert.EqualValues(c, 8, sink.ItemsCount())
-	}, 500*time.Millisecond, 10*time.Millisecond)
+	}, 1*time.Second, 10*time.Millisecond)

 	require.NoError(t, be.Shutdown(context.Background()))
 }
```
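
Every change in this file widens an `assert.Eventually` window to one second. Since the queue size now stays up until the `Done` callback fires, assertions such as `be.queue.Size() == 0` become true slightly later than before, and the old 50-500ms windows would presumably be prone to flakes on slow runners. A small self-contained sketch of the polling pattern these tests rely on; the counter here is a stand-in for the tests' sink:

```go
package batchsender_test

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// TestEventuallyWindow re-checks the condition every 10ms until it holds or
// the 1s budget runs out; it fails only if the condition never becomes true.
func TestEventuallyWindow(t *testing.T) {
	var exported atomic.Int64
	go func() {
		time.Sleep(200 * time.Millisecond) // simulate a slow export
		exported.Store(1)
	}()

	// A 100ms window would flake here; 1s gives slow runners headroom
	// without slowing the happy path, since polling stops on success.
	assert.Eventually(t, func() bool {
		return exported.Load() == 1
	}, 1*time.Second, 10*time.Millisecond)
}
```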

exporter/exporterqueue/async_queue_test.go  +1 -1
```diff
@@ -70,7 +70,7 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		for j := 0; j < 11; j++ {
+		for j := 0; j < 10; j++ {
 			assert.NoError(t, ac.Offer(ctx, 1))
 		}
 		assert.ErrorIs(t, ac.Offer(ctx, 3), context.Canceled)
```
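
The loop drops from 11 to 10 offers, which is consistent with the new accounting: an element now occupies queue capacity until its `Done` callback runs, so one fewer `Offer` fits before the queue blocks. A toy model of that capacity behavior, with illustrative names rather than the exporterqueue API:

```go
package main

import (
	"fmt"
	"sync"
)

// boundedSize mimics blocking-queue accounting after this commit: offer
// reserves capacity immediately, and capacity is returned only by onDone.
type boundedSize struct {
	mu       sync.Mutex
	cond     *sync.Cond
	size     int64
	capacity int64
}

func newBoundedSize(capacity int64) *boundedSize {
	b := &boundedSize{capacity: capacity}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// offer blocks while the queue, including in-flight elements, is full.
func (b *boundedSize) offer(elSize int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.size+elSize > b.capacity {
		b.cond.Wait()
	}
	b.size += elSize
}

// onDone frees the element's capacity and wakes a blocked producer.
func (b *boundedSize) onDone(elSize int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.size -= elSize
	b.cond.Signal()
}

func main() {
	b := newBoundedSize(10)
	for i := 0; i < 10; i++ {
		b.offer(1) // ten offers of size 1 fill the queue exactly
	}
	fmt.Println("size:", b.size) // 10; an 11th offer would now block

	go b.onDone(1) // one element finishes exporting...
	b.offer(1)     // ...which lets the blocked 11th offer proceed
	fmt.Println("size:", b.size) // 10 again
}
```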

exporter/exporterqueue/disabled_queue.go  +17 -3
```diff
@@ -19,6 +19,7 @@ var donePool = sync.Pool{

 func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
 	return &disabledQueue[T]{
+		sizer:       &requestSizer[T]{},
 		consumeFunc: consumeFunc,
 		size:        &atomic.Int64{},
 	}
@@ -28,14 +29,18 @@ type disabledQueue[T any] struct {
 	component.StartFunc
 	component.ShutdownFunc
 	consumeFunc ConsumeFunc[T]
+	sizer       sizer[T]
 	size        *atomic.Int64
 }

 func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
+	elSize := d.sizer.Sizeof(req)
+	d.size.Add(elSize)
+
 	done := donePool.Get().(*blockingDone)
-	d.size.Add(1)
+	done.queue = d
+	done.elSize = elSize
 	d.consumeFunc(ctx, req, done)
-	defer d.size.Add(-1)
 	// Only re-add the blockingDone instance back to the pool if successfully received the
 	// message from the consumer which guarantees consumer will not use that anymore,
 	// otherwise no guarantee about when the consumer will add the message to the channel so cannot reuse or close.
@@ -48,6 +53,10 @@ func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
 	}
 }

+func (d *disabledQueue[T]) onDone(elSize int64) {
+	d.size.Add(-elSize)
+}
+
 // Size returns the current number of blocked requests waiting to be processed.
 func (d *disabledQueue[T]) Size() int64 {
 	return d.size.Load()
@@ -59,9 +68,14 @@
 }

 type blockingDone struct {
-	ch chan error
+	queue interface {
+		onDone(int64)
+	}
+	elSize int64
+	ch     chan error
 }

 func (d *blockingDone) OnDone(err error) {
+	d.queue.onDone(d.elSize)
 	d.ch <- err
 }
```
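
The disabled queue buffers nothing, yet it now reports a nonzero `Size()` while a request is being exported: `Offer` adds the request's size up front, hands the consumer a pooled `blockingDone` carrying that size, and the matching `onDone` subtracts it again, just before the result is delivered back to the blocked producer. A stripped-down sketch of that blocking handoff, with toy types rather than the real `ConsumeFunc`/`Queue` interfaces:

```go
package main

import "fmt"

// toyDone mirrors blockingDone: the producer blocks on ch until the
// consumer reports the export result through OnDone.
type toyDone struct {
	release func(int64) // the queue's onDone
	elSize  int64
	ch      chan error
}

func (d *toyDone) OnDone(err error) {
	d.release(d.elSize) // size is freed before the result is delivered
	d.ch <- err
}

// offer hands the request to the consumer and waits for OnDone, so the
// request counts against the queue size for the whole export.
func offer(req string, elSize int64, size *int64, consume func(string, *toyDone)) error {
	*size += elSize
	d := &toyDone{release: func(n int64) { *size -= n }, elSize: elSize, ch: make(chan error, 1)}
	consume(req, d)
	return <-d.ch
}

func main() {
	var size int64
	err := offer("payload", 1, &size, func(req string, d *toyDone) {
		fmt.Println("size during export:", size) // 1
		d.OnDone(nil)                            // export finished
	})
	fmt.Println("err:", err, "size after:", size) // <nil> 0
}
```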

exporter/exporterqueue/memory_queue.go  +37 -7
```diff
@@ -11,6 +11,12 @@ import (
 	"go.opentelemetry.io/collector/component"
 )

+var sizeDonePool = sync.Pool{
+	New: func() any {
+		return &sizeDone{}
+	},
+}
+
 var errInvalidSize = errors.New("invalid element size")

 // memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
@@ -91,11 +97,11 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
 	defer sq.mu.Unlock()

 	for {
-		if sq.size > 0 {
+		if sq.items.hasElements() {
 			elCtx, el, elSize := sq.items.pop()
-			sq.size -= elSize
-			sq.hasMoreSpace.Signal()
-			return elCtx, el, noopDoneInst, true
+			sd := sizeDonePool.Get().(*sizeDone)
+			sd.reset(elSize, sq)
+			return elCtx, el, sd, true
 		}

 		if sq.stopped {
@@ -109,6 +115,13 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
 	}
 }

+func (sq *memoryQueue[T]) onDone(elSize int64) {
+	sq.mu.Lock()
+	defer sq.mu.Unlock()
+	sq.size -= elSize
+	sq.hasMoreSpace.Signal()
+}
+
 // Shutdown closes the queue channel to initiate draining of the queue.
 func (sq *memoryQueue[T]) Shutdown(context.Context) error {
 	sq.mu.Lock()
@@ -142,6 +155,7 @@ type linkedQueue[T any] struct {

 func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
 	n := &node[T]{ctx: ctx, data: data, size: size}
+	// If tail is nil means list is empty so update both head and tail to point to same element.
 	if l.tail == nil {
 		l.head = n
 		l.tail = n
@@ -151,18 +165,34 @@ type linkedQueue[T any] struct {
 	l.tail = n
 }

+func (l *linkedQueue[T]) hasElements() bool {
+	return l.head != nil
+}
+
 func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
 	n := l.head
 	l.head = n.next
+	// If it gets to the last element, then update tail as well.
 	if l.head == nil {
 		l.tail = nil
 	}
 	n.next = nil
 	return n.ctx, n.data, n.size
 }

-type noopDone struct{}
+type sizeDone struct {
+	size  int64
+	queue interface {
+		onDone(int64)
+	}
+}

-func (*noopDone) OnDone(error) {}
+func (sd *sizeDone) reset(size int64, queue interface{ onDone(int64) }) {
+	sd.size = size
+	sd.queue = queue
+}

-var noopDoneInst = &noopDone{}
+func (sd *sizeDone) OnDone(error) {
+	defer sizeDonePool.Put(sd)
+	sd.queue.onDone(sd.size)
+}
```
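
Because every dequeued element now needs a `Done` value that remembers its size, the shared no-op singleton (`noopDoneInst`) is replaced by pooled `sizeDone` instances. The `sync.Pool` get/reset/put cycle avoids allocating one per element, and `reset` must overwrite every field since a recycled instance still carries the previous element's values. A self-contained sketch of that pooling pattern, using illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

// pooledDone follows the sizeDone pattern: one instance per in-flight
// element, recycled through a sync.Pool instead of allocated per Read.
type pooledDone struct {
	size    int64
	release func(int64) // the queue's onDone
}

var donePool = sync.Pool{
	New: func() any { return &pooledDone{} },
}

// reset overwrites every field: a pooled instance may still hold the size
// and queue of the element it was used for last time.
func (pd *pooledDone) reset(size int64, release func(int64)) {
	pd.size = size
	pd.release = release
}

// OnDone releases the size first, then returns the instance to the pool.
// Putting it back any earlier would let two elements share one Done.
func (pd *pooledDone) OnDone(error) {
	defer donePool.Put(pd)
	pd.release(pd.size)
}

func main() {
	freed := int64(0)
	pd := donePool.Get().(*pooledDone)
	pd.reset(5, func(n int64) { freed += n })
	pd.OnDone(nil)
	fmt.Println("freed:", freed) // 5; pd is back in the pool for reuse
}
```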

exporter/exporterqueue/persistent_queue.go  +32 -15
```diff
@@ -38,6 +38,12 @@ var (
 	errWrongExtensionType = errors.New("requested extension is not a storage extension")
 )

+var indexDonePool = sync.Pool{
+	New: func() any {
+		return &indexDone{}
+	},
+}
+
 type persistentQueueSettings[T any] struct {
 	sizer    sizer[T]
 	capacity int64
@@ -292,16 +298,9 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Done, bool) {
 			pq.hasMoreSpace.Signal()
 		}
 		if consumed {
-			pq.queueSize -= pq.set.sizer.Sizeof(req)
-			// The size might be not in sync with the queue in case it's restored from the disk
-			// because we don't flush the current queue size on the disk on every read/write.
-			// In that case we need to make sure it doesn't go below 0.
-			if pq.queueSize < 0 {
-				pq.queueSize = 0
-			}
-			pq.hasMoreSpace.Signal()
-
-			return context.Background(), req, indexDone[T]{index: index, pq: pq}, true
+			id := indexDonePool.Get().(*indexDone)
+			id.reset(index, pq.set.sizer.Sizeof(req), pq)
+			return context.Background(), req, id, true
 		}
 	}

@@ -348,7 +347,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
 }

 // onDone should be called to remove the item of the given index from the queue once processing is finished.
-func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
+func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr error) {
 	// Delete the item from the persistent storage after it was processed.
 	pq.mu.Lock()
 	// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
@@ -359,6 +358,15 @@
 		pq.mu.Unlock()
 	}()

+	pq.queueSize -= elSize
+	// The size might be not in sync with the queue in case it's restored from the disk
+	// because we don't flush the current queue size on the disk on every read/write.
+	// In that case we need to make sure it doesn't go below 0.
+	if pq.queueSize < 0 {
+		pq.queueSize = 0
+	}
+	pq.hasMoreSpace.Signal()
+
 	if experr.IsShutdownErr(consumeErr) {
 		// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
 		// TODO: Handle partially delivered requests by updating their values in the storage.
@@ -555,11 +563,20 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) {
 	return val, nil
 }

-type indexDone[T any] struct {
+type indexDone struct {
 	index uint64
-	pq    *persistentQueue[T]
+	size  int64
+	queue interface {
+		onDone(uint64, int64, error)
+	}
+}
+
+func (id *indexDone) reset(index uint64, size int64, queue interface{ onDone(uint64, int64, error) }) {
+	id.index = index
+	id.size = size
+	id.queue = queue
 }

-func (id indexDone[T]) OnDone(err error) {
-	id.pq.onDone(id.index, err)
+func (id *indexDone) OnDone(err error) {
+	id.queue.onDone(id.index, id.size, err)
 }
```
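
Here the size bookkeeping, including the clamp at zero, moves from `Read` into `onDone`, and `indexDone` now carries the element's size so `onDone` can subtract exactly what `Read` measured. The clamp exists because, per the comment, the persisted size is not flushed on every read/write, so after a restart the restored counter can undercount. A minimal sketch of that clamped release, with an illustrative type rather than the real `persistentQueue`:

```go
package main

import (
	"fmt"
	"sync"
)

// restoredSize models the persistent queue's counter, which may be stale
// right after a restart because it is not flushed to disk on every op.
type restoredSize struct {
	mu        sync.Mutex
	queueSize int64
}

// onDone subtracts the size measured at Read time, clamping at zero so a
// stale restored counter is never reported as negative.
func (r *restoredSize) onDone(elSize int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.queueSize -= elSize
	if r.queueSize < 0 {
		r.queueSize = 0
	}
}

func main() {
	r := &restoredSize{queueSize: 3} // stale value restored from disk
	r.onDone(5)                      // the finished element measured 5
	fmt.Println("size:", r.queueSize) // 0, clamped instead of -2
}
```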
