Add resource metrics cache cleanup #39957

Draft · wants to merge 4 commits into main
28 changes: 28 additions & 0 deletions .chloggen/add-resource-metrics-cache-cleanup.yaml
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewritereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add cache cleanup mechanism for resource metrics in prometheusremotewritereceiver.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37277]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: To avoid memory leaks, the receiver now periodically cleans up its internal cache of resource metrics.
  For now, the cleanup interval is not configurable and is fixed at 5 minutes.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
87 changes: 78 additions & 9 deletions receiver/prometheusremotewritereceiver/receiver.go
@@ -30,25 +30,28 @@ import (
)

func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
cache := newCache(5 * time.Minute)

return &prometheusRemoteWriteReceiver{
settings: settings,
nextConsumer: nextConsumer,
config: cfg,
server: &http.Server{
ReadTimeout: 60 * time.Second,
},
rmCache: cache,
}, nil
}

type prometheusRemoteWriteReceiver struct {
settings receiver.Settings
nextConsumer consumer.Metrics

config *Config
server *http.Server
wg sync.WaitGroup

rmCache *rmCache
}

// MetricIdentity contains all the components that uniquely identify a metric
@@ -91,6 +94,70 @@ func (mi MetricIdentity) Hash() uint64 {
return xxhash.Sum64String(combined)
}

// rmCache synchronizes read and write access to a map of resource hashes to ResourceMetrics.
type rmCache struct {
// TODO(@perebaj) create a mechanism to clean up the cache based on a max size
// data is a map of resource hashes to ResourceMetrics
data map[uint64]pmetric.ResourceMetrics
// CleanupInterval is the interval at which the cache will be cleaned up
CleanupInterval time.Duration
// A mutex protects the map from races between read/write operations and the cleanup goroutine.
// An RWMutex is used so concurrent readers don't block each other, while writers take exclusive access.
mutex sync.RWMutex
// stopChan works as a signal to stop the cleanup goroutine if something happens in the receiver server
stopChan chan struct{}
// stopOnce makes Stop idempotent: the receiver's serve goroutine and test cleanup
// may both call Stop, and closing stopChan twice would panic.
stopOnce sync.Once
}

func newCache(cleanupInterval time.Duration) *rmCache {
c := &rmCache{
data: make(map[uint64]pmetric.ResourceMetrics),
CleanupInterval: cleanupInterval,
stopChan: make(chan struct{}),
}

go c.startCleanup()

return c
}

func (c *rmCache) startCleanup() {
ticker := time.NewTicker(c.CleanupInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
c.cleanupAll()
case <-c.stopChan:
return
}
}
}

func (c *rmCache) Stop() {
	c.stopOnce.Do(func() {
		close(c.stopChan)
	})
}

func (c *rmCache) cleanupAll() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.data = make(map[uint64]pmetric.ResourceMetrics)
}

func (c *rmCache) get(key uint64) (pmetric.ResourceMetrics, bool) {
c.mutex.RLock()
defer c.mutex.RUnlock()
rm, ok := c.data[key]
return rm, ok
}

func (c *rmCache) set(key uint64, rm pmetric.ResourceMetrics) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.data[key] = rm
}
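Taken together, the cache's lifecycle is: newCache starts the cleanup goroutine, get/set serialize access through the RWMutex, and Stop ends the goroutine. A minimal usage sketch — these identifiers are unexported, so this assumes it lives inside the receiver package with its existing imports:

```go
// Illustrative only: newCache, set, get, and Stop are unexported, so this
// snippet assumes it sits inside the prometheusremotewritereceiver package.
func exampleCacheLifecycle() {
	c := newCache(5 * time.Minute) // spawns the periodic cleanup goroutine
	defer c.Stop()                 // signal the goroutine to exit

	rm := pmetric.NewResourceMetrics()
	rm.Resource().Attributes().PutStr("service.name", "demo")
	c.set(42, rm)

	if cached, ok := c.get(42); ok {
		v, _ := cached.Resource().Attributes().Get("service.name")
		fmt.Println(v.Str()) // demo
	}
}
```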

func (prw *prometheusRemoteWriteReceiver) Start(ctx context.Context, host component.Host) error {
mux := http.NewServeMux()
mux.HandleFunc("/api/v1/write", prw.handlePRW)
@@ -108,6 +175,7 @@ func (prw *prometheusRemoteWriteReceiver) Start(ctx context.Context, host component.Host) error {
prw.wg.Add(1)
go func() {
defer prw.wg.Done()
defer prw.rmCache.Stop()
if err := prw.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(fmt.Errorf("error starting prometheus remote-write receiver: %w", err)))
}
@@ -237,7 +305,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
var rm pmetric.ResourceMetrics
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))

if existingRM, ok := prw.rmCache.get(hashedLabels); ok {
rm = existingRM
} else {
rm = otelMetrics.ResourceMetrics().AppendEmpty()
@@ -252,20 +320,21 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
attrs.PutStr(l.Name, l.Value)
}
}
prw.rmCache.set(hashedLabels, rm)
continue
}

// For metrics other than target_info, we need to follow the standard process of creating a metric.
var rm pmetric.ResourceMetrics
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
existingRM, ok := prw.rmCache.get(hashedLabels)

if ok {
rm = existingRM
} else {
rm = otelMetrics.ResourceMetrics().AppendEmpty()
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
prw.rmCache.set(hashedLabels, rm)
}

scopeName, scopeVersion := prw.extractScopeInfo(ls)
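An aside on the hashing above: the '\xff' byte between job and instance is what keeps distinct label pairs from colliding after concatenation. Since 0xFF can never appear in valid UTF-8 (and Prometheus label values are UTF-8), it is a safe separator. A self-contained illustration:

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

func main() {
	// Plain concatenation makes ("ab", "c") and ("a", "bc") identical inputs:
	fmt.Println(xxhash.Sum64String("ab"+"c") == xxhash.Sum64String("a"+"bc")) // true

	// With a separator byte that valid UTF-8 label values cannot contain,
	// the two pairs hash differently:
	sep := string([]byte{'\xff'})
	fmt.Println(xxhash.Sum64String("ab"+sep+"c") == xxhash.Sum64String("a"+sep+"bc")) // false
}
```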
63 changes: 60 additions & 3 deletions receiver/prometheusremotewritereceiver/receiver_test.go
@@ -63,8 +63,9 @@ func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
prwReceiver, err := factory.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")

	writeReceiver := prwReceiver.(*prometheusRemoteWriteReceiver)
	// Stop the cache's cleanup goroutine when the test finishes; a plain defer
	// here would fire as soon as setupMetricsReceiver returns.
	t.Cleanup(func() { writeReceiver.rmCache.Stop() })
	return writeReceiver
}

func TestHandlePRWContentTypeNegotiation(t *testing.T) {
@@ -474,7 +475,7 @@ func TestTranslateV2(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
			// rmCache persists values across requests, so reset it before each test case;
			// otherwise state from one case would leak into the next.
			prwReceiver.rmCache.data = make(map[uint64]pmetric.ResourceMetrics)
metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)
if tc.expectError != "" {
assert.ErrorContains(t, err, tc.expectError)
@@ -654,3 +655,59 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) {
})
}
}

func TestCacheCleanup(t *testing.T) {
cleanupInterval := 300 * time.Millisecond
cache := newCache(cleanupInterval)

defer cache.Stop()

rm1 := pmetric.NewResourceMetrics()
rm2 := pmetric.NewResourceMetrics()
cache.set(1, rm1)
cache.set(2, rm2)

got1, exists1 := cache.get(1)
assert.True(t, exists1)
assert.Equal(t, rm1, got1)

got2, exists2 := cache.get(2)
assert.True(t, exists2)
assert.Equal(t, rm2, got2)

	// Wait for the cleanup interval (plus a small buffer) to elapse.
	// After that, the cleanup goroutine must have emptied the cache.
<-time.After(cleanupInterval + 100*time.Millisecond)

_, exists1 = cache.get(1)
assert.False(t, exists1)

_, exists2 = cache.get(2)
assert.False(t, exists2)
}

func TestCacheConcurrentAccess(t *testing.T) {
cache := newCache(100 * time.Second)
defer cache.Stop()

done := make(chan struct{}, 2)
go func() {
for i := 0; i < 100; i++ {
cache.set(uint64(i), pmetric.NewResourceMetrics())
}
done <- struct{}{}
}()

go func() {
for i := 0; i < 100; i++ {
cache.get(uint64(i))
}
done <- struct{}{}
}()

// Wait for both goroutines to complete
<-done
<-done

assert.Len(t, cache.data, 100)
}
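A concurrency test like TestCacheConcurrentAccess is most meaningful under Go's race detector (go test -race ./...), which catches unsynchronized access to cache.data that the final length assertion alone cannot.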