Commit 67bdc54

receiver/kafkametricsreceiver: use common config and clients (#38634)
#### Description

Remove the duplicated client config from the `Config` struct, and embed `configkafka.ClientConfig` instead. Remove the duplicated (cluster admin) client creation code, and use `kafka.NewSaramaClient`. This means that `client_id` now defaults to "otel-collector" instead of "otel-metrics-receiver", which is a breaking change.

We also add `metadata.refresh_interval` to `ClientConfig`, defaulting to 10m, and deprecate `refresh_frequency` in the receiver config. If `metadata.refresh_interval` is not explicitly set but `refresh_frequency` is, then the latter will be used for now.

While working on this PR I discovered that some of the existing tests are not working correctly, but fixing that will require a much more extensive refactoring effort. I will follow up with another PR once this one is merged.

#### Link to tracking issue

Part of #38411

#### Testing

Updated unit tests.

#### Documentation

Updated README.
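To make the breaking default concrete, here is a minimal sketch (not part of this commit; it assumes `configkafka.ClientConfig` exposes a `ClientID` field replacing the receiver's removed `client_id` field, and it would have to live inside the contrib repo because the package sits under `internal/`):

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func main() {
	cfg := configkafka.NewDefaultClientConfig()

	// The shared default now applies to the metrics receiver as well.
	fmt.Println(cfg.ClientID) // "otel-collector" (previously "otel-metrics-receiver")

	// Users who depend on the old identifier can pin client_id explicitly.
	cfg.ClientID = "otel-metrics-receiver"
}
```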
1 parent da097ef · commit 67bdc54

22 files changed: +174 −204 lines
New changelog entry (new file, +31 lines):

```yaml
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkametricsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: change default client_id to "otel-collector", deprecate "refresh_frequency"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38411]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  The receiver now uses the "configkafka" package which consolidates
  common configuration structures and default values. As a result of
  this change, we update the default client_id value to "otel-collector",
  and deprecate "refresh_frequency" in favour of "metadata.refresh_interval".

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
```

internal/kafka/client.go (+10)

```diff
@@ -20,6 +20,15 @@ var saramaCompressionCodecs = map[string]sarama.CompressionCodec{
 	"zstd": sarama.CompressionZSTD,
 }
 
+// NewSaramaClient returns a new Kafka client with the given configuration.
+func NewSaramaClient(ctx context.Context, config configkafka.ClientConfig) (sarama.Client, error) {
+	saramaConfig, err := NewSaramaClientConfig(ctx, config)
+	if err != nil {
+		return nil, err
+	}
+	return sarama.NewClient(config.Brokers, saramaConfig)
+}
+
 // NewSaramaClusterAdminClient returns a new Kafka cluster admin client with the given configuration.
 func NewSaramaClusterAdminClient(ctx context.Context, config configkafka.ClientConfig) (sarama.ClusterAdmin, error) {
 	saramaConfig, err := NewSaramaClientConfig(ctx, config)
@@ -60,6 +69,7 @@ func NewSaramaSyncProducer(
 func NewSaramaClientConfig(ctx context.Context, config configkafka.ClientConfig) (*sarama.Config, error) {
 	saramaConfig := sarama.NewConfig()
 	saramaConfig.Metadata.Full = config.Metadata.Full
+	saramaConfig.Metadata.RefreshFrequency = config.Metadata.RefreshInterval
 	saramaConfig.Metadata.Retry.Max = config.Metadata.Retry.Max
 	saramaConfig.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
 	if config.ResolveCanonicalBootstrapServersOnly {
```
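As a usage sketch (hypothetical, and only compilable from inside the contrib repo since the package is under `internal/`), a component can now build a ready-to-use sarama client straight from the shared config; the broker address below is an assumption:

```go
package main

import (
	"context"
	"log"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func main() {
	cfg := configkafka.NewDefaultClientConfig()
	cfg.Brokers = []string{"localhost:9092"} // assumed broker address

	// NewSaramaClient builds the *sarama.Config (metadata refresh, auth,
	// and related client settings) via NewSaramaClientConfig, then dials.
	client, err := kafka.NewSaramaClient(context.Background(), cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	log.Printf("connected to %d broker(s)", len(client.Brokers()))
}
```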

internal/kafka/configkafka/config.go (+6 −1)

```diff
@@ -227,6 +227,10 @@ type MetadataConfig struct {
 	// memory if you have many topics and partitions. Defaults to true.
 	Full bool `mapstructure:"full"`
 
+	// RefreshInterval controls the frequency at which cluster metadata is
+	// refreshed. Defaults to 10 minutes.
+	RefreshInterval time.Duration `mapstructure:"refresh_interval"`
+
 	// Retry configuration for metadata.
 	// This configuration is useful to avoid race conditions when broker
 	// is starting at the same time as collector.
@@ -245,7 +249,8 @@ type MetadataRetryConfig struct {
 
 func NewDefaultMetadataConfig() MetadataConfig {
 	return MetadataConfig{
-		Full: true,
+		Full:            true,
+		RefreshInterval: 10 * time.Minute,
 		Retry: MetadataRetryConfig{
 			Max:     3,
 			Backoff: time.Millisecond * 250,
```
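For concreteness, the defaults produced after this change (values taken from the diff above; the snippet itself is illustrative and assumes in-repo placement because of the `internal/` path):

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func main() {
	md := configkafka.NewDefaultMetadataConfig()
	fmt.Println(md.Full)            // true
	fmt.Println(md.RefreshInterval) // 10m0s
	fmt.Println(md.Retry.Max)       // 3
	fmt.Println(md.Retry.Backoff)   // 250ms
}
```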

internal/kafka/configkafka/config_test.go (+2 −1)

```diff
@@ -39,7 +39,8 @@ func TestClientConfig(t *testing.T) {
 			},
 		},
 		Metadata: MetadataConfig{
-			Full: false,
+			Full:            false,
+			RefreshInterval: 10 * time.Minute,
 			Retry: MetadataRetryConfig{
 				Max:     10,
 				Backoff: 5 * time.Second,
```

receiver/kafkametricsreceiver/README.md (+13 −5)

````diff
@@ -25,7 +25,6 @@ This receiver supports Kafka versions:
 
 Required settings (no defaults):
 
-- `protocol_version`: Kafka protocol version
 - `scrapers`: any combination of the following scrapers can be enabled.
   - `topics`
   - `consumers`
@@ -36,11 +35,12 @@ Metrics collected by the associated scraper are listed in [metadata.yaml](metada
 Optional Settings (with defaults):
 
 - `cluster_alias`: Alias name of the cluster. Adds `kafka.cluster.alias` resource attribute.
+- `protocol_version` (default = 2.1.0): Kafka protocol version
 - `brokers` (default = localhost:9092): the list of brokers to read from.
 - `resolve_canonical_bootstrap_servers_only` (default = false): whether to resolve then reverse-lookup broker IPs during startup.
 - `topic_match` (default = ^[^_].*$): regex pattern of topics to filter on metrics collection. The default filter excludes internal topics (starting with `_`).
 - `group_match` (default = .*): regex pattern of consumer groups to filter on for metrics.
-- `client_id` (default = otel-metrics-receiver): consumer client id
+- `client_id` (default = otel-collector): consumer client id
 - `collection_interval` (default = 1m): frequency of metric collection/scraping.
 - `initial_delay` (default = `1s`): defines how long this receiver waits before starting.
 - `auth` (default none)
@@ -68,6 +68,11 @@ Optional Settings (with defaults):
     - `config_file`: Path to Kerberos configuration. i.e /etc/krb5.conf
     - `keytab_file`: Path to keytab file. i.e /etc/security/kafka.keytab
     - `disable_fast_negotiation`: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. This is set to `false` by default.
+- `metadata`
+  - `full` (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to broker at the startup.
+  - `retry`
+    - `max` (default = 3): The number of retries to get metadata
+    - `backoff` (default = 250ms): How long to wait between metadata retries
 
 ## Examples:
 
@@ -76,7 +81,6 @@ Optional Settings (with defaults):
 ```yaml
 receivers:
   kafkametrics:
-    protocol_version: 2.0.0
     scrapers:
       - brokers
       - topics
@@ -86,14 +90,18 @@ receivers:
 2) Configuration with more optional settings:
 
 For this example:
+- A non-default broker is specified
+- cluster alias is set to "kafka-prod"
 - collection interval is 5 secs.
+- Kafka protocol version is 3.0.0
+- mTLS is configured
 
 ```yaml
 receivers:
   kafkametrics:
     cluster_alias: kafka-prod
-    brokers: 10.10.10.10:9092
-    protocol_version: 2.0.0
+    brokers: ["10.10.10.10:9092"]
+    protocol_version: 3.0.0
     scrapers:
       - brokers
       - topics
````
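A sketch of how the new `metadata` settings decode into the shared struct (hypothetical snippet, not from the PR; it relies on the collector's standard string-to-duration decoding in `confmap`, and would live inside the contrib repo because of the `internal/` path):

```go
package main

import (
	"fmt"
	"log"

	"go.opentelemetry.io/collector/confmap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func main() {
	conf := confmap.NewFromStringMap(map[string]any{
		"metadata": map[string]any{
			"full":             false,
			"refresh_interval": "15m",
			"retry": map[string]any{
				"max":     5,
				"backoff": "500ms",
			},
		},
	})

	cfg := configkafka.NewDefaultClientConfig() // start from defaults
	if err := conf.Unmarshal(&cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.Metadata.Full, cfg.Metadata.RefreshInterval) // false 15m0s
}
```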

receiver/kafkametricsreceiver/broker_scraper.go (+5 −9)

```diff
@@ -25,7 +25,6 @@ type brokerScraper struct {
 	client       sarama.Client
 	settings     receiver.Settings
 	config       Config
-	saramaConfig *sarama.Config
 	clusterAdmin sarama.ClusterAdmin
 	mb           *metadata.MetricsBuilder
 }
@@ -50,7 +49,7 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	scrapeErrors := scrapererror.ScrapeErrors{}
 
 	if s.client == nil {
-		client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
+		client, err := newSaramaClient(context.Background(), s.config.ClientConfig)
 		if err != nil {
 			return pmetric.Metrics{}, fmt.Errorf("failed to create client in brokers scraper: %w", err)
 		}
@@ -68,7 +67,7 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	}
 
 	if s.clusterAdmin == nil {
-		admin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
+		admin, err := newClusterAdmin(s.client)
 		if err != nil {
 			s.settings.Logger.Error("Error creating kafka client with admin privileges", zap.Error(err))
 			return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrors.Combine()
@@ -102,13 +101,10 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrors.Combine()
 }
 
-func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
-	settings receiver.Settings,
-) (scraper.Metrics, error) {
+func createBrokerScraper(_ context.Context, cfg Config, settings receiver.Settings) (scraper.Metrics, error) {
 	s := brokerScraper{
-		settings:     settings,
-		config:       cfg,
-		saramaConfig: saramaConfig,
+		settings: settings,
+		config:   cfg,
 	}
 	return scraper.NewMetrics(
 		s.scrape,
```
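Note that `newClusterAdmin` now derives the admin from the already-connected client rather than dialing a second time. Its body is not shown in this commit; a plausible shape, using sarama's stock constructor, would be (hypothetical sketch, the actual hook may differ):

```go
package kafkametricsreceiver

import "github.com/IBM/sarama"

// Hypothetical sketch of the package-level hook's shape; tests swap these
// vars out, as the diffs below show for newSaramaClient.
var newClusterAdmin = func(client sarama.Client) (sarama.ClusterAdmin, error) {
	// Reuse the existing client's connections and configuration instead
	// of constructing a second client from brokers + *sarama.Config.
	return sarama.NewClusterAdminFromClient(client)
}
```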

receiver/kafkametricsreceiver/broker_scraper_test.go (+8 −12)

```diff
@@ -14,6 +14,7 @@ import (
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/receiver/receivertest"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
@@ -48,40 +49,36 @@ func TestBrokerShutdown_closed(t *testing.T) {
 }
 
 func TestBrokerScraper_createBrokerScraper(t *testing.T) {
-	sc := sarama.NewConfig()
 	newSaramaClient = mockNewSaramaClient
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 }
 
 func TestBrokerScraperStart(t *testing.T) {
 	newSaramaClient = mockNewSaramaClient
-	sc := sarama.NewConfig()
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 	assert.NoError(t, bs.Start(context.Background(), nil))
 }
 
 func TestBrokerScraper_scrape_handles_client_error(t *testing.T) {
-	newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) {
+	newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) {
 		return nil, errors.New("new client failed")
 	}
-	sc := sarama.NewConfig()
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 	_, err = bs.ScrapeMetrics(context.Background())
 	assert.Error(t, err)
 }
 
 func TestBrokerScraper_shutdown_handles_nil_client(t *testing.T) {
-	newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) {
+	newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) {
 		return nil, errors.New("new client failed")
 	}
-	sc := sarama.NewConfig()
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 	err = bs.Shutdown(context.Background())
@@ -141,9 +138,8 @@ func TestBrokerScraper_scrape(t *testing.T) {
 }
 
 func TestBrokersScraper_createBrokerScraper(t *testing.T) {
-	sc := sarama.NewConfig()
 	newSaramaClient = mockNewSaramaClient
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 }
```

receiver/kafkametricsreceiver/config.go (+28 −18)

```diff
@@ -6,6 +6,7 @@ package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-
 import (
 	"time"
 
+	"go.opentelemetry.io/collector/confmap"
 	"go.opentelemetry.io/collector/scraper/scraperhelper"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
@@ -15,42 +16,51 @@ import (
 // Config represents user settings for kafkametrics receiver
 type Config struct {
 	scraperhelper.ControllerConfig `mapstructure:",squash"`
+	configkafka.ClientConfig       `mapstructure:",squash"`
 
 	// Alias name of the kafka cluster
 	ClusterAlias string `mapstructure:"cluster_alias"`
 
-	// The list of kafka brokers (default localhost:9092)
-	Brokers []string `mapstructure:"brokers"`
-
-	// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
-	// each of the provided brokers. It will then do a PTR lookup for each
-	// returned IP, and that set of names becomes the broker list. This can be
-	// required in SASL environments.
-	ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
-
-	// ProtocolVersion Kafka protocol version
-	ProtocolVersion string `mapstructure:"protocol_version"`
-
 	// TopicMatch topics to collect metrics on
 	TopicMatch string `mapstructure:"topic_match"`
 
 	// GroupMatch consumer groups to collect on
 	GroupMatch string `mapstructure:"group_match"`
 
-	// Authentication data
-	Authentication configkafka.AuthenticationConfig `mapstructure:"auth"`
-
 	// Cluster metadata refresh frequency
 	// Configures the refresh frequency to update cached cluster metadata
 	// Defaults to 10 minutes from Sarama library
+	//
+	// If Metadata.RefreshInterval is set, this will be ignored.
+	//
+	// Deprecated [v0.122.0]: use Metadata.RefreshInterval instead.
 	RefreshFrequency time.Duration `mapstructure:"refresh_frequency"`
 
 	// Scrapers defines which metric data points to be captured from kafka
 	Scrapers []string `mapstructure:"scrapers"`
 
-	// ClientID is the id associated with the consumer that reads from topics in kafka.
-	ClientID string `mapstructure:"client_id"`
-
 	// MetricsBuilderConfig allows customizing scraped metrics/attributes representation.
 	metadata.MetricsBuilderConfig `mapstructure:",squash"`
 }
+
+func (c *Config) Unmarshal(conf *confmap.Conf) error {
+	if refreshFrequency := conf.Get("refresh_frequency"); refreshFrequency != nil {
+		metadataConf, err := conf.Sub("metadata")
+		if err != nil {
+			return err
+		}
+		if !metadataConf.IsSet("refresh_interval") {
+			// User has not explicitly set metadata.refresh_interval,
+			// but they have set the (deprecated) refresh_frequency,
+			// so use that.
+			if err := conf.Merge(confmap.NewFromStringMap(map[string]any{
+				"metadata": map[string]any{
+					"refresh_interval": refreshFrequency,
+				},
+			})); err != nil {
+				return err
+			}
+		}
+	}
+	return conf.Unmarshal(c)
+}
```
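To spell out the precedence this `Unmarshal` implements, a small sketch (illustrative only; it relies on the collector's standard string-to-duration decoding):

```go
package main

import (
	"fmt"
	"log"

	"go.opentelemetry.io/collector/confmap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver"
)

func main() {
	// Only the deprecated setting is present: it is copied into
	// metadata.refresh_interval.
	var cfg kafkametricsreceiver.Config
	conf := confmap.NewFromStringMap(map[string]any{
		"refresh_frequency": "5m",
	})
	if err := cfg.Unmarshal(conf); err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.Metadata.RefreshInterval) // 5m0s

	// Both are present: the explicit metadata.refresh_interval wins.
	var cfg2 kafkametricsreceiver.Config
	conf2 := confmap.NewFromStringMap(map[string]any{
		"refresh_frequency": "5m",
		"metadata":          map[string]any{"refresh_interval": "1m"},
	})
	if err := cfg2.Unmarshal(conf2); err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg2.Metadata.RefreshInterval) // 1m0s
}
```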

receiver/kafkametricsreceiver/config_test.go (+12 −17)

```diff
@@ -6,11 +6,11 @@ package kafkametricsreceiver
 import (
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/config/configtls"
 	"go.opentelemetry.io/collector/confmap/confmaptest"
 	"go.opentelemetry.io/collector/scraper/scraperhelper"
 
@@ -28,24 +28,19 @@ func TestLoadConfig(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, sub.Unmarshal(cfg))
 
+	expectedClientConfig := configkafka.NewDefaultClientConfig()
+	expectedClientConfig.Brokers = []string{"10.10.10.10:9092"}
+	expectedClientConfig.Metadata.Full = false
+	expectedClientConfig.Metadata.RefreshInterval = time.Nanosecond // set by refresh_frequency
+
 	assert.Equal(t, &Config{
 		ControllerConfig: scraperhelper.NewDefaultControllerConfig(),
-		ClusterAlias:     "kafka-test",
-		Brokers:          []string{"10.10.10.10:9092"},
-		ProtocolVersion:  "2.0.0",
-		TopicMatch:       "test_\\w+",
-		GroupMatch:       "test_\\w+",
-		Authentication: configkafka.AuthenticationConfig{
-			TLS: &configtls.ClientConfig{
-				Config: configtls.Config{
-					CAFile:   "ca.pem",
-					CertFile: "cert.pem",
-					KeyFile:  "key.pem",
-				},
-			},
-		},
-		RefreshFrequency: 1,
-		ClientID:         defaultClientID,
+		ClientConfig:     expectedClientConfig,
+
+		ClusterAlias:     "kafka-test",
+		TopicMatch:       "test_\\w+",
+		GroupMatch:       "test_\\w+",
+		RefreshFrequency: time.Nanosecond,
 		Scrapers:         []string{"brokers", "topics", "consumers"},
 		MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
 	}, cfg)
```
