
Commit 07509db

Add capability for memory and persistent queue to block when adding items

Signed-off-by: Bogdan Drutu <[email protected]>

1 parent 2144722 · commit 07509db

6 files changed: +165 -49 lines changed

exporter/exporterqueue/bounded_memory_queue.go (+3 -2)

@@ -23,17 +23,18 @@ type boundedMemoryQueue[T any] struct {
 type memoryQueueSettings[T any] struct {
     sizer    sizer[T]
     capacity int64
+    blocking bool
 }
 
 // newBoundedMemoryQueue constructs a new queue of the specified capacity, with an optional
 // callback for dropped items (e.g. useful to emit metrics).
 func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
     return &boundedMemoryQueue[T]{
-        sizedQueue: newSizedQueue[T](set.capacity, set.sizer),
+        sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking),
     }
 }
 
-func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
+func (q *boundedMemoryQueue[T]) Read(context.Context) (uint64, context.Context, T, bool) {
     ctx, req, ok := q.sizedQueue.pop()
     return 0, ctx, req, ok
 }
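
For orientation, here is a minimal package-internal sketch (not part of the commit; newBlockingStringQueue is a hypothetical helper) of how the new blocking field is plumbed through memoryQueueSettings into the underlying sizedQueue:

    // newBlockingStringQueue is a hypothetical helper illustrating the new
    // setting; requestSizer (by name, a request-count sizer) sizes the queue
    // by the number of offered requests.
    func newBlockingStringQueue() Queue[string] {
        return newBoundedMemoryQueue[string](memoryQueueSettings[string]{
            sizer:    &requestSizer[string]{},
            capacity: 100,
            blocking: true, // Offer waits for free space instead of returning ErrQueueIsFull
        })
    }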

exporter/exporterqueue/bounded_memory_queue_test.go (+34)

@@ -133,6 +133,40 @@ func TestQueueUsage(t *testing.T) {
     }
 }
 
+func TestBlockingQueueUsage(t *testing.T) {
+    tests := []struct {
+        name  string
+        sizer sizer[ptrace.Traces]
+    }{
+        {
+            name:  "requests_based",
+            sizer: &requestSizer[ptrace.Traces]{},
+        },
+        {
+            name:  "items_based",
+            sizer: &itemsSizer{},
+        },
+    }
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: tt.sizer, capacity: int64(100), blocking: true})
+            consumed := &atomic.Int64{}
+            require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
+            ac := newAsyncConsumer(q, 10, func(context.Context, ptrace.Traces) error {
+                consumed.Add(1)
+                return nil
+            })
+            td := testdata.GenerateTraces(10)
+            for j := 0; j < 1_000_000; j++ {
+                require.NoError(t, q.Offer(context.Background(), td))
+            }
+            assert.NoError(t, q.Shutdown(context.Background()))
+            assert.NoError(t, ac.Shutdown(context.Background()))
+            assert.Equal(t, int64(1_000_000), consumed.Load())
+        })
+    }
+}
+
 func TestZeroSizeNoConsumers(t *testing.T) {
     q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0})

exporter/exporterqueue/cond.go (+59, new file)

@@ -0,0 +1,59 @@
+package exporterqueue
+
+import (
+    "context"
+    "sync"
+)
+
+// cond is equivalent to sync.Cond, but context.Context aware: Wait returns when the context is done.
+// It also requires the caller to hold c.L during all calls.
+type cond struct {
+    L       sync.Locker
+    ch      chan struct{}
+    waiting int64
+}
+
+func newCond(l sync.Locker) *cond {
+    return &cond{L: l, ch: make(chan struct{}, 1)}
+}
+
+// Signal wakes one goroutine waiting on c, if there is any.
+// It requires the caller to hold c.L during the call.
+func (c *cond) Signal() {
+    if c.waiting == 0 {
+        return
+    }
+    c.waiting--
+    c.ch <- struct{}{}
+}
+
+// Broadcast wakes all goroutines waiting on c.
+// It requires the caller to hold c.L during the call.
+func (c *cond) Broadcast() {
+    for ; c.waiting > 0; c.waiting-- {
+        c.ch <- struct{}{}
+    }
+}
+
+// Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning.
+func (c *cond) Wait(ctx context.Context) error {
+    c.waiting++
+    c.L.Unlock()
+    select {
+    case <-ctx.Done():
+        c.L.Lock()
+        if c.waiting == 0 {
+            // If waiting is 0, a signal was already sent and nobody else is waiting for it.
+            // Consume it, so that we don't unblock another consumer unnecessarily,
+            // and don't block a producer because the channel buffer is full.
+            <-c.ch
+        } else {
+            // Decrease the number of waiting routines.
+            c.waiting--
+        }
+        return ctx.Err()
+    case <-c.ch:
+        c.L.Lock()
+        return nil
+    }
+}
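
To make the contract concrete, here is a hypothetical usage sketch (not part of the commit; waitUntil is an invented helper): hold c.L, re-check the predicate in a loop, and let the caller's context bound the wait. This is exactly the pattern the Offer and pop changes below follow:

    // waitUntil parks the calling goroutine until pred() holds or ctx is done.
    // It assumes c was created with newCond over the same lock it acquires here.
    func waitUntil(ctx context.Context, c *cond, pred func() bool) error {
        c.L.Lock()
        defer c.L.Unlock()
        for !pred() {
            // Wait re-acquires c.L before returning, on both paths.
            if err := c.Wait(ctx); err != nil {
                return err // context canceled or deadline exceeded
            }
        }
        return nil
    }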

exporter/exporterqueue/persistent_queue.go (+35 -26)

@@ -42,6 +42,7 @@ var (
 type persistentQueueSettings[T any] struct {
     sizer     sizer[T]
     capacity  int64
+    blocking  bool
     signal    pipeline.Signal
     storageID component.ID
     marshaler Marshaler[T]
@@ -78,10 +79,12 @@ type persistentQueue[T any] struct {
 
     // isRequestSized indicates whether the queue is sized by the number of requests.
     isRequestSized bool
+    blocking       bool
 
     // mu guards everything declared below.
     mu              sync.Mutex
-    hasElements     *sync.Cond
+    hasMoreElements *cond
+    hasMoreSpace    *cond
     readIndex       uint64
     writeIndex      uint64
     currentlyDispatchedItems []uint64
@@ -95,10 +98,11 @@ func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] {
     _, isRequestSized := set.sizer.(*requestSizer[T])
     pq := &persistentQueue[T]{
         set:            set,
-        logger:         set.set.Logger,
         isRequestSized: isRequestSized,
+        blocking:       set.blocking,
     }
-    pq.hasElements = sync.NewCond(&pq.mu)
+    pq.hasMoreElements = newCond(&pq.mu)
+    pq.hasMoreSpace = newCond(&pq.mu)
     return pq
 }
@@ -194,7 +198,7 @@ func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
     backupErr := pq.backupQueueSize(ctx)
     // Mark this queue as stopped, so consumers don't start any more work.
     pq.stopped = true
-    pq.hasElements.Broadcast()
+    pq.hasMoreElements.Broadcast()
     return multierr.Combine(backupErr, pq.unrefClient(ctx))
 }
@@ -233,8 +237,13 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
 // putInternal is the internal version that requires the caller to hold the mutex lock.
 func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
     reqSize := pq.set.sizer.Sizeof(req)
-    if pq.queueSize+reqSize > pq.set.capacity {
-        return ErrQueueIsFull
+    for pq.queueSize+reqSize > pq.set.capacity {
+        if !pq.blocking {
+            return ErrQueueIsFull
+        }
+        if err := pq.hasMoreSpace.Wait(ctx); err != nil {
+            return err
+        }
     }
 
     reqBuf, err := pq.set.marshaler(req)
@@ -253,7 +262,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
 
     pq.writeIndex++
     pq.queueSize += reqSize
-    pq.hasElements.Signal()
+    pq.hasMoreElements.Signal()
 
     // Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size
     // in case the collector is killed. The recovered queue size is allowed to be inaccurate.
@@ -270,31 +279,31 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) {
     pq.mu.Lock()
     defer pq.mu.Unlock()
     for {
-        if pq.stopped {
-            var req T
-            return 0, context.Background(), req, false
-        }
 
         // If queue is empty, wait until more elements and restart.
-        if pq.readIndex == pq.writeIndex {
-            pq.hasElements.Wait()
-            continue
-        }
-
-        index, req, consumed := pq.getNextItem(ctx)
-        if consumed {
-            pq.queueSize -= pq.set.sizer.Sizeof(req)
-            // The size might not be in sync with the queue in case it's restored from the disk
-            // because we don't flush the current queue size on the disk on every read/write.
-            // In that case we need to make sure it doesn't go below 0.
-            if pq.queueSize < 0 {
-                pq.queueSize = 0
+        for pq.readIndex != pq.writeIndex {
+            index, req, consumed := pq.getNextItem(ctx)
+            if consumed {
+                pq.queueSize -= pq.set.sizer.Sizeof(req)
+                // The size might not be in sync with the queue in case it's restored from the disk
+                // because we don't flush the current queue size on the disk on every read/write.
+                // In that case we need to make sure it doesn't go below 0.
+                if pq.queueSize < 0 {
+                    pq.queueSize = 0
+                }
+
+                return index, context.Background(), req, true
             }
+        }
 
-            return index, context.Background(), req, true
+        if pq.stopped {
+            var req T
+            return 0, context.Background(), req, false
         }
 
-        // If we did not consume any element retry from the beginning.
+        // TODO: Change the Queue interface to return an error to allow distinguishing between shutdown and context canceled.
+        // OK to ignore the error, since context.Background() will never be done.
+        _ = pq.hasMoreElements.Wait(context.Background())
     }
 }
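
On the caller side, the two modes now surface differently. A hypothetical helper (not part of the commit; assumes only the standard context and errors packages) sketching how an exporter could classify the result of Offer:

    // classifyOfferErr is a hypothetical illustration of the error contract.
    func classifyOfferErr(err error) string {
        switch {
        case err == nil:
            return "accepted"
        case errors.Is(err, ErrQueueIsFull):
            return "dropped immediately (non-blocking mode)"
        case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
            return "gave up waiting for space (blocking mode, ctx done)"
        default:
            return "storage or marshaling error"
        }
    }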

exporter/exporterqueue/sized_queue.go (+30 -17)

@@ -50,22 +50,26 @@ type sizedQueue[T any] struct {
     sizer sizer[T]
     cap   int64
 
-    mu          sync.Mutex
-    hasElements *sync.Cond
-    items       *linkedQueue[T]
-    size        int64
-    stopped     bool
+    mu              sync.Mutex
+    hasMoreElements *cond
+    hasMoreSpace    *cond
+    items           *linkedQueue[T]
+    size            int64
+    stopped         bool
+    blocking        bool
 }
 
 // newSizedQueue creates a sized elements queue. Each element is assigned a size by the provided sizer.
 // capacity is the capacity of the queue.
-func newSizedQueue[T any](capacity int64, sizer sizer[T]) *sizedQueue[T] {
+func newSizedQueue[T any](capacity int64, sizer sizer[T], blocking bool) *sizedQueue[T] {
     sq := &sizedQueue[T]{
-        sizer: sizer,
-        cap:   capacity,
-        items: &linkedQueue[T]{},
+        sizer:    sizer,
+        cap:      capacity,
+        items:    &linkedQueue[T]{},
+        blocking: blocking,
     }
-    sq.hasElements = sync.NewCond(&sq.mu)
+    sq.hasMoreElements = newCond(&sq.mu)
+    sq.hasMoreSpace = newCond(&sq.mu)
     return sq
 }
@@ -84,14 +88,20 @@ func (sq *sizedQueue[T]) Offer(ctx context.Context, el T) error {
     sq.mu.Lock()
     defer sq.mu.Unlock()
 
-    if sq.size+elSize > sq.cap {
-        return ErrQueueIsFull
+    for sq.size+elSize > sq.cap {
+        if !sq.blocking {
+            return ErrQueueIsFull
+        }
+        // Wait for more space or until the ctx is Done.
+        if err := sq.hasMoreSpace.Wait(ctx); err != nil {
+            return err
+        }
     }
 
     sq.size += elSize
     sq.items.push(ctx, el, elSize)
     // Signal one consumer if any.
-    sq.hasElements.Signal()
+    sq.hasMoreElements.Signal()
     return nil
 }
@@ -104,17 +114,20 @@ func (sq *sizedQueue[T]) pop() (context.Context, T, bool) {
 
     for {
         if sq.size > 0 {
-            ctx, el, elSize := sq.items.pop()
+            elCtx, el, elSize := sq.items.pop()
             sq.size -= elSize
-            return ctx, el, true
+            sq.hasMoreSpace.Signal()
+            return elCtx, el, true
         }
 
         if sq.stopped {
            var el T
            return context.Background(), el, false
        }
 
-        sq.hasElements.Wait()
+        // TODO: Change the Queue interface to return an error to allow distinguishing between shutdown and context canceled.
+        // OK to ignore the error, since context.Background() will never be done.
+        _ = sq.hasMoreElements.Wait(context.Background())
     }
 }
@@ -123,7 +136,7 @@ func (sq *sizedQueue[T]) Shutdown(context.Context) error {
     sq.mu.Lock()
     defer sq.mu.Unlock()
     sq.stopped = true
-    sq.hasElements.Broadcast()
+    sq.hasMoreElements.Broadcast()
     return nil
 }
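
A hypothetical test (not part of the commit) that pins down the new Offer semantics, reusing sizerInt from the tests below and assuming it reports an element's integer value as its size: a full blocking queue parks the producer until the context expires.

    func TestBlockingOfferHonorsContext(t *testing.T) {
        q := newSizedQueue[int](1, sizerInt{}, true) // capacity 1, blocking enabled

        // The first element fits exactly: size 1 == capacity 1.
        require.NoError(t, q.Offer(context.Background(), 1))

        // The queue is now full: instead of ErrQueueIsFull, Offer waits on
        // hasMoreSpace and unblocks when the context is done.
        ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
        defer cancel()
        require.ErrorIs(t, q.Offer(ctx, 1), context.DeadlineExceeded)
    }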

exporter/exporterqueue/sized_queue_test.go (+4 -4)

@@ -18,7 +18,7 @@ func (s sizerInt) Sizeof(el int) int64 {
 }
 
 func TestSizedQueue(t *testing.T) {
-    q := newSizedQueue[int](7, sizerInt{})
+    q := newSizedQueue[int](7, sizerInt{}, false)
     require.NoError(t, q.Offer(context.Background(), 1))
     assert.Equal(t, 1, q.Size())
     assert.Equal(t, 7, q.Capacity())
@@ -47,7 +47,7 @@ func TestSizedQueue(t *testing.T) {
 }
 
 func TestSizedQueue_DrainAllElements(t *testing.T) {
-    q := newSizedQueue[int](7, sizerInt{})
+    q := newSizedQueue[int](7, sizerInt{}, false)
     require.NoError(t, q.Offer(context.Background(), 1))
     require.NoError(t, q.Offer(context.Background(), 3))
 
@@ -68,12 +68,12 @@ func TestSizedQueue_DrainAllElements(t *testing.T) {
 }
 
 func TestSizedChannel_OfferInvalidSize(t *testing.T) {
-    q := newSizedQueue[int](1, sizerInt{})
+    q := newSizedQueue[int](1, sizerInt{}, false)
     require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize)
 }
 
 func TestSizedChannel_OfferZeroSize(t *testing.T) {
-    q := newSizedQueue[int](1, sizerInt{})
+    q := newSizedQueue[int](1, sizerInt{}, false)
     require.NoError(t, q.Offer(context.Background(), 0))
     require.NoError(t, q.Shutdown(context.Background()))
     // Because the size 0 is ignored, nothing to drain.
