
Commit cc2374a

Add blocking option to control queue behavior when full
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 71aae79 commit cc2374a

7 files changed, +40 -5 lines changed

.chloggen/add-blocking-option.yaml (+25)

@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add blocking option to control queue behavior when full
+
+# One or more tracking issues or pull requests related to the change
+issues: [12090]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

exporter/exporterhelper/README.md (+1)

@@ -17,6 +17,7 @@ The following configuration options can be modified:
 - `enabled` (default = true)
 - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
 - `queue_size` (default = 1000): Maximum number of batches kept in memory before dropping; ignored if `enabled` is `false`
+- `blocking` (default = false): If true blocks until queue has space for the request otherwise returns immediately; ignored if `enabled` is `false`
   User should calculate this as `num_seconds * requests_per_second / requests_per_batch` where:
   - `num_seconds` is the number of seconds to buffer in case of a backend outage
   - `requests_per_second` is the average number of requests per seconds
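
The sizing guidance in that hunk is plain arithmetic; a tiny sketch with made-up numbers (not from this commit) makes the formula concrete, and with the new `blocking: true` a full queue applies backpressure instead of dropping:

```go
package main

import "fmt"

func main() {
	// Illustrative values only: buffer 120s of a 100 req/s stream,
	// with roughly 10 requests folded into each queued batch.
	numSeconds := 120.0
	requestsPerSecond := 100.0
	requestsPerBatch := 10.0

	queueSize := numSeconds * requestsPerSecond / requestsPerBatch
	fmt.Println(queueSize) // 1200 -> configure queue_size: 1200
}
```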

exporter/exporterhelper/internal/base_exporter.go (+1)

@@ -222,6 +222,7 @@ func WithQueue(config QueueConfig) Option {
 		Enabled:      config.Enabled,
 		NumConsumers: config.NumConsumers,
 		QueueSize:    config.QueueSize,
+		Blocking:     config.Blocking,
 	}
 	o.queueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{
 		Marshaler: o.Marshaler,
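
For exporter authors wiring this up in Go, a minimal sketch of enabling the new flag through `WithQueue` could look like the following. It assumes `exporterhelper` re-exports `QueueConfig`, `NewDefaultQueueConfig`, and `WithQueue` (as recent collector versions do); the exporter constructor mentioned in the comment is only an example:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func main() {
	// Start from the defaults (enabled, 10 consumers, 1_000 batches, Blocking: false)
	// and opt in to blocking behavior when the queue is full.
	qCfg := exporterhelper.NewDefaultQueueConfig()
	qCfg.Blocking = true

	// The option is then passed to an exporter constructor alongside the other
	// options, e.g. exporterhelper.NewTraces(..., exporterhelper.WithQueue(qCfg)).
	_ = exporterhelper.WithQueue(qCfg)
	fmt.Printf("%+v\n", qCfg)
}
```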

exporter/exporterhelper/internal/queue_sender.go (+5 -3)

@@ -20,8 +20,6 @@ import (
 	"go.opentelemetry.io/collector/exporter/internal/queue"
 )
 
-const defaultQueueSize = 1000
-
 // QueueConfig defines configuration for queueing batches before sending to the consumerSender.
 type QueueConfig struct {
 	// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
@@ -32,6 +30,9 @@ type QueueConfig struct {
 	NumConsumers int `mapstructure:"num_consumers"`
 	// QueueSize is the maximum number of batches allowed in queue at a given time.
 	QueueSize int `mapstructure:"queue_size"`
+	// Blocking controls the queue behavior when full.
+	// If true it blocks until enough space to add the new request to the queue.
+	Blocking bool `mapstructure:"blocking"`
 	// StorageID if not empty, enables the persistent storage and uses the component specified
 	// as a storage extension for the persistent queue
 	StorageID *component.ID `mapstructure:"storage"`
@@ -45,7 +46,8 @@ func NewDefaultQueueConfig() QueueConfig {
 		// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
 		// This can be estimated at 1-4 GB worth of maximum memory usage
 		// This default is probably still too high, and may be adjusted further down in a future release
-		QueueSize: defaultQueueSize,
+		QueueSize: 1_000,
+		Blocking:  false,
 	}
 }
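
The `mapstructure` tags are what tie the user-facing keys from the README (typically under `sending_queue`) to this struct. A hedged sketch of that round trip using the collector's `confmap` package; the field and key names come from the hunk above, the rest is assumption:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/confmap"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func main() {
	// The keys a user would set in their collector config.
	raw := confmap.NewFromStringMap(map[string]any{
		"enabled":       true,
		"num_consumers": 10,
		"queue_size":    5000,
		"blocking":      true,
	})

	cfg := exporterhelper.NewDefaultQueueConfig()
	if err := raw.Unmarshal(&cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // QueueSize:5000 Blocking:true ...
}
```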

exporter/exporterhelper/internal/queue_sender_test.go (+2 -2)

@@ -261,7 +261,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
 
-	require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))
+	require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(1000)))
 
 	for i := 0; i < 7; i++ {
 		require.NoError(t, be.Send(context.Background(), newErrorRequest()))
@@ -271,7 +271,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
 
 	assert.NoError(t, be.Shutdown(context.Background()))
 	// metrics should be unregistered at shutdown to prevent memory leak
-	require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))
+	require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(1000)))
 	require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7),
 		attribute.String(DataTypeKey, dataType.String())))
 }

exporter/exporterqueue/config.go (+4)

@@ -20,6 +20,9 @@ type Config struct {
 	NumConsumers int `mapstructure:"num_consumers"`
 	// QueueSize is the maximum number of requests allowed in queue at any given time.
 	QueueSize int `mapstructure:"queue_size"`
+	// Blocking controls the queue behavior when full.
+	// If true it blocks until enough space to add the new request to the queue.
+	Blocking bool `mapstructure:"blocking"`
 }
 
 // NewDefaultConfig returns the default Config.
@@ -30,6 +33,7 @@ func NewDefaultConfig() Config {
 		Enabled:      true,
 		NumConsumers: 10,
 		QueueSize:    1_000,
+		Blocking:     true,
 	}
 }
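
Worth noting from this hunk: the two default constructors diverge. `NewDefaultQueueConfig` in exporterhelper keeps `Blocking: false`, while exporterqueue's `NewDefaultConfig` turns it on. A small sketch that just prints the exporterqueue defaults, assuming the public `go.opentelemetry.io/collector/exporter/exporterqueue` import path:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterqueue"
)

func main() {
	cfg := exporterqueue.NewDefaultConfig()
	// After this commit: Enabled=true NumConsumers=10 QueueSize=1000 Blocking=true
	fmt.Printf("Enabled=%v NumConsumers=%d QueueSize=%d Blocking=%v\n",
		cfg.Enabled, cfg.NumConsumers, cfg.QueueSize, cfg.Blocking)
}
```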

exporter/exporterqueue/queue.go (+2)

@@ -69,6 +69,7 @@ func NewMemoryQueueFactory[T any]() Factory[T] {
 		return newBoundedMemoryQueue[T](memoryQueueSettings[T]{
 			sizer:    &requestSizer[T]{},
 			capacity: int64(cfg.QueueSize),
+			blocking: cfg.Blocking,
 		})
 	}
 }
@@ -95,6 +96,7 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P
 		return newPersistentQueue[T](persistentQueueSettings[T]{
 			sizer:     &requestSizer[T]{},
 			capacity:  int64(cfg.QueueSize),
+			blocking:  cfg.Blocking,
 			signal:    set.Signal,
 			storageID: *storageID,
 			marshaler: factorySettings.Marshaler,
