@@ -16,8 +16,6 @@ import (
16
16
"github.com/stretchr/testify/require"
17
17
18
18
"go.opentelemetry.io/collector/component/componenttest"
19
- "go.opentelemetry.io/collector/pdata/ptrace"
20
- "go.opentelemetry.io/collector/pdata/testdata"
21
19
)
22
20
23
21
// In this test we run a queue with capacity 1 and a single consumer.
@@ -102,11 +100,11 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
102
100
func TestQueueUsage (t * testing.T ) {
103
101
tests := []struct {
104
102
name string
105
- sizer sizer [ptrace. Traces ]
103
+ sizer sizer [uint64 ]
106
104
}{
107
105
{
108
106
name : "requests_based" ,
109
- sizer : & requestSizer [ptrace. Traces ]{},
107
+ sizer : & requestSizer [uint64 ]{},
110
108
},
111
109
{
112
110
name : "items_based" ,
@@ -115,16 +113,15 @@ func TestQueueUsage(t *testing.T) {
115
113
}
116
114
for _ , tt := range tests {
117
115
t .Run (tt .name , func (t * testing.T ) {
118
- q := newBoundedMemoryQueue [ptrace. Traces ](memoryQueueSettings [ptrace. Traces ]{sizer : tt .sizer , capacity : int64 (100 )})
116
+ q := newBoundedMemoryQueue [uint64 ](memoryQueueSettings [uint64 ]{sizer : tt .sizer , capacity : int64 (100 )})
119
117
consumed := & atomic.Int64 {}
120
118
require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
121
- ac := newAsyncConsumer (q , 1 , func (context.Context , ptrace. Traces ) error {
119
+ ac := newAsyncConsumer (q , 1 , func (context.Context , uint64 ) error {
122
120
consumed .Add (1 )
123
121
return nil
124
122
})
125
- td := testdata .GenerateTraces (10 )
126
123
for j := 0 ; j < 10 ; j ++ {
127
- require .NoError (t , q .Offer (context .Background (), td ))
124
+ require .NoError (t , q .Offer (context .Background (), uint64 ( 10 ) ))
128
125
}
129
126
assert .NoError (t , q .Shutdown (context .Background ()))
130
127
assert .NoError (t , ac .Shutdown (context .Background ()))
@@ -133,6 +130,47 @@ func TestQueueUsage(t *testing.T) {
133
130
}
134
131
}
135
132
133
+ func TestBlockingQueueUsage (t * testing.T ) {
134
+ tests := []struct {
135
+ name string
136
+ sizer sizer [uint64 ]
137
+ }{
138
+ {
139
+ name : "requests_based" ,
140
+ sizer : & requestSizer [uint64 ]{},
141
+ },
142
+ {
143
+ name : "items_based" ,
144
+ sizer : & itemsSizer {},
145
+ },
146
+ }
147
+ for _ , tt := range tests {
148
+ t .Run (tt .name , func (t * testing.T ) {
149
+ q := newBoundedMemoryQueue [uint64 ](memoryQueueSettings [uint64 ]{sizer : tt .sizer , capacity : int64 (100 ), blocking : true })
150
+ consumed := & atomic.Int64 {}
151
+ require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
152
+ ac := newAsyncConsumer (q , 10 , func (context.Context , uint64 ) error {
153
+ consumed .Add (1 )
154
+ return nil
155
+ })
156
+ wg := & sync.WaitGroup {}
157
+ for i := 0 ; i < 10 ; i ++ {
158
+ wg .Add (1 )
159
+ go func () {
160
+ defer wg .Done ()
161
+ for j := 0 ; j < 100_000 ; j ++ {
162
+ assert .NoError (t , q .Offer (context .Background (), uint64 (10 )))
163
+ }
164
+ }()
165
+ }
166
+ wg .Wait ()
167
+ assert .NoError (t , q .Shutdown (context .Background ()))
168
+ assert .NoError (t , ac .Shutdown (context .Background ()))
169
+ assert .Equal (t , int64 (1_000_000 ), consumed .Load ())
170
+ })
171
+ }
172
+ }
173
+
136
174
func TestZeroSizeNoConsumers (t * testing.T ) {
137
175
q := newBoundedMemoryQueue [string ](memoryQueueSettings [string ]{sizer : & requestSizer [string ]{}, capacity : 0 })
138
176
@@ -149,8 +187,7 @@ func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool
149
187
if ! ok {
150
188
return false
151
189
}
152
- consumeErr := consumeFunc (ctx , req )
153
- q .OnProcessingFinished (index , consumeErr )
190
+ q .OnProcessingFinished (index , consumeFunc (ctx , req ))
154
191
return true
155
192
}
156
193
@@ -170,8 +207,7 @@ func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(cont
170
207
if ! ok {
171
208
return
172
209
}
173
- consumeErr := consumeFunc (ctx , req )
174
- q .OnProcessingFinished (index , consumeErr )
210
+ q .OnProcessingFinished (index , consumeFunc (ctx , req ))
175
211
}
176
212
}()
177
213
}
@@ -187,11 +223,11 @@ func (qc *asyncConsumer) Shutdown(_ context.Context) error {
187
223
func BenchmarkOffer (b * testing.B ) {
188
224
tests := []struct {
189
225
name string
190
- sizer sizer [ptrace. Traces ]
226
+ sizer sizer [uint64 ]
191
227
}{
192
228
{
193
229
name : "requests_based" ,
194
- sizer : & requestSizer [ptrace. Traces ]{},
230
+ sizer : & requestSizer [uint64 ]{},
195
231
},
196
232
{
197
233
name : "items_based" ,
@@ -200,18 +236,17 @@ func BenchmarkOffer(b *testing.B) {
200
236
}
201
237
for _ , tt := range tests {
202
238
b .Run (tt .name , func (b * testing.B ) {
203
- q := newBoundedMemoryQueue [ptrace. Traces ](memoryQueueSettings [ptrace. Traces ]{sizer : & requestSizer [ptrace. Traces ]{}, capacity : int64 (10 * b .N )})
239
+ q := newBoundedMemoryQueue [uint64 ](memoryQueueSettings [uint64 ]{sizer : & requestSizer [uint64 ]{}, capacity : int64 (10 * b .N )})
204
240
consumed := & atomic.Int64 {}
205
241
require .NoError (b , q .Start (context .Background (), componenttest .NewNopHost ()))
206
- ac := newAsyncConsumer (q , 1 , func (context.Context , ptrace. Traces ) error {
242
+ ac := newAsyncConsumer (q , 1 , func (context.Context , uint64 ) error {
207
243
consumed .Add (1 )
208
244
return nil
209
245
})
210
- td := testdata .GenerateTraces (10 )
211
246
b .ResetTimer ()
212
247
b .ReportAllocs ()
213
248
for j := 0 ; j < b .N ; j ++ {
214
- require .NoError (b , q .Offer (context .Background (), td ))
249
+ require .NoError (b , q .Offer (context .Background (), uint64 ( 10 ) ))
215
250
}
216
251
assert .NoError (b , q .Shutdown (context .Background ()))
217
252
assert .NoError (b , ac .Shutdown (context .Background ()))
0 commit comments