@@ -35,6 +35,7 @@ import (
35
35
"go.opentelemetry.io/collector/config/configtelemetry"
36
36
"go.opentelemetry.io/collector/config/experimental/configsource"
37
37
"go.opentelemetry.io/collector/consumer/consumererror"
38
+ "go.opentelemetry.io/collector/extension/ballastextension"
38
39
"go.opentelemetry.io/collector/internal/collector/telemetry"
39
40
"go.opentelemetry.io/collector/service/internal/builder"
40
41
"go.opentelemetry.io/collector/service/parserprovider"
@@ -286,30 +287,37 @@ func (col *Collector) execute(ctx context.Context) error {
286
287
)
287
288
col .stateChannel <- Starting
288
289
289
- // Set memory ballast
290
- ballast , ballastSizeBytes := col .createMemoryBallast ()
290
+ // Add `mem-ballast-size-mib` warning message if it is still enabled
291
+ // TODO: will remove all `mem-ballast-size-mib` footprints after some baking time.
292
+ if builder .MemBallastSize () > 0 {
293
+ col .logger .Warn ("`mem-ballast-size-mib` command line option has been deprecated. Please use `ballast extension` instead!" )
294
+ }
291
295
292
296
col .asyncErrorChannel = make (chan error )
293
297
294
- // Setup everything.
295
- err := col .setupTelemetry (ballastSizeBytes )
298
+ err := col .setupConfigurationComponents (ctx )
296
299
if err != nil {
297
300
return err
298
301
}
299
302
300
- err = col .setupConfigurationComponents (ctx )
303
+ // Get ballastSizeBytes if ballast extension is enabled
304
+ ballastSizeBytes := col .getBallastSize ()
305
+
306
+ // Setup Telemetry.
307
+ err = col .setupTelemetry (ballastSizeBytes )
301
308
if err != nil {
302
309
return err
303
310
}
304
311
312
+ col .service .GetExtensions ()
313
+
305
314
// Everything is ready, now run until an event requiring shutdown happens.
306
315
col .runAndWaitForShutdownEvent ()
307
316
308
317
// Accumulate errors and proceed with shutting down remaining components.
309
318
var errs []error
310
319
311
320
// Begin shutdown sequence.
312
- runtime .KeepAlive (ballast )
313
321
col .logger .Info ("Starting shutdown..." )
314
322
315
323
if closable , ok := col .parserProvider .(parserprovider.Closeable ); ok {
@@ -335,17 +343,6 @@ func (col *Collector) execute(ctx context.Context) error {
335
343
return consumererror .Combine (errs )
336
344
}
337
345
338
- func (col * Collector ) createMemoryBallast () ([]byte , uint64 ) {
339
- ballastSizeMiB := builder .MemBallastSize ()
340
- if ballastSizeMiB > 0 {
341
- ballastSizeBytes := uint64 (ballastSizeMiB ) * 1024 * 1024
342
- ballast := make ([]byte , ballastSizeBytes )
343
- col .logger .Info ("Using memory ballast" , zap .Int ("MiBs" , ballastSizeMiB ))
344
- return ballast , ballastSizeBytes
345
- }
346
- return nil , 0
347
- }
348
-
349
346
// reloadService shutdowns the current col.service and setups a new one according
350
347
// to the latest configuration. It requires that col.parserProvider and col.factories
351
348
// are properly populated to finish successfully.
@@ -370,3 +367,15 @@ func (col *Collector) reloadService(ctx context.Context) error {
370
367
371
368
return nil
372
369
}
370
+
371
+ func (col * Collector ) getBallastSize () uint64 {
372
+ var ballastSize uint64
373
+ extensions := col .service .GetExtensions ()
374
+ for _ , extension := range extensions {
375
+ if ext , ok := extension .(* ballastextension.MemoryBallast ); ok {
376
+ ballastSize = ext .GetBallastSize ()
377
+ break
378
+ }
379
+ }
380
+ return ballastSize
381
+ }
0 commit comments