
Commit b145575

Add capability for memory and persistent queue to block when adding items
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent dc9a838 commit b145575

8 files changed, +408 -253 lines changed

.chloggen/add-blocking.yaml

+25

@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add capability for memory and persistent queue to block when adding items
+
+# One or more tracking issues or pull requests related to the change
+issues: [12074]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [api]

exporter/exporterqueue/bounded_memory_queue.go

+3 -2

@@ -23,17 +23,18 @@ type boundedMemoryQueue[T any] struct {
 type memoryQueueSettings[T any] struct {
 	sizer    sizer[T]
 	capacity int64
+	blocking bool
 }

 // newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
 // callback for dropped items (e.g. useful to emit metrics).
 func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
 	return &boundedMemoryQueue[T]{
-		sizedQueue: newSizedQueue[T](set.capacity, set.sizer),
+		sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking),
 	}
 }

-func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
+func (q *boundedMemoryQueue[T]) Read(context.Context) (uint64, context.Context, T, bool) {
 	ctx, req, ok := q.sizedQueue.pop()
 	return 0, ctx, req, ok
 }
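
Note: the memory queue change is a pass-through. newSizedQueue (modified elsewhere in this commit, not shown on this page) receives the blocking flag and owns the actual wait. The fragment below is an illustrative in-package sketch of the contract the flag toggles; the capacity of 1, the omitted Start call, and the outcomes in the comments are assumptions for illustration, not code from this commit.

// Sketch (assumes in-package access, as in the tests): one queue, two Offer modes.
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{
	sizer:    &requestSizer[uint64]{}, // each request counts as one unit
	capacity: 1,
	blocking: false,
})
_ = q.Offer(context.Background(), uint64(1)) // fits; the queue is now full
err := q.Offer(context.Background(), uint64(2))
// Non-blocking: err is ErrQueueIsFull immediately.
// With blocking: true, the same call would instead park until a consumer
// pops an element or the passed context is done.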

exporter/exporterqueue/bounded_memory_queue_test.go

+53 -18

@@ -16,8 +16,6 @@ import (
 	"github.com/stretchr/testify/require"

 	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/pdata/ptrace"
-	"go.opentelemetry.io/collector/pdata/testdata"
 )

 // In this test we run a queue with capacity 1 and a single consumer.
@@ -102,11 +100,11 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
 func TestQueueUsage(t *testing.T) {
 	tests := []struct {
 		name  string
-		sizer sizer[ptrace.Traces]
+		sizer sizer[uint64]
 	}{
 		{
 			name:  "requests_based",
-			sizer: &requestSizer[ptrace.Traces]{},
+			sizer: &requestSizer[uint64]{},
 		},
 		{
 			name: "items_based",
@@ -115,16 +113,15 @@ func TestQueueUsage(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: tt.sizer, capacity: int64(100)})
+			q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100)})
 			consumed := &atomic.Int64{}
 			require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
-			ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error {
+			ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
 				consumed.Add(1)
 				return nil
 			})
-			td := testdata.GenerateTraces(10)
 			for j := 0; j < 10; j++ {
-				require.NoError(t, q.Offer(context.Background(), td))
+				require.NoError(t, q.Offer(context.Background(), uint64(10)))
 			}
 			assert.NoError(t, q.Shutdown(context.Background()))
 			assert.NoError(t, ac.Shutdown(context.Background()))
@@ -133,6 +130,47 @@ func TestQueueUsage(t *testing.T) {
 	}
 }

+func TestBlockingQueueUsage(t *testing.T) {
+	tests := []struct {
+		name  string
+		sizer sizer[uint64]
+	}{
+		{
+			name:  "requests_based",
+			sizer: &requestSizer[uint64]{},
+		},
+		{
+			name:  "items_based",
+			sizer: &itemsSizer{},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100), blocking: true})
+			consumed := &atomic.Int64{}
+			require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
+			ac := newAsyncConsumer(q, 10, func(context.Context, uint64) error {
+				consumed.Add(1)
+				return nil
+			})
+			wg := &sync.WaitGroup{}
+			for i := 0; i < 10; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					for j := 0; j < 100_000; j++ {
+						assert.NoError(t, q.Offer(context.Background(), uint64(10)))
+					}
+				}()
+			}
+			wg.Wait()
+			assert.NoError(t, q.Shutdown(context.Background()))
+			assert.NoError(t, ac.Shutdown(context.Background()))
+			assert.Equal(t, int64(1_000_000), consumed.Load())
+		})
+	}
+}
+
 func TestZeroSizeNoConsumers(t *testing.T) {
 	q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0})

@@ -149,8 +187,7 @@ func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool
 	if !ok {
 		return false
 	}
-	consumeErr := consumeFunc(ctx, req)
-	q.OnProcessingFinished(index, consumeErr)
+	q.OnProcessingFinished(index, consumeFunc(ctx, req))
 	return true
 }

@@ -170,8 +207,7 @@ func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *asyncConsumer {
 			if !ok {
 				return
 			}
-			consumeErr := consumeFunc(ctx, req)
-			q.OnProcessingFinished(index, consumeErr)
+			q.OnProcessingFinished(index, consumeFunc(ctx, req))
 		}
 	}()
 }
@@ -187,11 +223,11 @@ func (qc *asyncConsumer) Shutdown(_ context.Context) error {
 func BenchmarkOffer(b *testing.B) {
 	tests := []struct {
 		name  string
-		sizer sizer[ptrace.Traces]
+		sizer sizer[uint64]
 	}{
 		{
 			name:  "requests_based",
-			sizer: &requestSizer[ptrace.Traces]{},
+			sizer: &requestSizer[uint64]{},
 		},
 		{
 			name: "items_based",
@@ -200,18 +236,17 @@ func BenchmarkOffer(b *testing.B) {
 	}
 	for _, tt := range tests {
 		b.Run(tt.name, func(b *testing.B) {
-			q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: &requestSizer[ptrace.Traces]{}, capacity: int64(10 * b.N)})
+			q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: &requestSizer[uint64]{}, capacity: int64(10 * b.N)})
 			consumed := &atomic.Int64{}
 			require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
-			ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error {
+			ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
 				consumed.Add(1)
 				return nil
 			})
-			td := testdata.GenerateTraces(10)
 			b.ResetTimer()
 			b.ReportAllocs()
 			for j := 0; j < b.N; j++ {
-				require.NoError(b, q.Offer(context.Background(), td))
+				require.NoError(b, q.Offer(context.Background(), uint64(10)))
 			}
 			assert.NoError(b, q.Shutdown(context.Background()))
 			assert.NoError(b, ac.Shutdown(context.Background()))

exporter/exporterqueue/cond.go

+63

@@ -0,0 +1,63 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
+
+import (
+	"context"
+	"sync"
+)
+
+// cond is equivalent to sync.Cond, but context.Context aware,
+// which means Wait() will return if the context is done before any signal is received.
+// It also requires the caller to hold c.L during all calls.
+type cond struct {
+	L       sync.Locker
+	ch      chan struct{}
+	waiting int64
+}
+
+func newCond(l sync.Locker) *cond {
+	return &cond{L: l, ch: make(chan struct{}, 1)}
+}
+
+// Signal wakes one goroutine waiting on c, if there is any.
+// It requires the caller to hold c.L during the call.
+func (c *cond) Signal() {
+	if c.waiting == 0 {
+		return
+	}
+	c.waiting--
+	c.ch <- struct{}{}
+}
+
+// Broadcast wakes all goroutines waiting on c.
+// It requires the caller to hold c.L during the call.
+func (c *cond) Broadcast() {
+	for ; c.waiting > 0; c.waiting-- {
+		c.ch <- struct{}{}
+	}
+}
+
+// Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning.
+func (c *cond) Wait(ctx context.Context) error {
+	c.waiting++
+	c.L.Unlock()
+	select {
+	case <-ctx.Done():
+		c.L.Lock()
+		if c.waiting == 0 {
+			// If waiting is 0, it means a signal was sent and nobody else is waiting for it.
+			// Consume it, so that we don't unblock another consumer unnecessarily,
+			// or block the producer because the channel buffer is full.
+			<-c.ch
+		} else {
+			// Decrease the number of waiting routines.
+			c.waiting--
+		}
+		return ctx.Err()
+	case <-c.ch:
+		c.L.Lock()
+		return nil
+	}
+}
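
The buffered channel of size one acts as the wakeup token, and the waiting counter (guarded by c.L) keeps Signal from sending a token nobody will consume. As with sync.Cond, callers must re-check their guard in a loop after every Wait. A minimal in-package usage sketch follows; offerBlocking, full, and admit are hypothetical names introduced here for illustration, and hasMoreSpace is assumed to have been built with newCond(mu).

// Hypothetical helper showing the intended Wait pattern for the cond above.
func offerBlocking(ctx context.Context, mu *sync.Mutex, hasMoreSpace *cond, full func() bool, admit func()) error {
	mu.Lock()
	defer mu.Unlock()
	for full() { // re-check the guard after every wakeup, as with sync.Cond
		// Wait unlocks mu while parked and re-locks it before returning.
		// If ctx is done before a Signal arrives, it returns ctx.Err().
		if err := hasMoreSpace.Wait(ctx); err != nil {
			return err // mu is re-locked, so the deferred Unlock is safe
		}
	}
	admit() // mutate shared state; a consumer later calls hasMoreSpace.Signal() under mu
	return nil
}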

exporter/exporterqueue/persistent_queue.go

+37 -29

@@ -11,7 +11,6 @@ import (
 	"strconv"
 	"sync"

-	"go.uber.org/multierr"
 	"go.uber.org/zap"

 	"go.opentelemetry.io/collector/component"
@@ -42,6 +41,7 @@ var (
 type persistentQueueSettings[T any] struct {
 	sizer     sizer[T]
 	capacity  int64
+	blocking  bool
 	signal    pipeline.Signal
 	storageID component.ID
 	marshaler Marshaler[T]
@@ -81,7 +81,8 @@ type persistentQueue[T any] struct {

 	// mu guards everything declared below.
 	mu                       sync.Mutex
-	hasElements              *sync.Cond
+	hasMoreElements          *sync.Cond
+	hasMoreSpace             *cond
 	readIndex                uint64
 	writeIndex               uint64
 	currentlyDispatchedItems []uint64
@@ -98,7 +99,8 @@ func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] {
 		logger:         set.set.Logger,
 		isRequestSized: isRequestSized,
 	}
-	pq.hasElements = sync.NewCond(&pq.mu)
+	pq.hasMoreElements = sync.NewCond(&pq.mu)
+	pq.hasMoreSpace = newCond(&pq.mu)
 	return pq
 }

@@ -194,8 +196,8 @@ func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
 	backupErr := pq.backupQueueSize(ctx)
 	// Mark this queue as stopped, so consumers don't start any more work.
 	pq.stopped = true
-	pq.hasElements.Broadcast()
-	return multierr.Combine(backupErr, pq.unrefClient(ctx))
+	pq.hasMoreElements.Broadcast()
+	return errors.Join(backupErr, pq.unrefClient(ctx))
 }

 // backupQueueSize writes the current queue size to storage. The value is used to recover the queue size
@@ -233,8 +235,13 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
 // putInternal is the internal version that requires caller to hold the mutex lock.
 func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
 	reqSize := pq.set.sizer.Sizeof(req)
-	if pq.queueSize+reqSize > pq.set.capacity {
-		return ErrQueueIsFull
+	for pq.queueSize+reqSize > pq.set.capacity {
+		if !pq.set.blocking {
+			return ErrQueueIsFull
+		}
+		if err := pq.hasMoreSpace.Wait(ctx); err != nil {
+			return err
+		}
 	}

 	reqBuf, err := pq.set.marshaler(req)
@@ -253,7 +260,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {

 	pq.writeIndex++
 	pq.queueSize += reqSize
-	pq.hasElements.Signal()
+	pq.hasMoreElements.Signal()

 	// Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size
 	// in case the collector is killed. The recovered queue size is allowed to be inaccurate.
@@ -269,32 +276,38 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
 func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) {
 	pq.mu.Lock()
 	defer pq.mu.Unlock()
+
 	for {
 		if pq.stopped {
 			var req T
 			return 0, context.Background(), req, false
 		}

-		// If queue is empty, wait until more elements and restart.
-		if pq.readIndex == pq.writeIndex {
-			pq.hasElements.Wait()
-			continue
-		}
-
-		index, req, consumed := pq.getNextItem(ctx)
-		if consumed {
-			pq.queueSize -= pq.set.sizer.Sizeof(req)
-			// The size might not be in sync with the queue in case it's restored from the disk
-			// because we don't flush the current queue size on the disk on every read/write.
-			// In that case we need to make sure it doesn't go below 0.
-			if pq.queueSize < 0 {
+		// Read until either an element is successfully retrieved or there are no more elements in the storage.
+		for pq.readIndex != pq.writeIndex {
+			index, req, consumed := pq.getNextItem(ctx)
+			// Ensure the used size and the channel size are in sync.
+			if pq.readIndex == pq.writeIndex {
 				pq.queueSize = 0
+				pq.hasMoreSpace.Signal()
+			}
+			if consumed {
+				pq.queueSize -= pq.set.sizer.Sizeof(req)
+				// The size might not be in sync with the queue in case it's restored from the disk
+				// because we don't flush the current queue size on the disk on every read/write.
+				// In that case we need to make sure it doesn't go below 0.
+				if pq.queueSize < 0 {
+					pq.queueSize = 0
+				}
+				pq.hasMoreSpace.Signal()
+
+				return index, context.Background(), req, true
 			}
-
-			return index, context.Background(), req, true
 		}

-		// If we did not consume any element retry from the beginning.
+		// TODO: Need to change the Queue interface to return an error to allow distinguishing between shutdown and context canceled.
+		// Until then, use the sync.Cond.
+		pq.hasMoreElements.Wait()
 	}
 }

@@ -363,11 +376,6 @@ func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr error) {
 			pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr))
 		}
 	}
-
-	// Ensure the used size and the channel size are in sync.
-	if pq.readIndex == pq.writeIndex {
-		pq.queueSize = 0
-	}
 }

 // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
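
Taken together: putInternal now loops while the queue is over capacity, waiting on hasMoreSpace when blocking is enabled, and Read signals hasMoreSpace whenever it frees space. The caller's context is the only way out of a blocked Offer, so a producer that cannot wait forever should bound it. A caller-side sketch, where q and req are placeholders and the five-second deadline is an arbitrary choice:

// Sketch: bounding how long a blocking Offer may park (not code from this commit).
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := q.Offer(ctx, req); err != nil {
	// With blocking enabled, err is ctx.Err() (context.DeadlineExceeded or
	// context.Canceled) propagated out of hasMoreSpace.Wait; ErrQueueIsFull
	// is only returned when blocking is disabled.
	return err
}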

0 commit comments
