Skip to content

Commit b5fb587

Browse files
fix: close geoip providers database readers (#39379)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Adds a new `Close` method to the geoIP providers interface, so resources can be freed during processor's shutdown. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #38961 <!--Describe what testing was performed and which tests were added.--> #### Testing Flaky windows tests uncommented. <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.--> --------- Co-authored-by: Andrzej Stencel <[email protected]>
1 parent d4416a4 commit b5fb587

File tree

8 files changed

+97
-8
lines changed

8 files changed

+97
-8
lines changed

.chloggen/fix_geoip_close.yaml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: geoipprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Close providers readers on shutdown
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38961]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

processor/geoipprocessor/factory.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ func createMetricsProcessor(ctx context.Context, set processor.Settings, cfg com
9292
if err != nil {
9393
return nil, err
9494
}
95-
return processorhelper.NewMetrics(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, providers, set).processMetrics, processorhelper.WithCapabilities(processorCapabilities))
95+
geoProcessor := newGeoIPProcessor(geoCfg, providers, set)
96+
return processorhelper.NewMetrics(ctx, set, cfg, nextConsumer, geoProcessor.processMetrics, processorhelper.WithShutdown(geoProcessor.shutdown), processorhelper.WithCapabilities(processorCapabilities))
9697
}
9798

9899
func createTracesProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
@@ -101,7 +102,8 @@ func createTracesProcessor(ctx context.Context, set processor.Settings, cfg comp
101102
if err != nil {
102103
return nil, err
103104
}
104-
return processorhelper.NewTraces(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, providers, set).processTraces, processorhelper.WithCapabilities(processorCapabilities))
105+
geoProcessor := newGeoIPProcessor(geoCfg, providers, set)
106+
return processorhelper.NewTraces(ctx, set, cfg, nextConsumer, geoProcessor.processTraces, processorhelper.WithShutdown(geoProcessor.shutdown), processorhelper.WithCapabilities(processorCapabilities))
105107
}
106108

107109
func createLogsProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Logs) (processor.Logs, error) {
@@ -110,5 +112,6 @@ func createLogsProcessor(ctx context.Context, set processor.Settings, cfg compon
110112
if err != nil {
111113
return nil, err
112114
}
113-
return processorhelper.NewLogs(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, providers, set).processLogs, processorhelper.WithCapabilities(processorCapabilities))
115+
geoProcessor := newGeoIPProcessor(geoCfg, providers, set)
116+
return processorhelper.NewLogs(ctx, set, cfg, nextConsumer, geoProcessor.processLogs, processorhelper.WithShutdown(geoProcessor.shutdown), processorhelper.WithCapabilities(processorCapabilities))
114117
}

processor/geoipprocessor/geoip_processor.go

+13
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.opentelemetry.io/collector/pdata/pcommon"
1313
"go.opentelemetry.io/collector/processor"
1414
"go.opentelemetry.io/otel/attribute"
15+
"go.uber.org/multierr"
1516
"go.uber.org/zap"
1617

1718
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider"
@@ -115,3 +116,15 @@ func (g *geoIPProcessor) processAttributes(ctx context.Context, metadata pcommon
115116

116117
return nil
117118
}
119+
120+
func (g *geoIPProcessor) shutdown(ctx context.Context) error {
121+
var errs error
122+
for _, geoProvider := range g.providers {
123+
err := geoProvider.Close(ctx)
124+
if err != nil {
125+
errs = multierr.Append(errs, err)
126+
}
127+
}
128+
129+
return errs
130+
}

processor/geoipprocessor/geoip_processor_test.go

+36-1
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ package geoipprocessor
55

