Skip to content

Commit 2b5fa0e

Browse files
authored
Add support to configure min GC intervals for soft and hard limits (#12450)
When servers are in a high memory usage mode, the amount of times GC is called creates a high CPU usage which combined with a high ingest rate limits capability to offload existing queued data. Configuring the minimum interval even for hard limit, allows the system to spend some CPU cycles between GCs to offload old data from queues/batch processor. The most amount of data I've seen accumulated are blocked in the batch processor queue (incoming channel) not in the exporter queue. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 362db71 commit 2b5fa0e

File tree

10 files changed

+229
-82
lines changed

10 files changed

+229
-82
lines changed

.chloggen/add-support-for-gc.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: memorylimiter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support to configure min GC intervals for soft and hard limits.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12450]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

extension/memorylimiterextension/factory.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.opentelemetry.io/collector/component"
1212
"go.opentelemetry.io/collector/extension"
1313
"go.opentelemetry.io/collector/extension/memorylimiterextension/internal/metadata"
14+
"go.opentelemetry.io/collector/internal/memorylimiter"
1415
)
1516

1617
// NewFactory returns a new factory for the Memory Limiter extension.
@@ -25,7 +26,7 @@ func NewFactory() extension.Factory {
2526
// CreateDefaultConfig creates the default configuration for extension. Notice
2627
// that the default configuration is expected to fail for this extension.
2728
func createDefaultConfig() component.Config {
28-
return &Config{}
29+
return memorylimiter.NewDefaultConfig()
2930
}
3031

3132
func create(_ context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) {

extension/memorylimiterextension/memorylimiter_test.go

+9-19
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/stretchr/testify/require"
1414
"go.uber.org/zap"
1515

16-
"go.opentelemetry.io/collector/component"
16+
"go.opentelemetry.io/collector/component/componenttest"
1717
"go.opentelemetry.io/collector/internal/memorylimiter"
1818
"go.opentelemetry.io/collector/internal/memorylimiter/iruntime"
1919
)
@@ -70,14 +70,20 @@ func TestMemoryPressureResponse(t *testing.T) {
7070
}
7171
for _, tt := range tests {
7272
t.Run(tt.name, func(t *testing.T) {
73-
memorylimiter.GetMemoryFn = totalMemory
73+
memorylimiter.GetMemoryFn = func() (uint64, error) {
74+
return uint64(2048), nil
75+
}
7476
memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) {
7577
ms.Alloc = tt.memAlloc
7678
}
79+
t.Cleanup(func() {
80+
memorylimiter.GetMemoryFn = iruntime.TotalMemory
81+
memorylimiter.ReadMemStatsFn = runtime.ReadMemStats
82+
})
7783
ml, err := newMemoryLimiter(tt.mlCfg, zap.NewNop())
7884
assert.NoError(t, err)
7985

80-
assert.NoError(t, ml.Start(ctx, &mockHost{}))
86+
assert.NoError(t, ml.Start(ctx, componenttest.NewNopHost()))
8187
ml.memLimiter.CheckMemLimits()
8288
mustRefuse := ml.MustRefuse()
8389
if tt.expectError {
@@ -88,20 +94,4 @@ func TestMemoryPressureResponse(t *testing.T) {
8894
assert.NoError(t, ml.Shutdown(ctx))
8995
})
9096
}
91-
t.Cleanup(func() {
92-
memorylimiter.GetMemoryFn = iruntime.TotalMemory
93-
memorylimiter.ReadMemStatsFn = runtime.ReadMemStats
94-
})
95-
}
96-
97-
type mockHost struct {
98-
component.Host
99-
}
100-
101-
func (h *mockHost) GetExtensions() map[component.ID]component.Component {
102-
return make(map[component.ID]component.Component)
103-
}
104-
105-
func totalMemory() (uint64, error) {
106-
return uint64(2048), nil
10797
}

internal/memorylimiter/config.go

+20
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
var (
1414
errCheckIntervalOutOfRange = errors.New("'check_interval' must be greater than zero")
15+
errInconsistentGCMinInterval = errors.New("'min_gc_interval_when_soft_limited' should be larger than 'min_gc_interval_when_hard_limited'")
1516
errLimitOutOfRange = errors.New("'limit_mib' or 'limit_percentage' must be greater than zero")
1617
errSpikeLimitOutOfRange = errors.New("'spike_limit_mib' must be smaller than 'limit_mib'")
1718
errSpikeLimitPercentageOutOfRange = errors.New("'spike_limit_percentage' must be smaller than 'limit_percentage'")
@@ -26,6 +27,16 @@ type Config struct {
2627
// checks will be performed.
2728
CheckInterval time.Duration `mapstructure:"check_interval"`
2829

30+
// MinGCIntervalWhenSoftLimited minimum interval between forced GC when in soft (=limit_mib - spike_limit_mib) limited mode.
31+
// Zero value means no minimum interval.
32+
// GCs is a CPU-heavy operation and executing it too frequently may affect the recovery capabilities of the collector.
33+
MinGCIntervalWhenSoftLimited time.Duration `mapstructure:"min_gc_interval_when_soft_limited"`
34+
35+
// MinGCIntervalWhenHardLimited minimum interval between forced GC when in hard (=limit_mib) limited mode.
36+
// Zero value means no minimum interval.
37+
// GCs is a CPU-heavy operation and executing it too frequently may affect the recovery capabilities of the collector.
38+
MinGCIntervalWhenHardLimited time.Duration `mapstructure:"min_gc_interval_when_hard_limited"`
39+
2940
// MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be
3041
// allocated by the process.
3142
MemoryLimitMiB uint32 `mapstructure:"limit_mib"`
@@ -45,11 +56,20 @@ type Config struct {
4556

4657
var _ component.Config = (*Config)(nil)
4758

59+
func NewDefaultConfig() *Config {
60+
return &Config{
61+
MinGCIntervalWhenSoftLimited: 10 * time.Second,
62+
}
63+
}
64+
4865
// Validate checks if the processor configuration is valid
4966
func (cfg *Config) Validate() error {
5067
if cfg.CheckInterval <= 0 {
5168
return errCheckIntervalOutOfRange
5269
}
70+
if cfg.MinGCIntervalWhenSoftLimited < cfg.MinGCIntervalWhenHardLimited {
71+
return errInconsistentGCMinInterval
72+
}
5373
if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
5474
return errLimitOutOfRange
5575
}

internal/memorylimiter/config_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,17 @@ func TestConfigValidate(t *testing.T) {
8484
},
8585
err: errSpikeLimitPercentageOutOfRange,
8686
},
87+
{
88+
name: "invalid gc intervals",
89+
cfg: &Config{
90+
CheckInterval: 100 * time.Millisecond,
91+
MinGCIntervalWhenSoftLimited: 50 * time.Millisecond,
92+
MinGCIntervalWhenHardLimited: 100 * time.Millisecond,
93+
MemoryLimitMiB: 5722,
94+
MemorySpikeLimitMiB: 1907,
95+
},
96+
err: errInconsistentGCMinInterval,
97+
},
8798
}
8899
for _, tt := range tests {
89100
t.Run(tt.name, func(t *testing.T) {

internal/memorylimiter/go.mod

+3-1
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,18 @@ require (
2525
github.com/mitchellh/reflectwalk v1.0.2 // indirect
2626
github.com/pmezard/go-difflib v1.0.0 // indirect
2727
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
28+
github.com/rogpeppe/go-internal v1.13.1 // indirect
2829
github.com/tklauser/go-sysconf v0.3.12 // indirect
2930
github.com/tklauser/numcpus v0.6.1 // indirect
3031
github.com/yusufpapurcu/wmi v1.2.4 // indirect
3132
go.opentelemetry.io/collector/pdata v1.26.0 // indirect
3233
go.opentelemetry.io/otel v1.34.0 // indirect
3334
go.opentelemetry.io/otel/metric v1.34.0 // indirect
35+
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
3436
go.opentelemetry.io/otel/trace v1.34.0 // indirect
3537
go.uber.org/multierr v1.11.0 // indirect
3638
golang.org/x/net v0.33.0 // indirect
37-
golang.org/x/sys v0.28.0 // indirect
39+
golang.org/x/sys v0.29.0 // indirect
3840
golang.org/x/text v0.21.0 // indirect
3941
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
4042
google.golang.org/grpc v1.70.0 // indirect

internal/memorylimiter/go.sum

+8-8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/memorylimiter/memorylimiter.go

+42-41
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ import (
2020

2121
const (
2222
mibBytes = 1024 * 1024
23-
24-
// Minimum interval between forced GC when in soft limited mode. We don't want to
25-
// do GCs too frequently since it is a CPU-heavy operation.
26-
minGCIntervalWhenSoftLimited = 10 * time.Second
2723
)
2824

2925
var (
@@ -50,11 +46,14 @@ type MemoryLimiter struct {
5046

5147
ticker *time.Ticker
5248

53-
lastGCDone time.Time
49+
minGCIntervalWhenSoftLimited time.Duration
50+
minGCIntervalWhenHardLimited time.Duration
51+
lastGCDone time.Time
5452

55-
// The function to read the mem values is set as a reference to help with
53+
// The functions to read the mem values and run GC are set as a reference to help with
5654
// testing different values.
5755
readMemStatsFn func(m *runtime.MemStats)
56+
runGCFn func()
5857

5958
// Fields used for logging.
6059
logger *zap.Logger
@@ -78,18 +77,20 @@ func NewMemoryLimiter(cfg *Config, logger *zap.Logger) (*MemoryLimiter, error) {
7877
zap.Duration("check_interval", cfg.CheckInterval))
7978

8079
return &MemoryLimiter{
81-
usageChecker: *usageChecker,
82-
memCheckWait: cfg.CheckInterval,
83-
ticker: time.NewTicker(cfg.CheckInterval),
84-
readMemStatsFn: ReadMemStatsFn,
85-
logger: logger,
86-
mustRefuse: &atomic.Bool{},
80+
usageChecker: *usageChecker,
81+
memCheckWait: cfg.CheckInterval,
82+
ticker: time.NewTicker(cfg.CheckInterval),
83+
minGCIntervalWhenSoftLimited: cfg.MinGCIntervalWhenSoftLimited,
84+
minGCIntervalWhenHardLimited: cfg.MinGCIntervalWhenHardLimited,
85+
lastGCDone: time.Now(),
86+
readMemStatsFn: ReadMemStatsFn,
87+
runGCFn: runtime.GC,
88+
logger: logger,
89+
mustRefuse: &atomic.Bool{},
8790
}, nil
8891
}
8992

90-
// startMonitoring starts a single ticker'd goroutine per instance
91-
// that will check memory usage every checkInterval period.
92-
func (ml *MemoryLimiter) startMonitoring() {
93+
func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error {
9394
ml.refCounterLock.Lock()
9495
defer ml.refCounterLock.Unlock()
9596

@@ -110,10 +111,6 @@ func (ml *MemoryLimiter) startMonitoring() {
110111
}
111112
}()
112113
}
113-
}
114-
115-
func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error {
116-
ml.startMonitoring()
117114
return nil
118115
}
119116

@@ -167,7 +164,7 @@ func memstatToZapField(ms *runtime.MemStats) zap.Field {
167164
}
168165

169166
func (ml *MemoryLimiter) doGCandReadMemStats() *runtime.MemStats {
170-
runtime.GC()
167+
ml.runGCFn()
171168
ml.lastGCDone = time.Now()
172169
ms := ml.readMemStats()
173170
ml.logger.Info("Memory usage after GC.", memstatToZapField(ms))
@@ -180,38 +177,42 @@ func (ml *MemoryLimiter) CheckMemLimits() {
180177

181178
ml.logger.Debug("Currently used memory.", memstatToZapField(ms))
182179

183-
if ml.usageChecker.aboveHardLimit(ms) {
184-
ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms))
185-
ms = ml.doGCandReadMemStats()
186-
}
187-
188-
// Remember current state.
189-
wasRefusing := ml.mustRefuse.Load()
190-
191-
// Check if the memory usage is above the soft limit.
192-
mustRefuse := ml.usageChecker.aboveSoftLimit(ms)
193-
194-
if wasRefusing && !mustRefuse {
195-
// Was previously refusing but enough memory is available now, no need to limit.
196-
ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms))
180+
// Check if we are below the soft limit.
181+
aboveSoftLimit := ml.usageChecker.aboveSoftLimit(ms)
182+
if !aboveSoftLimit {
183+
if ml.mustRefuse.Load() {
184+
// Was previously refusing but enough memory is available now, no need to limit.
185+
ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms))
186+
}
187+
ml.mustRefuse.Store(aboveSoftLimit)
188+
return
197189
}
198190

199-
if !wasRefusing && mustRefuse {
191+
if ml.usageChecker.aboveHardLimit(ms) {
192+
// We are above hard limit, do a GC if it wasn't done recently and see if
193+
// it brings memory usage below the soft limit.
194+
if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenHardLimited {
195+
ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms))
196+
ms = ml.doGCandReadMemStats()
197+
// Check the limit again to see if GC helped.
198+
aboveSoftLimit = ml.usageChecker.aboveSoftLimit(ms)
199+
}
200+
} else {
200201
// We are above soft limit, do a GC if it wasn't done recently and see if
201202
// it brings memory usage below the soft limit.
202-
if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited {
203+
if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenSoftLimited {
203204
ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms))
204205
ms = ml.doGCandReadMemStats()
205206
// Check the limit again to see if GC helped.
206-
mustRefuse = ml.usageChecker.aboveSoftLimit(ms)
207+
aboveSoftLimit = ml.usageChecker.aboveSoftLimit(ms)
207208
}
209+
}
208210

209-
if mustRefuse {
210-
ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms))
211-
}
211+
if !ml.mustRefuse.Load() && aboveSoftLimit {
212+
ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms))
212213
}
213214

214-
ml.mustRefuse.Store(mustRefuse)
215+
ml.mustRefuse.Store(aboveSoftLimit)
215216
}
216217

217218
type memUsageChecker struct {

0 commit comments

Comments
 (0)