From f9659c9da7e6fcf37c014a24261caf060698a59a Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 14 Mar 2025 11:11:02 +0800 Subject: [PATCH 1/2] kafkametricsreceiver: use configkafka.ClientConfig Use configkafka.ClientConfig, and align with its defaults. This means that client_id now defaults to "otel-collector" instead of "otel-metrics-receiver", which is a breaking change. We also add "metadata.refresh_interval" to ClientConfig, defaulting to 10m, and deprecate "refresh_frequency" from the receiver config. If metadata.refresh_interval is not explicitly set, but refresh_frequency is, then the latter will be used for now. --- ...fkametricsreceiver-embed-clientconfig.yaml | 31 +++++++++++++ internal/kafka/client.go | 10 ++++ internal/kafka/configkafka/config.go | 7 ++- internal/kafka/configkafka/config_test.go | 3 +- receiver/kafkametricsreceiver/README.md | 18 ++++++-- .../kafkametricsreceiver/broker_scraper.go | 14 ++---- .../broker_scraper_test.go | 20 ++++---- receiver/kafkametricsreceiver/config.go | 46 +++++++++++-------- receiver/kafkametricsreceiver/config_test.go | 29 +++++------- .../kafkametricsreceiver/consumer_scraper.go | 18 +++----- .../consumer_scraper_test.go | 30 +++++------- receiver/kafkametricsreceiver/factory.go | 6 +-- receiver/kafkametricsreceiver/factory_test.go | 23 +--------- receiver/kafkametricsreceiver/go.mod | 2 +- .../{scraper_test_helper.go => mocks_test.go} | 12 ++--- receiver/kafkametricsreceiver/receiver.go | 27 ++--------- .../kafkametricsreceiver/receiver_test.go | 15 ++---- .../kafkametricsreceiver/testdata/config.yaml | 11 ++--- .../testdata/integration/expected.yaml | 16 ------- .../kafkametricsreceiver/topic_scraper.go | 14 +++--- .../topic_scraper_test.go | 20 ++++---- 21 files changed, 170 insertions(+), 202 deletions(-) create mode 100644 .chloggen/kafkametricsreceiver-embed-clientconfig.yaml rename receiver/kafkametricsreceiver/{scraper_test_helper.go => mocks_test.go} (95%) delete mode 100644 receiver/kafkametricsreceiver/testdata/integration/expected.yaml diff --git a/.chloggen/kafkametricsreceiver-embed-clientconfig.yaml b/.chloggen/kafkametricsreceiver-embed-clientconfig.yaml new file mode 100644 index 0000000000000..6d9bbed6f7c5c --- /dev/null +++ b/.chloggen/kafkametricsreceiver-embed-clientconfig.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkametricsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: change default client_id to "otel-collector", deprecate "refresh_frequency" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [38411] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The receiver now uses the "configkafka" package which consolidates + common configuration structures and default values. As a result of + this change, we update the default client_id value to "otel-collector", + and deprecate "refresh_frequency" in favour of "metadata.refresh_interval". 
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/internal/kafka/client.go b/internal/kafka/client.go
index a8d7e90152788..00d6c714e3065 100644
--- a/internal/kafka/client.go
+++ b/internal/kafka/client.go
@@ -11,6 +11,15 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 )
 
+// NewSaramaClient returns a new Kafka client with the given configuration.
+func NewSaramaClient(ctx context.Context, config configkafka.ClientConfig) (sarama.Client, error) {
+	saramaConfig, err := NewSaramaClientConfig(ctx, config)
+	if err != nil {
+		return nil, err
+	}
+	return sarama.NewClient(config.Brokers, saramaConfig)
+}
+
 // NewSaramaClusterAdminClient returns a new Kafka cluster admin client with the given configuration.
 func NewSaramaClusterAdminClient(ctx context.Context, config configkafka.ClientConfig) (sarama.ClusterAdmin, error) {
 	saramaConfig, err := NewSaramaClientConfig(ctx, config)
@@ -27,6 +36,7 @@ func NewSaramaClientConfig(ctx context.Context, config configkafka.ClientConfig) (*sarama.Config, error) {
 	saramaConfig := sarama.NewConfig()
 	saramaConfig.Metadata.Full = config.Metadata.Full
+	saramaConfig.Metadata.RefreshFrequency = config.Metadata.RefreshInterval
 	saramaConfig.Metadata.Retry.Max = config.Metadata.Retry.Max
 	saramaConfig.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
 	if config.ResolveCanonicalBootstrapServersOnly {
diff --git a/internal/kafka/configkafka/config.go b/internal/kafka/configkafka/config.go
index f1baa20814b68..afa2dd60e6bb5 100644
--- a/internal/kafka/configkafka/config.go
+++ b/internal/kafka/configkafka/config.go
@@ -227,6 +227,10 @@ type MetadataConfig struct {
 	// memory if you have many topics and partitions. Defaults to true.
 	Full bool `mapstructure:"full"`
 
+	// RefreshInterval controls the frequency at which cluster metadata is
+	// refreshed. Defaults to 10 minutes.
+	RefreshInterval time.Duration `mapstructure:"refresh_interval"`
+
 	// Retry configuration for metadata.
 	// This configuration is useful to avoid race conditions when broker
 	// is starting at the same time as collector.
@@ -245,7 +249,8 @@ type MetadataRetryConfig struct {
 
 func NewDefaultMetadataConfig() MetadataConfig {
 	return MetadataConfig{
-		Full: true,
+		Full:            true,
+		RefreshInterval: 10 * time.Minute,
 		Retry: MetadataRetryConfig{
 			Max:     3,
 			Backoff: time.Millisecond * 250,
diff --git a/internal/kafka/configkafka/config_test.go b/internal/kafka/configkafka/config_test.go
index 0785e1e928413..8a32fcdf4a21f 100644
--- a/internal/kafka/configkafka/config_test.go
+++ b/internal/kafka/configkafka/config_test.go
@@ -39,7 +39,8 @@ func TestClientConfig(t *testing.T) {
 				},
 			},
 			Metadata: MetadataConfig{
-				Full: false,
+				Full:            false,
+				RefreshInterval: 10 * time.Minute,
 				Retry: MetadataRetryConfig{
 					Max:     10,
 					Backoff: 5 * time.Second,
diff --git a/receiver/kafkametricsreceiver/README.md b/receiver/kafkametricsreceiver/README.md
index c6b9341ca1d2f..6dc966dfda2e8 100644
--- a/receiver/kafkametricsreceiver/README.md
+++ b/receiver/kafkametricsreceiver/README.md
@@ -25,7 +25,6 @@ This receiver supports Kafka versions:
 
 Required settings (no defaults):
 
-- `protocol_version`: Kafka protocol version
 - `scrapers`: any combination of the following scrapers can be enabled.
   - `topics`
   - `consumers`
@@ -36,11 +35,12 @@ Metrics collected by the associated scraper are listed in [metadata.yaml](metada
 Optional Settings (with defaults):
 
 - `cluster_alias`: Alias name of the cluster. Adds `kafka.cluster.alias` resource attribute.
+- `protocol_version` (default = 2.1.0): Kafka protocol version
 - `brokers` (default = localhost:9092): the list of brokers to read from.
 - `resolve_canonical_bootstrap_servers_only` (default = false): whether to resolve then reverse-lookup broker IPs during startup.
 - `topic_match` (default = ^[^_].*$): regex pattern of topics to filter on metrics collection. The default filter excludes internal topics (starting with `_`).
 - `group_match` (default = .*): regex pattern of consumer groups to filter on for metrics.
-- `client_id` (default = otel-metrics-receiver): consumer client id
+- `client_id` (default = otel-collector): consumer client id
 - `collection_interval` (default = 1m): frequency of metric collection/scraping.
 - `initial_delay` (default = `1s`): defines how long this receiver waits before starting.
 - `auth` (default none)
@@ -68,6 +68,12 @@ Optional Settings (with defaults):
     - `config_file`: Path to Kerberos configuration. i.e /etc/krb5.conf
     - `keytab_file`: Path to keytab file. i.e /etc/security/kafka.keytab
     - `disable_fast_negotiation`: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. This is set to `false` by default.
+- `metadata`
+  - `full` (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make an initial request to the broker at startup.
+  - `refresh_interval` (default = 10m): The frequency at which cluster metadata is refreshed. Replaces the deprecated top-level `refresh_frequency` setting.
+  - `retry`
+    - `max` (default = 3): The number of retries to get metadata
+    - `backoff` (default = 250ms): How long to wait between metadata retries
 
 ## Examples:
 
 1) Minimal configuration:
 
@@ -76,7 +82,6 @@ Optional Settings (with defaults):
 ```yaml
 receivers:
   kafkametrics:
-    protocol_version: 2.0.0
     scrapers:
       - brokers
       - topics
@@ -86,14 +91,18 @@ receivers:
 
 2) Configuration with more optional settings:
 
 For this example:
+- A non-default broker is specified
+- cluster alias is set to "kafka-prod"
 - collection interval is 5 secs.
+- Kafka protocol version is 3.0.0
+- mTLS is configured
 
 ```yaml
 receivers:
   kafkametrics:
     cluster_alias: kafka-prod
-    brokers: 10.10.10.10:9092
-    protocol_version: 2.0.0
+    brokers: ["10.10.10.10:9092"]
+    protocol_version: 3.0.0
     scrapers:
       - brokers
       - topics
diff --git a/receiver/kafkametricsreceiver/broker_scraper.go b/receiver/kafkametricsreceiver/broker_scraper.go
index 06717ad03ecc0..b0a5be6dc44cc 100644
--- a/receiver/kafkametricsreceiver/broker_scraper.go
+++ b/receiver/kafkametricsreceiver/broker_scraper.go
@@ -25,7 +25,6 @@ type brokerScraper struct {
 	client       sarama.Client
 	settings     receiver.Settings
 	config       Config
-	saramaConfig *sarama.Config
 	clusterAdmin sarama.ClusterAdmin
 	mb           *metadata.MetricsBuilder
 }
@@ -50,7 +49,7 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	scrapeErrors := scrapererror.ScrapeErrors{}
 
 	if s.client == nil {
-		client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
+		client, err := newSaramaClient(context.Background(), s.config.ClientConfig)
 		if err != nil {
 			return pmetric.Metrics{}, fmt.Errorf("failed to create client in brokers scraper: %w", err)
 		}
@@ -68,7 +67,7 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	}
 
 	if s.clusterAdmin == nil {
-		admin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
+		admin, err := newClusterAdmin(s.client)
 		if err != nil {
 			s.settings.Logger.Error("Error creating kafka client with admin privileges", zap.Error(err))
 			return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrors.Combine()
@@ -102,13 +101,10 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrors.Combine()
 }
 
-func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
-	settings receiver.Settings,
-) (scraper.Metrics, error) {
+func createBrokerScraper(_ context.Context, cfg Config, settings receiver.Settings) (scraper.Metrics, error) {
 	s := brokerScraper{
-		settings:     settings,
-		config:       cfg,
-		saramaConfig: saramaConfig,
+		settings: settings,
+		config:   cfg,
 	}
 	return scraper.NewMetrics(
 		s.scrape,
diff --git a/receiver/kafkametricsreceiver/broker_scraper_test.go b/receiver/kafkametricsreceiver/broker_scraper_test.go
index 399c140b09007..fbcbe77a81995 100644
--- a/receiver/kafkametricsreceiver/broker_scraper_test.go
+++ b/receiver/kafkametricsreceiver/broker_scraper_test.go
@@ -14,6 +14,7 @@ import (
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/receiver/receivertest"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
@@ -48,28 +49,25 @@ func TestBrokerShutdown_closed(t *testing.T) {
 }
 
 func TestBrokerScraper_createBrokerScraper(t *testing.T) {
-	sc := sarama.NewConfig()
 	newSaramaClient = mockNewSaramaClient
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 }
 
 func TestBrokerScraperStart(t *testing.T) {
 	newSaramaClient = mockNewSaramaClient
-	sc := sarama.NewConfig()
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 	assert.NoError(t, bs.Start(context.Background(), nil))
 }
 
 func TestBrokerScraper_scrape_handles_client_error(t *testing.T) {
-	newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) {
+	newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) {
 		return nil, errors.New("new client failed")
 	}
-	sc := sarama.NewConfig()
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 	_, err = bs.ScrapeMetrics(context.Background())
@@ -77,11 +75,10 @@ func TestBrokerScraper_scrape_handles_client_error(t *testing.T) {
 }
 
 func TestBrokerScraper_shutdown_handles_nil_client(t *testing.T) {
-	newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) {
+	newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) {
 		return nil, errors.New("new client failed")
 	}
-	sc := sarama.NewConfig()
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 	err = bs.Shutdown(context.Background())
@@ -141,9 +138,8 @@ func TestBrokerScraper_scrape(t *testing.T) {
 }
 
 func TestBrokersScraper_createBrokerScraper(t *testing.T) {
-	sc := sarama.NewConfig()
 	newSaramaClient = mockNewSaramaClient
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 }
diff --git a/receiver/kafkametricsreceiver/config.go b/receiver/kafkametricsreceiver/config.go
index 3174979e0842b..93bf68b534ba3 100644
--- a/receiver/kafkametricsreceiver/config.go
+++ b/receiver/kafkametricsreceiver/config.go
@@ -6,6 +6,7 @@ package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-
 import (
 	"time"
 
+	"go.opentelemetry.io/collector/confmap"
 	"go.opentelemetry.io/collector/scraper/scraperhelper"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
@@ -15,42 +16,51 @@ import (
 // Config represents user settings for kafkametrics receiver
 type Config struct {
 	scraperhelper.ControllerConfig `mapstructure:",squash"`
+	configkafka.ClientConfig       `mapstructure:",squash"`
 
 	// Alias name of the kafka cluster
 	ClusterAlias string `mapstructure:"cluster_alias"`
 
-	// The list of kafka brokers (default localhost:9092)
-	Brokers []string `mapstructure:"brokers"`
-
-	// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
-	// each of the provided brokers. It will then do a PTR lookup for each
-	// returned IP, and that set of names becomes the broker list. This can be
-	// required in SASL environments.
-	ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
-
-	// ProtocolVersion Kafka protocol version
-	ProtocolVersion string `mapstructure:"protocol_version"`
-
 	// TopicMatch topics to collect metrics on
 	TopicMatch string `mapstructure:"topic_match"`
 
 	// GroupMatch consumer groups to collect on
 	GroupMatch string `mapstructure:"group_match"`
 
-	// Authentication data
-	Authentication configkafka.AuthenticationConfig `mapstructure:"auth"`
-
 	// Cluster metadata refresh frequency
 	// Configures the refresh frequency to update cached cluster metadata
 	// Defaults to 10 minutes from Sarama library
+	//
+	// If Metadata.RefreshInterval is set, this will be ignored.
+	//
+	// Deprecated [v0.122.0]: use Metadata.RefreshInterval instead.
 	RefreshFrequency time.Duration `mapstructure:"refresh_frequency"`
 
 	// Scrapers defines which metric data points to be captured from kafka
 	Scrapers []string `mapstructure:"scrapers"`
 
-	// ClientID is the id associated with the consumer that reads from topics in kafka.
-	ClientID string `mapstructure:"client_id"`
-
 	// MetricsBuilderConfig allows customizing scraped metrics/attributes representation.
 	metadata.MetricsBuilderConfig `mapstructure:",squash"`
 }
+
+func (c *Config) Unmarshal(conf *confmap.Conf) error {
+	if refreshFrequency := conf.Get("refresh_frequency"); refreshFrequency != nil {
+		metadataConf, err := conf.Sub("metadata")
+		if err != nil {
+			return err
+		}
+		if !metadataConf.IsSet("refresh_interval") {
+			// User has not explicitly set metadata.refresh_interval,
+			// but they have set the (deprecated) refresh_frequency,
+			// so use that.
+			if err := conf.Merge(confmap.NewFromStringMap(map[string]any{
+				"metadata": map[string]any{
+					"refresh_interval": refreshFrequency,
+				},
+			})); err != nil {
+				return err
+			}
+		}
+	}
+	return conf.Unmarshal(c)
+}
diff --git a/receiver/kafkametricsreceiver/config_test.go b/receiver/kafkametricsreceiver/config_test.go
index b24875fb83e7a..f565f27ee0d6e 100644
--- a/receiver/kafkametricsreceiver/config_test.go
+++ b/receiver/kafkametricsreceiver/config_test.go
@@ -6,11 +6,11 @@ package kafkametricsreceiver
 import (
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/config/configtls"
 	"go.opentelemetry.io/collector/confmap/confmaptest"
 	"go.opentelemetry.io/collector/scraper/scraperhelper"
 
@@ -28,24 +28,19 @@ func TestLoadConfig(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, sub.Unmarshal(cfg))
 
+	expectedClientConfig := configkafka.NewDefaultClientConfig()
+	expectedClientConfig.Brokers = []string{"10.10.10.10:9092"}
+	expectedClientConfig.Metadata.Full = false
+	expectedClientConfig.Metadata.RefreshInterval = time.Nanosecond // set by refresh_frequency
+
 	assert.Equal(t, &Config{
 		ControllerConfig: scraperhelper.NewDefaultControllerConfig(),
-		ClusterAlias:     "kafka-test",
-		Brokers:          []string{"10.10.10.10:9092"},
-		ProtocolVersion:  "2.0.0",
-		TopicMatch:       "test_\\w+",
-		GroupMatch:       "test_\\w+",
-		Authentication: configkafka.AuthenticationConfig{
-			TLS: &configtls.ClientConfig{
-				Config: configtls.Config{
-					CAFile:   "ca.pem",
-					CertFile: "cert.pem",
-					KeyFile:  "key.pem",
-				},
-			},
-		},
-		RefreshFrequency: 1,
-		ClientID:         defaultClientID,
+		ClientConfig:     expectedClientConfig,
+
+		ClusterAlias:     "kafka-test",
+		TopicMatch:       "test_\\w+",
+		GroupMatch:       "test_\\w+",
+		RefreshFrequency: time.Nanosecond,
 		Scrapers:         []string{"brokers", "topics", "consumers"},
 		MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
 	}, cfg)
diff --git a/receiver/kafkametricsreceiver/consumer_scraper.go b/receiver/kafkametricsreceiver/consumer_scraper.go
index 58cc54d4eaf4e..bdf64c8d6a49d 100644
--- a/receiver/kafkametricsreceiver/consumer_scraper.go
+++ b/receiver/kafkametricsreceiver/consumer_scraper.go
@@ -26,7 +26,6 @@ type consumerScraper struct {
 	groupFilter  *regexp.Regexp
 	topicFilter  *regexp.Regexp
 	clusterAdmin sarama.ClusterAdmin
-	saramaConfig *sarama.Config
 	config       Config
 	mb           *metadata.MetricsBuilder
 }
@@ -45,7 +44,7 @@ func (s *consumerScraper) shutdown(_ context.Context) error {
 
 func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	if s.client == nil {
-		client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
+		client, err := newSaramaClient(context.Background(), s.config.ClientConfig)
 		if err != nil {
 			return pmetric.Metrics{}, fmt.Errorf("failed to create client in consumer scraper: %w", err)
 		}
@@ -53,7 +52,7 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	}
 
 	if s.clusterAdmin == nil {
-		admin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
+		admin, err := newClusterAdmin(s.client)
 		if err != nil {
 			if s.client != nil {
 				_ = s.client.Close()
@@ -166,9 +165,7 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeError
 }
 
-func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
-	settings receiver.Settings,
-) (scraper.Metrics, error) {
+func createConsumerScraper(_ context.Context, cfg Config, settings receiver.Settings) (scraper.Metrics, error) {
 	groupFilter, err := regexp.Compile(cfg.GroupMatch)
 	if err != nil {
 		return nil, fmt.Errorf("failed to compile group_match: %w", err)
@@ -178,11 +175,10 @@ func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.C
 		return nil, fmt.Errorf("failed to compile topic filter: %w", err)
 	}
 	s := consumerScraper{
-		settings:     settings,
-		groupFilter:  groupFilter,
-		topicFilter:  topicFilter,
-		config:       cfg,
-		saramaConfig: saramaConfig,
+		settings:    settings,
+		groupFilter: groupFilter,
+		topicFilter: topicFilter,
+		config:      cfg,
 	}
 	return scraper.NewMetrics(
 		s.scrape,
diff --git a/receiver/kafkametricsreceiver/consumer_scraper_test.go b/receiver/kafkametricsreceiver/consumer_scraper_test.go
index 68ac8d3f862f2..b0c6808d74d0f 100644
--- a/receiver/kafkametricsreceiver/consumer_scraper_test.go
+++ b/receiver/kafkametricsreceiver/consumer_scraper_test.go
@@ -15,6 +15,7 @@ import (
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/receiver/receivertest"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
@@ -45,20 +46,18 @@ func TestConsumerShutdown_closed(t *testing.T) {
 }
 
 func TestConsumerScraper_createConsumerScraper(t *testing.T) {
-	sc := sarama.NewConfig()
 	newSaramaClient = mockNewSaramaClient
 	newClusterAdmin = mockNewClusterAdmin
-	cs, err := createConsumerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
+	cs, err := createConsumerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
 	assert.NoError(t, err)
 	assert.NotNil(t, cs)
 }
 
 func TestConsumerScraper_scrape_handles_client_error(t *testing.T) {
-	newSaramaClient = func([]string,
*sarama.Config) (sarama.Client, error) { + newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) { return nil, errors.New("new client failed") } - sc := sarama.NewConfig() - cs, err := createConsumerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type)) + cs, err := createConsumerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type)) assert.NoError(t, err) assert.NotNil(t, cs) _, err = cs.ScrapeMetrics(context.Background()) @@ -66,11 +65,10 @@ func TestConsumerScraper_scrape_handles_client_error(t *testing.T) { } func TestConsumerScraper_scrape_handles_nil_client(t *testing.T) { - newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) { + newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) { return nil, errors.New("new client failed") } - sc := sarama.NewConfig() - cs, err := createConsumerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type)) + cs, err := createConsumerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type)) assert.NoError(t, err) assert.NotNil(t, cs) err = cs.Shutdown(context.Background()) @@ -78,17 +76,16 @@ func TestConsumerScraper_scrape_handles_nil_client(t *testing.T) { } func TestConsumerScraper_scrape_handles_clusterAdmin_error(t *testing.T) { - newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) { + newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) { client := newMockClient() client.Mock. On("Close").Return(nil) return client, nil } - newClusterAdmin = func([]string, *sarama.Config) (sarama.ClusterAdmin, error) { + newClusterAdmin = func(sarama.Client) (sarama.ClusterAdmin, error) { return nil, errors.New("new cluster admin failed") } - sc := sarama.NewConfig() - cs, err := createConsumerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type)) + cs, err := createConsumerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type)) assert.NoError(t, err) assert.NotNil(t, cs) _, err = cs.ScrapeMetrics(context.Background()) @@ -98,8 +95,7 @@ func TestConsumerScraper_scrape_handles_clusterAdmin_error(t *testing.T) { func TestConsumerScraperStart(t *testing.T) { newSaramaClient = mockNewSaramaClient newClusterAdmin = mockNewClusterAdmin - sc := sarama.NewConfig() - cs, err := createConsumerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type)) + cs, err := createConsumerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type)) assert.NoError(t, err) assert.NotNil(t, cs) err = cs.Start(context.Background(), nil) @@ -109,10 +105,9 @@ func TestConsumerScraperStart(t *testing.T) { func TestConsumerScraper_createScraper_handles_invalid_topic_match(t *testing.T) { newSaramaClient = mockNewSaramaClient newClusterAdmin = mockNewClusterAdmin - sc := sarama.NewConfig() cs, err := createConsumerScraper(context.Background(), Config{ TopicMatch: "[", - }, sc, receivertest.NewNopSettings(metadata.Type)) + }, receivertest.NewNopSettings(metadata.Type)) assert.Error(t, err) assert.Nil(t, cs) } @@ -120,10 +115,9 @@ func TestConsumerScraper_createScraper_handles_invalid_topic_match(t *testing.T) func TestConsumerScraper_createScraper_handles_invalid_group_match(t *testing.T) { newSaramaClient = mockNewSaramaClient newClusterAdmin = mockNewClusterAdmin - sc := sarama.NewConfig() cs, err := 
createConsumerScraper(context.Background(), Config{
 		GroupMatch: "[",
-	}, sc, receivertest.NewNopSettings(metadata.Type))
+	}, receivertest.NewNopSettings(metadata.Type))
 	assert.Error(t, err)
 	assert.Nil(t, cs)
 }
diff --git a/receiver/kafkametricsreceiver/factory.go b/receiver/kafkametricsreceiver/factory.go
index 522804a1a324c..8eb0bb21e5116 100644
--- a/receiver/kafkametricsreceiver/factory.go
+++ b/receiver/kafkametricsreceiver/factory.go
@@ -11,14 +11,13 @@ import (
 	"go.opentelemetry.io/collector/receiver"
 	"go.opentelemetry.io/collector/scraper/scraperhelper"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
 const (
-	defaultBroker     = "localhost:9092"
 	defaultGroupMatch = ".*"
 	defaultTopicMatch = "^[^_].*$"
-	defaultClientID   = "otel-metrics-receiver"
 )
 
 // NewFactory creates kafkametrics receiver factory.
@@ -32,10 +31,9 @@ func NewFactory() receiver.Factory {
 func createDefaultConfig() component.Config {
 	config := &Config{
 		ControllerConfig:     scraperhelper.NewDefaultControllerConfig(),
-		Brokers:              []string{defaultBroker},
+		ClientConfig:         configkafka.NewDefaultClientConfig(),
 		GroupMatch:           defaultGroupMatch,
 		TopicMatch:           defaultTopicMatch,
-		ClientID:             defaultClientID,
 		MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
 	}
 	if config.ClusterAlias != "" {
diff --git a/receiver/kafkametricsreceiver/factory_test.go b/receiver/kafkametricsreceiver/factory_test.go
index 8fb8ba9dc23b5..e3c2a1825f18a 100644
--- a/receiver/kafkametricsreceiver/factory_test.go
+++ b/receiver/kafkametricsreceiver/factory_test.go
@@ -4,36 +4,15 @@
 package kafkametricsreceiver
 
 import (
-	"context"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/consumer"
-	"go.opentelemetry.io/collector/receiver"
-	"go.opentelemetry.io/collector/receiver/receivertest"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
-func TestCreateDefaultConfig(t *testing.T) {
+func TestFactory_CreateDefaultConfig(t *testing.T) {
 	factory := NewFactory()
 	cfg := factory.CreateDefaultConfig()
 	assert.NotNil(t, cfg, "default config not created")
 	assert.NoError(t, componenttest.CheckConfigStruct(cfg))
 }
-
-func TestCreateMetrics(t *testing.T) {
-	prev := newMetricsReceiver
-	newMetricsReceiver = func(context.Context, Config, receiver.Settings, consumer.Metrics) (receiver.Metrics, error) {
-		return nil, nil
-	}
-	factory := NewFactory()
-	cfg := factory.CreateDefaultConfig().(*Config)
-	cfg.Brokers = []string{"invalid:9092"}
-	cfg.ProtocolVersion = "2.0.0"
-	cfg.Scrapers = []string{"topics"}
-	_, err := createMetricsReceiver(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
-	newMetricsReceiver = prev
-	assert.NoError(t, err)
-}
diff --git a/receiver/kafkametricsreceiver/go.mod b/receiver/kafkametricsreceiver/go.mod
index e316f1a16950d..63638f517fabb 100644
--- a/receiver/kafkametricsreceiver/go.mod
+++ b/receiver/kafkametricsreceiver/go.mod
@@ -9,7 +9,6 @@ require (
 	github.com/stretchr/testify v1.10.0
 	go.opentelemetry.io/collector/component v1.27.1-0.20250313100724-0885401136ff
 	go.opentelemetry.io/collector/component/componenttest v0.121.1-0.20250313100724-0885401136ff
-	go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250313100724-0885401136ff
 	go.opentelemetry.io/collector/confmap v1.27.1-0.20250313100724-0885401136ff
 	go.opentelemetry.io/collector/consumer v1.27.1-0.20250313100724-0885401136ff
 	go.opentelemetry.io/collector/consumer/consumertest v0.121.1-0.20250313100724-0885401136ff
@@ -77,6 +76,7 @@ require (
 	github.com/xdg-go/stringprep v1.0.4 // indirect
 	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
 	go.opentelemetry.io/collector/config/configopaque v1.27.1-0.20250313100724-0885401136ff // indirect
+	go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250313100724-0885401136ff // indirect
 	go.opentelemetry.io/collector/consumer/consumererror v0.121.1-0.20250313100724-0885401136ff // indirect
 	go.opentelemetry.io/collector/consumer/xconsumer v0.121.1-0.20250313100724-0885401136ff // indirect
 	go.opentelemetry.io/collector/featuregate v1.27.1-0.20250313100724-0885401136ff // indirect
diff --git a/receiver/kafkametricsreceiver/scraper_test_helper.go b/receiver/kafkametricsreceiver/mocks_test.go
similarity index 95%
rename from receiver/kafkametricsreceiver/scraper_test_helper.go
rename to receiver/kafkametricsreceiver/mocks_test.go
index 161caa5120d9c..029db079793c1 100644
--- a/receiver/kafkametricsreceiver/scraper_test_helper.go
+++ b/receiver/kafkametricsreceiver/mocks_test.go
@@ -4,11 +4,14 @@
 package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver"
 
 import (
+	"context"
 	"errors"
 	"strconv"
 
 	"github.com/IBM/sarama"
 	"github.com/stretchr/testify/mock"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 )
 
 const (
@@ -25,11 +28,6 @@ const (
 	testLogRetentionHours = 168
 )
 
-var (
-	newSaramaClient = sarama.NewClient
-	newClusterAdmin = sarama.NewClusterAdmin
-)
-
 var (
 	testTopics     = []string{testTopic}
 	testPartitions = []int32{1}
 	testBrokers    = make([]*sarama.Broker, 1)
 )
 
-func mockNewSaramaClient([]string, *sarama.Config) (sarama.Client, error) {
+func mockNewSaramaClient(context.Context, configkafka.ClientConfig) (sarama.Client, error) {
 	return newMockClient(), nil
 }
 
-func mockNewClusterAdmin([]string, *sarama.Config) (sarama.ClusterAdmin, error) {
+func mockNewClusterAdmin(sarama.Client) (sarama.ClusterAdmin, error) {
 	return newMockClusterAdmin(), nil
 }
diff --git a/receiver/kafkametricsreceiver/receiver.go b/receiver/kafkametricsreceiver/receiver.go
index b8b2995b01b46..4123fb204603e 100644
--- a/receiver/kafkametricsreceiver/receiver.go
+++ b/receiver/kafkametricsreceiver/receiver.go
@@ -18,7 +18,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
-type createKafkaScraper func(context.Context, Config, *sarama.Config, receiver.Settings) (scraper.Metrics, error)
+type createKafkaScraper func(context.Context, Config, receiver.Settings) (scraper.Metrics, error)
 
 var (
 	brokersScraperType = component.MustNewType("brokers")
@@ -29,6 +29,9 @@ var (
 		topicsScraperType.String():    createTopicsScraper,
 		consumersScraperType.String(): createConsumerScraper,
 	}
+
+	newSaramaClient = kafka.NewSaramaClient
+	newClusterAdmin = sarama.NewClusterAdminFromClient
 )
 
 var newMetricsReceiver = func(
 	ctx context.Context,
 	config Config,
 	params receiver.Settings,
 	consumer consumer.Metrics,
 ) (receiver.Metrics, error) {
-	sc := sarama.NewConfig()
-	sc.ClientID = config.ClientID
-	if config.ResolveCanonicalBootstrapServersOnly {
-		sc.Net.ResolveCanonicalBootstrapServers = true
-	}
-	if config.ProtocolVersion != "" {
-		version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
-		if err != nil {
-			return nil, err
-		}
-		sc.Version = version
-	}
-
-	if config.RefreshFrequency != 0 {
-		sc.Metadata.RefreshFrequency = config.RefreshFrequency
-	}
-
-	if err := kafka.ConfigureSaramaAuthentication(ctx, config.Authentication, sc); err != nil {
-		return nil, err
-	}
 	scraperControllerOptions := make([]scraperhelper.ControllerOption, 0, len(config.Scrapers))
 	for _, scraper := range config.Scrapers {
 		if s, ok := allScrapers[scraper]; ok {
-			s, err := s(ctx, config, sc, params)
+			s, err := s(ctx, config, params)
 			if err != nil {
 				return nil, err
 			}
diff --git a/receiver/kafkametricsreceiver/receiver_test.go b/receiver/kafkametricsreceiver/receiver_test.go
index de716bfd7da29..3eddf88c138dd 100644
--- a/receiver/kafkametricsreceiver/receiver_test.go
+++ b/receiver/kafkametricsreceiver/receiver_test.go
@@ -8,7 +8,6 @@ import (
 	"errors"
 	"testing"
 
-	"github.com/IBM/sarama"
 	"github.com/stretchr/testify/assert"
 	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/pdata/pmetric"
@@ -19,18 +18,10 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
-func TestNewReceiver_invalid_version_err(t *testing.T) {
-	c := createDefaultConfig().(*Config)
-	c.ProtocolVersion = "invalid"
-	r, err := newMetricsReceiver(context.Background(), *c, receivertest.NewNopSettings(metadata.Type), nil)
-	assert.Error(t, err)
-	assert.Nil(t, r)
-}
-
 func TestNewReceiver_invalid_scraper_error(t *testing.T) {
 	c := createDefaultConfig().(*Config)
 	c.Scrapers = []string{"brokers", "cpu"}
-	mockScraper := func(_ context.Context, _ Config, _ *sarama.Config, _ receiver.Settings) (scraper.Metrics, error) {
+	mockScraper := func(_ context.Context, _ Config, _ receiver.Settings) (scraper.Metrics, error) {
 		return scraper.NewMetrics(func(context.Context) (pmetric.Metrics, error) {
 			return pmetric.Metrics{}, nil
 		})
@@ -55,7 +46,7 @@ func TestNewReceiver_refresh_frequency(t *testing.T) {
 func TestNewReceiver(t *testing.T) {
 	c := createDefaultConfig().(*Config)
 	c.Scrapers = []string{"brokers"}
-	mockScraper := func(_ context.Context, _ Config, _ *sarama.Config, _ receiver.Settings) (scraper.Metrics, error) {
+	mockScraper := func(_ context.Context, _ Config, _ receiver.Settings) (scraper.Metrics, error) {
 		return scraper.NewMetrics(
 			func(context.Context) (pmetric.Metrics, error) {
 				return pmetric.Metrics{}, nil
@@ -70,7 +61,7 @@ func TestNewReceiver_handles_scraper_error(t *testing.T) {
 	c := createDefaultConfig().(*Config)
 	c.Scrapers = []string{"brokers"}
-	mockScraper := func(context.Context, Config, *sarama.Config, receiver.Settings) (scraper.Metrics, error) {
+	mockScraper := func(context.Context, Config, receiver.Settings) (scraper.Metrics, error) {
 		return nil, errors.New("fail")
 	}
 	allScrapers["brokers"] = mockScraper
diff --git a/receiver/kafkametricsreceiver/testdata/config.yaml b/receiver/kafkametricsreceiver/testdata/config.yaml
index 290fd73adf52c..936d32021c9b0 100644
--- a/receiver/kafkametricsreceiver/testdata/config.yaml
+++ b/receiver/kafkametricsreceiver/testdata/config.yaml
@@ -1,16 +1,15 @@
 kafkametrics:
   cluster_alias: kafka-test
   brokers: 10.10.10.10:9092
-  protocol_version: 2.0.0
   scrapers:
     - brokers
     - topics
     - consumers
-  auth:
-    tls:
-      ca_file: ca.pem
-      cert_file: cert.pem
-      key_file: key.pem
+  metadata:
+    # Set a non-default value in metadata to ensure
+    # the refresh_frequency -> refresh_interval alias
+    # does not destroy any other config.
+    full: false
   refresh_frequency: 1
   topic_match: test_\w+
   group_match: test_\w+
diff --git a/receiver/kafkametricsreceiver/testdata/integration/expected.yaml b/receiver/kafkametricsreceiver/testdata/integration/expected.yaml
deleted file mode 100644
index c10f9f660cf0b..0000000000000
--- a/receiver/kafkametricsreceiver/testdata/integration/expected.yaml
+++ /dev/null
@@ -1,16 +0,0 @@
-resourceMetrics:
-  - resource: {}
-    scopeMetrics:
-      - metrics:
-          - description: Number of brokers in the cluster.
-            sum:
-              aggregationTemporality: 2
-              dataPoints:
-                - asInt: "1"
-                  startTimeUnixNano: "1685063120199110000"
-                  timeUnixNano: "1685063125236251000"
-            name: kafka.brokers
-            unit: '{brokers}'
-        scope:
-          name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver
-          version: latest
diff --git a/receiver/kafkametricsreceiver/topic_scraper.go b/receiver/kafkametricsreceiver/topic_scraper.go
index eadf7f567a46b..12c7138b1d7e3 100644
--- a/receiver/kafkametricsreceiver/topic_scraper.go
+++ b/receiver/kafkametricsreceiver/topic_scraper.go
@@ -27,7 +27,6 @@ type topicScraper struct {
 	clusterAdmin sarama.ClusterAdmin
 	settings     receiver.Settings
 	topicFilter  *regexp.Regexp
-	saramaConfig *sarama.Config
 	config       Config
 	mb           *metadata.MetricsBuilder
 }
@@ -52,7 +51,7 @@ func (s *topicScraper) start(_ context.Context, _ component.Host) error {
 
 func (s *topicScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	if s.client == nil {
-		client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
+		client, err := newSaramaClient(context.Background(), s.config.ClientConfig)
 		if err != nil {
 			return pmetric.Metrics{}, fmt.Errorf("failed to create client in topics scraper: %w", err)
 		}
@@ -123,7 +122,7 @@ func (s *topicScraper) scrapeTopicConfigs(now pcommon.Timestamp, errors scrapere
 		return
 	}
 	if s.clusterAdmin == nil {
-		admin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
+		admin, err := newClusterAdmin(s.client)
 		if err != nil {
 			s.settings.Logger.Error("Error creating kafka client with admin privileges", zap.Error(err))
 			return
@@ -169,16 +168,15 @@ func (s *topicScraper) scrapeTopicConfigs(now pcommon.Timestamp, errors scrapere
 	}
 }
 
-func createTopicsScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, settings receiver.Settings) (scraper.Metrics, error) {
+func createTopicsScraper(_ context.Context, cfg Config, settings receiver.Settings) (scraper.Metrics, error) {
 	topicFilter, err := regexp.Compile(cfg.TopicMatch)
 	if err != nil {
 		return nil, fmt.Errorf("failed to compile topic filter: %w", err)
 	}
 	s := topicScraper{
-		settings:     settings,
-		topicFilter:  topicFilter,
-		saramaConfig: saramaConfig,
-		config:       cfg,
+		settings:    settings,
+		topicFilter: topicFilter,
+		config:      cfg,
 	}
 	return scraper.NewMetrics(
 		s.scrape,
diff --git a/receiver/kafkametricsreceiver/topic_scraper_test.go b/receiver/kafkametricsreceiver/topic_scraper_test.go
index aedd1110280fa..3a63370af6d98 100644
--- a/receiver/kafkametricsreceiver/topic_scraper_test.go
+++ b/receiver/kafkametricsreceiver/topic_scraper_test.go
@@ -15,6 +15,7 @@ import (
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/receiver/receivertest"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
@@ -49,19 +50,17 @@ func TestTopicShutdown_closed(t *testing.T) {
 }
 
 func
TestTopicScraper_createsScraper(t *testing.T) { - sc := sarama.NewConfig() newSaramaClient = mockNewSaramaClient - ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type)) + ms, err := createTopicsScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type)) assert.NoError(t, err) assert.NotNil(t, ms) } func TestTopicScraper_ScrapeHandlesError(t *testing.T) { - newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) { + newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) { return nil, errors.New("no scraper here") } - sc := sarama.NewConfig() - ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type)) + ms, err := createTopicsScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type)) assert.NotNil(t, ms) assert.NoError(t, err) _, err = ms.ScrapeMetrics(context.Background()) @@ -69,11 +68,10 @@ func TestTopicScraper_ScrapeHandlesError(t *testing.T) { } func TestTopicScraper_ShutdownHandlesNilClient(t *testing.T) { - newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) { + newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) { return nil, errors.New("no scraper here") } - sc := sarama.NewConfig() - ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type)) + ms, err := createTopicsScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type)) assert.NotNil(t, ms) assert.NoError(t, err) err = ms.Shutdown(context.Background()) @@ -82,8 +80,7 @@ func TestTopicScraper_ShutdownHandlesNilClient(t *testing.T) { func TestTopicScraper_startScraperCreatesClient(t *testing.T) { newSaramaClient = mockNewSaramaClient - sc := sarama.NewConfig() - ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type)) + ms, err := createTopicsScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type)) assert.NotNil(t, ms) assert.NoError(t, err) err = ms.Start(context.Background(), nil) @@ -92,10 +89,9 @@ func TestTopicScraper_startScraperCreatesClient(t *testing.T) { func TestTopicScraper_createScraperHandles_invalid_topicMatch(t *testing.T) { newSaramaClient = mockNewSaramaClient - sc := sarama.NewConfig() ms, err := createTopicsScraper(context.Background(), Config{ TopicMatch: "[", - }, sc, receivertest.NewNopSettings(metadata.Type)) + }, receivertest.NewNopSettings(metadata.Type)) assert.Error(t, err) assert.Nil(t, ms) } From c044cff94d621491781b88f824fa5748f47329e4 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 20 Mar 2025 11:26:42 +0800 Subject: [PATCH 2/2] Fix test --- receiver/kafkareceiver/config_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index ea4319a01cce9..4113ce1536c2a 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -53,7 +53,8 @@ func TestLoadConfig(t *testing.T) { }, }, Metadata: configkafka.MetadataConfig{ - Full: true, + Full: true, + RefreshInterval: 10 * time.Minute, Retry: configkafka.MetadataRetryConfig{ Max: 10, Backoff: time.Second * 5, @@ -92,7 +93,8 @@ func TestLoadConfig(t *testing.T) { }, }, Metadata: configkafka.MetadataConfig{ - Full: true, + Full: true, + RefreshInterval: 10 * time.Minute, Retry: 
configkafka.MetadataRetryConfig{ Max: 10, Backoff: time.Second * 5,
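
---

For anyone migrating existing configurations, below is a minimal sketch of the deprecation path described in the changelog entry above. The broker address and the 5m interval are illustrative values, not defaults; `metadata.refresh_interval` defaults to 10m.

```yaml
receivers:
  kafkametrics:
    brokers: ["10.10.10.10:9092"]
    scrapers:
      - brokers
    # Before: deprecated since v0.122.0. Still honoured, but only while
    # metadata.refresh_interval is not explicitly set.
    refresh_frequency: 5m
    # After: preferred setting; when both are present, this one wins.
    metadata:
      refresh_interval: 5m
```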