Skip to content

Commit 291f0e9

Browse files
[receiver/kafka_metrics_receiver] Add refresh_frequency to enable custom refreshing for cluster metadata (#37897)
Fixes: #37896 There are other ways to solve this issue, but that would require changes in Sarama library. This PR proposes a solution to exposes `refresh_frequency` from Sarama library and let the user configure it. Today the default setting from Sarama is being used which defaults to 10 mins. More info and similar issues are mentioned in the tracking issue. Added unit tests, and tested manually with a 3 brokers kafka-kraft cluster. --------- Signed-off-by: Shivanshu Raj Shrivastava <[email protected]> Co-authored-by: Dmitrii Anoshin <[email protected]>
1 parent 7692c25 commit 291f0e9

File tree

6 files changed

+51
-0
lines changed

6 files changed

+51
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/kafkametricsreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add `refresh_frequency` config to `kafkametricsreceiver`, to configure custom duration for cluster metadata refresh
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37896]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
- Helps fine tuning the refresh_frequency, and enables custom cluster metadata refresh intervals
20+
- Default refresh_frequency is set 10 minutes from Sarama library defaults
21+
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [user]

receiver/kafkametricsreceiver/config.go

+7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver"
55

66
import (
7+
"time"
8+
79
"go.opentelemetry.io/collector/scraper/scraperhelper"
810

911
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
@@ -38,6 +40,11 @@ type Config struct {
3840
// Authentication data
3941
Authentication kafka.Authentication `mapstructure:"auth"`
4042

43+
// Cluster metadata refresh frequency
44+
// Configures the refresh frequency to update cached cluster metadata
45+
// Defaults to 10 minutes from Sarama library
46+
RefreshFrequency time.Duration `mapstructure:"refresh_frequency"`
47+
4148
// Scrapers defines which metric data points to be captured from kafka
4249
Scrapers []string `mapstructure:"scrapers"`
4350

receiver/kafkametricsreceiver/config_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func TestLoadConfig(t *testing.T) {
4444
},
4545
},
4646
},
47+
RefreshFrequency: 1,
4748
ClientID: defaultClientID,
4849
Scrapers: []string{"brokers", "topics", "consumers"},
4950
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),

receiver/kafkametricsreceiver/receiver.go

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ var newMetricsReceiver = func(
4949
}
5050
sc.Version = version
5151
}
52+
53+
if config.RefreshFrequency != 0 {
54+
sc.Metadata.RefreshFrequency = config.RefreshFrequency
55+
}
56+
5257
if err := kafka.ConfigureAuthentication(ctx, config.Authentication, sc); err != nil {
5358
return nil, err
5459
}

receiver/kafkametricsreceiver/receiver_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ func TestNewReceiver_invalid_auth_error(t *testing.T) {
5959
assert.Nil(t, r)
6060
}
6161

62+
func TestNewReceiver_refresh_frequency(t *testing.T) {
63+
c := createDefaultConfig().(*Config)
64+
c.RefreshFrequency = 1
65+
r, err := newMetricsReceiver(context.Background(), *c, receivertest.NewNopSettings(), nil)
66+
assert.NoError(t, err)
67+
assert.NotNil(t, r)
68+
}
69+
6270
func TestNewReceiver(t *testing.T) {
6371
c := createDefaultConfig().(*Config)
6472
c.Scrapers = []string{"brokers"}

receiver/kafkametricsreceiver/testdata/config.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ kafkametrics:
1111
ca_file: ca.pem
1212
cert_file: cert.pem
1313
key_file: key.pem
14+
refresh_frequency: 1
1415
topic_match: test_\w+
1516
group_match: test_\w+

0 commit comments

Comments
 (0)