|
7 | 7 | "context"
|
8 | 8 | "errors"
|
9 | 9 | "fmt"
|
| 10 | + "math" |
| 11 | + "runtime" |
10 | 12 | "time"
|
11 | 13 |
|
12 | 14 | "go.uber.org/zap"
|
@@ -47,6 +49,10 @@ type QueueConfig struct {
|
47 | 49 | // Enabled indicates whether to not enqueue batches before exporting.
|
48 | 50 | Enabled bool `mapstructure:"enabled"`
|
49 | 51 |
|
| 52 | + // WaitForResult determines if incoming requests are blocked until the request is processed or not. |
| 53 | + // Currently, this option is not available when persistent queue is configured using the storage configuration. |
| 54 | + WaitForResult bool `mapstructure:"wait_for_result"` |
| 55 | + |
50 | 56 | // Sizer determines the type of size measurement used by this component.
|
51 | 57 | // It accepts "requests", "items", or "bytes".
|
52 | 58 | Sizer request.SizerType `mapstructure:"sizer"`
|
@@ -99,11 +105,14 @@ func (qCfg *QueueConfig) Validate() error {
|
99 | 105 | return errors.New("`queue_size` must be positive")
|
100 | 106 | }
|
101 | 107 |
|
| 108 | + if qCfg.StorageID != nil && qCfg.WaitForResult { |
| 109 | + return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`") |
| 110 | + } |
| 111 | + |
102 | 112 | // Only support request sizer for persistent queue at this moment.
|
103 | 113 | if qCfg.StorageID != nil && qCfg.Sizer != request.SizerTypeRequests {
|
104 |
| - return errors.New("persistent queue only supports `requests` sizer") |
| 114 | + return errors.New("persistent queue configured with `storage` only supports `requests` sizer") |
105 | 115 | }
|
106 |
| - |
107 | 116 | return nil
|
108 | 117 | }
|
109 | 118 |
|
@@ -133,20 +142,43 @@ func NewQueueSender(
|
133 | 142 | }
|
134 | 143 |
|
135 | 144 | func newQueueBatchConfig(qCfg QueueConfig, bCfg BatcherConfig) queuebatch.Config {
|
136 |
| - qbCfg := queuebatch.Config{ |
137 |
| - Enabled: true, |
138 |
| - WaitForResult: !qCfg.Enabled, |
139 |
| - Sizer: qCfg.Sizer, |
140 |
| - QueueSize: qCfg.QueueSize, |
141 |
| - NumConsumers: qCfg.NumConsumers, |
142 |
| - BlockOnOverflow: qCfg.BlockOnOverflow, |
143 |
| - StorageID: qCfg.StorageID, |
144 |
| - } |
145 |
| - if bCfg.Enabled { |
146 |
| - qbCfg.Batch = &queuebatch.BatchConfig{ |
147 |
| - FlushTimeout: bCfg.FlushTimeout, |
148 |
| - MinSize: bCfg.MinSize, |
149 |
| - MaxSize: bCfg.MaxSize, |
| 145 | + var qbCfg queuebatch.Config |
| 146 | + // User configured queueing, copy all config. |
| 147 | + if qCfg.Enabled { |
| 148 | + qbCfg = queuebatch.Config{ |
| 149 | + Enabled: true, |
| 150 | + WaitForResult: qCfg.WaitForResult, |
| 151 | + Sizer: qCfg.Sizer, |
| 152 | + QueueSize: qCfg.QueueSize, |
| 153 | + NumConsumers: qCfg.NumConsumers, |
| 154 | + BlockOnOverflow: qCfg.BlockOnOverflow, |
| 155 | + StorageID: qCfg.StorageID, |
| 156 | + // TODO: Copy batching configuration as well when available. |
| 157 | + } |
| 158 | + // TODO: Remove this when WithBatcher is removed. |
| 159 | + if bCfg.Enabled { |
| 160 | + qbCfg.Batch = &queuebatch.BatchConfig{ |
| 161 | + FlushTimeout: bCfg.FlushTimeout, |
| 162 | + MinSize: bCfg.MinSize, |
| 163 | + MaxSize: bCfg.MaxSize, |
| 164 | + } |
| 165 | + } |
| 166 | + } else { |
| 167 | + // This can happen only if the deprecated way to configure batching is used with a "disabled" queue. |
| 168 | + // TODO: Remove this when WithBatcher is removed. |
| 169 | + qbCfg = queuebatch.Config{ |
| 170 | + Enabled: true, |
| 171 | + WaitForResult: true, |
| 172 | + Sizer: request.SizerTypeRequests, |
| 173 | + QueueSize: math.MaxInt, |
| 174 | + NumConsumers: runtime.NumCPU(), |
| 175 | + BlockOnOverflow: true, |
| 176 | + StorageID: nil, |
| 177 | + Batch: &queuebatch.BatchConfig{ |
| 178 | + FlushTimeout: bCfg.FlushTimeout, |
| 179 | + MinSize: bCfg.MinSize, |
| 180 | + MaxSize: bCfg.MaxSize, |
| 181 | + }, |
150 | 182 | }
|
151 | 183 | }
|
152 | 184 | return qbCfg
|
|
0 commit comments