Commit ad784e9

Update queue size after the element is exported, ensures limit
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 0831231 commit ad784e9

File tree: 6 files changed (+75 −49 lines)

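At a glance, the commit moves queue-size accounting from read time to completion time: an element keeps counting against the queue's capacity until its export attempt finishes, not just until a consumer picks it up, which is what "ensures limit" in the title refers to. A minimal sketch of that contract (hypothetical names, not the collector's actual code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Done is a hypothetical stand-in for the callback handed to consumers:
// OnDone releases the element's size only after the export attempt completes.
type Done struct {
	size    int64
	release func(int64)
}

func (d Done) OnDone(err error) { d.release(d.size) }

func main() {
	var queueSize atomic.Int64

	// Offer: charge the element's size against the queue limit.
	queueSize.Add(5)

	// Read: hand the element to the consumer WITHOUT freeing its size.
	d := Done{size: 5, release: func(n int64) { queueSize.Add(-n) }}

	fmt.Println(queueSize.Load()) // 5: the in-flight element still counts
	d.OnDone(nil)                 // export finished
	fmt.Println(queueSize.Load()) // 0: capacity released only now
}
```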

exporter/exporterhelper/internal/batch_sender_test.go (+12 −12)

@@ -90,7 +90,7 @@ func TestBatchSender_Merge(t *testing.T) {
 			assert.Equal(t, int64(1), sink.RequestsCount())
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 2 && sink.ItemsCount() == 15
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 		})
 	}
 	for _, tt := range tests {
@@ -160,12 +160,12 @@ func TestBatchSender_BatchExportError(t *testing.T) {
 			errReq := &requesttest.FakeRequest{Items: 20, ExportErr: errors.New("transient error"), Sink: sink}
 			require.NoError(t, be.Send(context.Background(), errReq))

-			// the batch should be dropped since the queue doesn't have requeuing enabled.
+			// the batch should be dropped since the queue doesn't have re-queuing enabled.
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == tt.expectedRequests &&
 					sink.ItemsCount() == tt.expectedItems &&
 					be.queue.Size() == 0
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			require.NoError(t, be.Shutdown(context.Background()))
 		})
@@ -194,13 +194,13 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 1 && sink.ItemsCount() == 8
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// big request should be broken down into two requests, both are sent right away.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 3 && sink.ItemsCount() == 25
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// request that cannot be split should be dropped.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{
@@ -212,7 +212,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 5 && sink.ItemsCount() == 38
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 			require.NoError(t, be.Shutdown(context.Background()))
 		})
 	}
@@ -370,20 +370,20 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {

 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 1 && sink.ItemsCount() == 4
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// the 3rd request should be flushed by itself due to flush interval
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 2 && sink.ItemsCount() == 6
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// the 4th and 5th request should be flushed in the same batched request by max concurrency limit.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 3 && sink.ItemsCount() == 10
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)

 			// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink}))
@@ -392,15 +392,15 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
 				// in case of MaxSizeItems=10, wait for the leftover request to send
 				assert.Eventually(t, func() bool {
 					return sink.RequestsCount() == 5 && sink.ItemsCount() == 21
-				}, 50*time.Millisecond, 10*time.Millisecond)
+				}, 1*time.Second, 10*time.Millisecond)
 			}

 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 6, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 20, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == tt.expectedRequests && sink.ItemsCount() == tt.expectedItems
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 		})
 	}
 }
@@ -648,7 +648,7 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
 	assert.EventuallyWithT(t, func(c *assert.CollectT) {
 		assert.LessOrEqual(c, int64(1), sink.RequestsCount())
 		assert.EqualValues(c, 8, sink.ItemsCount())
-	}, 500*time.Millisecond, 10*time.Millisecond)
+	}, 1*time.Second, 10*time.Millisecond)

 	require.NoError(t, be.Shutdown(context.Background()))
 }
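A note on the pattern above: testify's assert.Eventually polls the condition at the given tick until it returns true or waitFor elapses, so raising waitFor from 50–500ms to 1s only slows down failing runs; passing runs still return as soon as the condition holds. A minimal, self-contained illustration (not code from this commit):

```go
package queue

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestEventuallyPattern(t *testing.T) {
	var exported atomic.Int64
	go func() {
		time.Sleep(50 * time.Millisecond) // stand-in for async export work
		exported.Add(1)
	}()
	// Polls every 10ms and returns as soon as the condition holds, so the
	// 1s budget is an upper bound on failure, not a fixed wait.
	assert.Eventually(t, func() bool {
		return exported.Load() == 1
	}, 1*time.Second, 10*time.Millisecond)
}
```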

exporter/exporterqueue/async_queue_test.go (+1 −1)

@@ -70,7 +70,7 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		for j := 0; j < 11; j++ {
+		for j := 0; j < 10; j++ {
 			assert.NoError(t, ac.Offer(ctx, 1))
 		}
 		assert.ErrorIs(t, ac.Offer(ctx, 3), context.Canceled)
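The loop bound drops from 11 to 10, presumably because the size is now held until OnDone: with nothing consumed yet, a blocking queue sized for 10 units accepts exactly 10 offers, and the next Offer can only end via context cancellation. A toy model of that behavior (a channel semaphore standing in for the real AsyncQueue, which this sketch does not reproduce):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// miniQueue models "size is released only on OnDone": each Offer takes a
// slot, and only an explicit OnDone gives it back.
type miniQueue struct {
	slots chan struct{}
}

// Offer blocks until a slot frees up or ctx is cancelled.
func (q *miniQueue) Offer(ctx context.Context) error {
	select {
	case q.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// OnDone releases a slot, mirroring the Done callback in this commit.
func (q *miniQueue) OnDone() { <-q.slots }

func main() {
	q := &miniQueue{slots: make(chan struct{}, 10)}
	ctx, cancel := context.WithCancel(context.Background())
	for j := 0; j < 10; j++ {
		_ = q.Offer(ctx) // fills all 10 slots; none are released yet
	}
	cancel()
	err := q.Offer(ctx)                           // an 11th offer can only observe cancellation
	fmt.Println(errors.Is(err, context.Canceled)) // true
}
```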

exporter/exporterqueue/disabled_queue.go (+17 −3)

@@ -19,6 +19,7 @@ var donePool = sync.Pool{

 func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
 	return &disabledQueue[T]{
+		sizer:       &requestSizer[T]{},
 		consumeFunc: consumeFunc,
 		size:        &atomic.Int64{},
 	}
@@ -28,14 +29,18 @@ type disabledQueue[T any] struct {
 	component.StartFunc
 	component.ShutdownFunc
 	consumeFunc ConsumeFunc[T]
+	sizer       sizer[T]
 	size        *atomic.Int64
 }

 func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
+	elSize := d.sizer.Sizeof(req)
+	d.size.Add(elSize)
+
 	done := donePool.Get().(*blockingDone)
-	d.size.Add(1)
+	done.queue = d
+	done.elSize = elSize
 	d.consumeFunc(ctx, req, done)
-	defer d.size.Add(-1)
 	// Only re-add the blockingDone instance back to the pool if successfully received the
 	// message from the consumer which guarantees consumer will not use that anymore,
 	// otherwise no guarantee about when the consumer will add the message to the channel so cannot reuse or close.
@@ -48,6 +53,10 @@ func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
 	}
 }

+func (d *disabledQueue[T]) onDone(elSize int64) {
+	d.size.Add(-elSize)
+}
+
 // Size returns the current number of blocked requests waiting to be processed.
 func (d *disabledQueue[T]) Size() int64 {
 	return d.size.Load()
@@ -59,9 +68,14 @@ func (d *disabledQueue[T]) Capacity() int64 {
 }

 type blockingDone struct {
-	ch chan error
+	queue interface {
+		onDone(int64)
+	}
+	elSize int64
+	ch     chan error
 }

 func (d *blockingDone) OnDone(err error) {
+	d.queue.onDone(d.elSize)
 	d.ch <- err
 }
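Two things happen in this file: the producer now charges the real element size (via sizer.Sizeof) instead of a flat 1, and the refund moves into blockingDone.OnDone. A condensed, runnable sketch of the pooled done-callback pattern (simplified types, not the file's exact code):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// donePool recycles callback objects so the hot path avoids allocations.
var donePool = sync.Pool{
	New: func() any { return &blockingDone{ch: make(chan error, 1)} },
}

type queueLike interface{ onDone(int64) }

type blockingDone struct {
	queue  queueLike
	elSize int64
	ch     chan error
}

// OnDone credits the size back to the queue before unblocking the producer.
func (d *blockingDone) OnDone(err error) {
	d.queue.onDone(d.elSize)
	d.ch <- err
}

type countingQueue struct{ size atomic.Int64 }

func (q *countingQueue) onDone(n int64) { q.size.Add(-n) }

func main() {
	q := &countingQueue{}
	q.size.Add(3) // producer charges the element size at Offer time

	done := donePool.Get().(*blockingDone)
	done.queue, done.elSize = q, 3
	go done.OnDone(nil)             // consumer signals completion
	err := <-done.ch                // producer blocks until consumption finishes
	donePool.Put(done)              // safe to reuse: the consumer is done with it
	fmt.Println(err, q.size.Load()) // <nil> 0
}
```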

exporter/exporterqueue/memory_queue.go (+24 −8)

@@ -91,11 +91,9 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool) {
 	defer sq.mu.Unlock()

 	for {
-		if sq.size > 0 {
+		if sq.items.hasElements() {
 			elCtx, el, elSize := sq.items.pop()
-			sq.size -= elSize
-			sq.hasMoreSpace.Signal()
-			return elCtx, el, noopDoneInst, true
+			return elCtx, el, sizeDone[T]{queue: sq, size: elSize}, true
 		}

 		if sq.stopped {
@@ -109,6 +107,13 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool) {
 	}
 }

+func (sq *memoryQueue[T]) onDone(elSize int64) {
+	sq.mu.Lock()
+	defer sq.mu.Unlock()
+	sq.size -= elSize
+	sq.hasMoreSpace.Signal()
+}
+
 // Shutdown closes the queue channel to initiate draining of the queue.
 func (sq *memoryQueue[T]) Shutdown(context.Context) error {
 	sq.mu.Lock()
@@ -142,6 +147,7 @@ type linkedQueue[T any] struct {

 func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
 	n := &node[T]{ctx: ctx, data: data, size: size}
+	// If tail is nil means list is empty so update both head and tail to point to same element.
 	if l.tail == nil {
 		l.head = n
 		l.tail = n
@@ -151,18 +157,28 @@ func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
 	l.tail = n
 }

+func (l *linkedQueue[T]) hasElements() bool {
+	return l.head != nil
+}
+
 func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
 	n := l.head
 	l.head = n.next
+	// If it gets to the last element, then update tail as well.
 	if l.head == nil {
 		l.tail = nil
 	}
 	n.next = nil
	return n.ctx, n.data, n.size
 }

-type noopDone struct{}
-
-func (*noopDone) OnDone(error) {}
+type sizeDone[T any] struct {
+	queue interface {
+		onDone(int64)
+	}
+	size int64
+}

-var noopDoneInst = &noopDone{}
+func (dc sizeDone[T]) OnDone(error) {
+	dc.queue.onDone(dc.size)
+}

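Putting the memoryQueue pieces together: Read no longer decrements size or signals hasMoreSpace itself; the sizeDone handle returned to the consumer does that in onDone, so blocked producers wake up only after the element is actually consumed. A compact sketch of that shape (a slice in place of the linked list, and other simplifications; not the collector's code):

```go
package main

import (
	"fmt"
	"sync"
)

type memQueue struct {
	mu           sync.Mutex
	hasMoreSpace *sync.Cond
	size, cap    int64
	items        []int64 // element sizes, FIFO
}

func newMemQueue(capacity int64) *memQueue {
	q := &memQueue{cap: capacity}
	q.hasMoreSpace = sync.NewCond(&q.mu)
	return q
}

// Offer blocks until the element fits under the capacity limit.
func (q *memQueue) Offer(elSize int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.size+elSize > q.cap {
		q.hasMoreSpace.Wait() // woken only by onDone, not by Read
	}
	q.size += elSize
	q.items = append(q.items, elSize)
}

type sizeDone struct {
	q    *memQueue
	size int64
}

// OnDone is called by the consumer after the element is exported.
func (d sizeDone) OnDone() { d.q.onDone(d.size) }

func (q *memQueue) onDone(elSize int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.size -= elSize
	q.hasMoreSpace.Signal()
}

func (q *memQueue) Read() (int64, sizeDone, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return 0, sizeDone{}, false
	}
	el := q.items[0]
	q.items = q.items[1:]
	// Note: q.size is deliberately NOT decremented here anymore.
	return el, sizeDone{q: q, size: el}, true
}

func main() {
	q := newMemQueue(10)
	q.Offer(10)
	el, done, _ := q.Read()
	fmt.Println(el, q.size) // 10 10: the in-flight element still counts
	done.OnDone()
	fmt.Println(q.size) // 0
}
```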
exporter/exporterqueue/persistent_queue.go (+13 −12)

@@ -292,16 +292,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Done, bool) {
 			pq.hasMoreSpace.Signal()
 		}
 		if consumed {
-			pq.queueSize -= pq.set.sizer.Sizeof(req)
-			// The size might be not in sync with the queue in case it's restored from the disk
-			// because we don't flush the current queue size on the disk on every read/write.
-			// In that case we need to make sure it doesn't go below 0.
-			if pq.queueSize < 0 {
-				pq.queueSize = 0
-			}
-			pq.hasMoreSpace.Signal()
-
-			return context.Background(), req, indexDone[T]{index: index, pq: pq}, true
+			return context.Background(), req, indexDone[T]{index: index, size: pq.set.sizer.Sizeof(req), pq: pq}, true
 		}
 	}

@@ -348,7 +339,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
 }

 // onDone should be called to remove the item of the given index from the queue once processing is finished.
-func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
+func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr error) {
 	// Delete the item from the persistent storage after it was processed.
 	pq.mu.Lock()
 	// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
@@ -359,6 +350,15 @@ func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr error) {
 		pq.mu.Unlock()
 	}()

+	pq.queueSize -= elSize
+	// The size might be not in sync with the queue in case it's restored from the disk
+	// because we don't flush the current queue size on the disk on every read/write.
+	// In that case we need to make sure it doesn't go below 0.
+	if pq.queueSize < 0 {
+		pq.queueSize = 0
+	}
+	pq.hasMoreSpace.Signal()
+
 	if experr.IsShutdownErr(consumeErr) {
 		// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
 		// TODO: Handle partially delivered requests by updating their values in the storage.
@@ -557,9 +557,10 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) {

 type indexDone[T any] struct {
 	index uint64
+	size  int64
 	pq    *persistentQueue[T]
 }

 func (id indexDone[T]) OnDone(err error) {
-	id.pq.onDone(id.index, err)
+	id.pq.onDone(id.index, id.size, err)
 }
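With this, all three queue flavors (disabled, memory, persistent) follow the same contract: size is charged when an element is offered or read and credited back only in onDone, after the export attempt. For the persistent queue the change also relocates the below-zero clamp, still needed because the on-disk size is not flushed on every read/write and can be stale after a restart, from Read into onDone.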

exporter/exporterqueue/persistent_queue_test.go (+8 −13)

@@ -605,8 +605,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) {

 	// Put some items, make sure they are loaded and shutdown the storage...
 	for i := 0; i < 3; i++ {
-		err := ps.Offer(context.Background(), uint64(50))
-		require.NoError(t, err)
+		require.NoError(t, ps.Offer(context.Background(), uint64(50)))
 	}
 	assert.Equal(t, int64(3), ps.Size())
 	require.True(t, consume(ps, func(context.Context, uint64) error {
@@ -652,8 +651,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
 	ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000)

 	for i := 0; i < 5; i++ {
-		err := ps.Offer(context.Background(), req)
-		require.NoError(t, err)
+		require.NoError(t, ps.Offer(context.Background(), req))
 	}

 	requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{})
@@ -712,21 +710,20 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {

 	// Put in items up to capacity
 	for i := 0; i < 5; i++ {
-		err := ps.Offer(context.Background(), req)
-		require.NoError(t, err)
+		require.NoError(t, ps.Offer(context.Background(), req))
 	}
+	require.Equal(t, int64(5), ps.Size())

 	require.True(t, consume(ps, func(context.Context, uint64) error {
-		// put one more item in
-		require.NoError(t, ps.Offer(context.Background(), req))
+		// Check that size is still full even when consuming the element.
 		require.Equal(t, int64(5), ps.Size())
 		return experr.NewShutdownErr(nil)
 	}))
 	require.NoError(t, ps.Shutdown(context.Background()))

 	// Reload with extra capacity to make sure we re-enqueue in-progress items.
-	newPs := createTestPersistentQueueWithRequestsCapacity(t, ext, 6)
-	require.Equal(t, int64(6), newPs.Size())
+	newPs := createTestPersistentQueueWithRequestsCapacity(t, ext, 5)
+	require.Equal(t, int64(5), newPs.Size())
 }

 func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) {
@@ -1004,9 +1001,7 @@ func TestPersistentQueue_ItemDispatchingFinish_ErrorHandling(t *testing.T) {
 		ps := createTestPersistentQueueWithClient(client)
 		client.Reset()

-		err := ps.itemDispatchingFinish(context.Background(), 0)
-
-		require.ErrorIs(t, err, tt.expectedError)
+		require.ErrorIs(t, ps.itemDispatchingFinish(context.Background(), 0), tt.expectedError)
	})
 }