66
import (
77
"context"
8+
"errors"
89
"net"
910
"path/filepath"
1011
"testing"
1112

13+
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
1315
"go.opentelemetry.io/collector/component"
1416
"go.opentelemetry.io/collector/consumer/consumertest"
@@ -36,10 +38,10 @@ type providerFactoryMock struct {
3638

3739
type providerMock struct {
3840
LocationF func(context.Context, net.IP) (attribute.Set, error)
41+
CloseF func(context.Context) error
3942
}
4043

4144
var (
42-
_ provider.GeoIPProvider = (*providerMock)(nil)
4345
_ provider.GeoIPProvider = (*providerMock)(nil)
4446
_ provider.GeoIPProviderFactory = (*providerFactoryMock)(nil)
4547
)
@@ -60,10 +62,17 @@ func (pm *providerMock) Location(ctx context.Context, ip net.IP) (attribute.Set,
6062
return pm.LocationF(ctx, ip)
6163
}
6264

65+
func (pm *providerMock) Close(ctx context.Context) error {
66+
return pm.CloseF(ctx)
67+
}
68+
6369
var baseMockProvider = providerMock{
6470
LocationF: func(context.Context, net.IP) (attribute.Set, error) {
6571
return attribute.Set{}, nil
6672
},
73+
CloseF: func(context.Context) error {
74+
return nil
75+
},
6776
}
6877

6978
var baseMockFactory = providerFactoryMock{
@@ -79,6 +88,9 @@ var baseProviderMock = providerMock{
7988
LocationF: func(context.Context, net.IP) (attribute.Set, error) {
8089
return attribute.Set{}, nil
8190
},
91+
CloseF: func(context.Context) error {
92+
return nil
93+
},
8294
}
8395

8496
var testCases = []struct {
@@ -148,6 +160,7 @@ func compareAllSignals(cfg component.Config, goldenDir string) func(t *testing.T
148160

149161
err = metricsProcessor.ConsumeMetrics(context.Background(), inputMetrics)
150162
require.NoError(t, err)
163+
require.NoError(t, metricsProcessor.Shutdown(context.Background()))
151164

152165
actualMetrics := nextMetrics.AllMetrics()
153166
require.Len(t, actualMetrics, 1)
@@ -167,6 +180,7 @@ func compareAllSignals(cfg component.Config, goldenDir string) func(t *testing.T
167180

168181
err = tracesProcessor.ConsumeTraces(context.Background(), inputTraces)
169182
require.NoError(t, err)
183+
require.NoError(t, tracesProcessor.Shutdown(context.Background()))
170184

171185
actualTraces := nextTraces.AllTraces()
172186
require.Len(t, actualTraces, 1)
@@ -191,6 +205,7 @@ func compareAllSignals(cfg component.Config, goldenDir string) func(t *testing.T
191205
require.Len(t, actualLogs, 1)
192206
// golden.WriteLogs(t, filepath.Join(dir, "output-logs.yaml"), actualLogs[0])
193207
require.NoError(t, plogtest.CompareLogs(expectedLogs, actualLogs[0]))
208+
require.NoError(t, logsProcessor.Shutdown(context.Background()))
194209
}
195210
}
196211

@@ -233,3 +248,23 @@ func TestProcessor(t *testing.T) {
233248
})
234249
}
235250
}
251+
252+
func TestProcessorShutdownError(t *testing.T) {
253+
// processor with two mocked providers that return error on close
254+
processor := geoIPProcessor{
255+
providers: []provider.GeoIPProvider{
256+
&providerMock{
257+
CloseF: func(context.Context) error {
258+
return errors.New("test error 1")
259+
},
260+
},
261+
&providerMock{
262+
CloseF: func(context.Context) error {
263+
return errors.New("test error 2")
264+
},
265+
},
266+
},
267+
}
268+
269+
assert.EqualError(t, processor.shutdown(context.Background()), "test error 1; test error 2")
270+
}

processor/geoipprocessor/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
go.opentelemetry.io/collector/processor/processortest v0.124.0
2222
go.opentelemetry.io/otel v1.35.0
2323
go.uber.org/goleak v1.3.0
24+
go.uber.org/multierr v1.11.0
2425
go.uber.org/zap v1.27.0
2526
)
2627

@@ -120,7 +121,6 @@ require (
120121
go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect
121122
go.opentelemetry.io/otel/trace v1.35.0 // indirect
122123
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
123-
go.uber.org/multierr v1.11.0 // indirect
124124
go4.org/netipx v0.0.0-20230824141953-6213f710f925 // indirect
125125
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
126126
golang.org/x/net v0.39.0 // indirect

processor/geoipprocessor/internal/provider/geoipprovider.go

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ type Config interface {
2525
type GeoIPProvider interface {
2626
// Location returns a set of attributes representing the geographical location for the given IP address. It requires a context for managing request lifetime.
2727
Location(context.Context, net.IP) (attribute.Set, error)
28+
// Close releases any resources held by the provider. It should be called when the
29+
// provider is no longer needed to ensure proper cleanup.
30+
Close(context.Context) error
2831
}
2932

3033
// GeoIPProviderFactory can create GeoIPProvider instances.

processor/geoipprocessor/internal/provider/maxmindprovider/provider.go

+9
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ func (g *maxMindProvider) Location(_ context.Context, ipAddress net.IP) (attribu
5858
}
5959
}
6060

61+
// Close unmaps the geo database file from virtual memory and returns the
62+
// resources to the system.
63+
func (g *maxMindProvider) Close(context.Context) error {
64+
if g.geoReader != nil {
65+
return g.geoReader.Close()
66+
}
67+
return nil
68+
}
69+
6170
// cityAttributes returns a list of key-values containing geographical metadata associated to the provided IP. The key names are populated using the internal geo IP conventions package. If an invalid or nil IP is provided, an error is returned.
6271
func (g *maxMindProvider) cityAttributes(ipAddress net.IP) (*[]attribute.KeyValue, error) {
6372
attributes := make([]attribute.KeyValue, 0, 11)

processor/geoipprocessor/internal/provider/maxmindprovider/provider_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@ func TestInvalidNewProvider(t *testing.T) {
3232

3333
// TestProviderLocation asserts that the MaxMind provider adds the geo location data given an IP.
3434
func TestProviderLocation(t *testing.T) {
35-
if runtime.GOOS == "windows" {
36-
t.Skip("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/38961")
37-
}
3835
tmpDBfiles := testdata.GenerateLocalDB(t, "./testdata")
3936
defer os.RemoveAll(tmpDBfiles)
4037

@@ -108,10 +105,12 @@ func TestProviderLocation(t *testing.T) {
108105
actualAttributes, err := provider.Location(context.Background(), tt.sourceIP)
109106
if tt.expectedErrMsg != "" {
110107
assert.EqualError(t, err, tt.expectedErrMsg)
108+
assert.NoError(t, provider.Close(context.Background()))
111109
return
112110
}
113111

114112
assert.True(t, tt.expectedAttributes.Equals(&actualAttributes))
113+
assert.NoError(t, provider.Close(context.Background()))
115114
})
116115
}
117116
}

0 commit comments

Comments
 (0)