receiver/kafkametricsreceiver: use common config and clients #38634

Merged
31 changes: 31 additions & 0 deletions .chloggen/kafkametricsreceiver-embed-clientconfig.yaml
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkametricsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: change default client_id to "otel-collector", deprecate "refresh_frequency"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38411]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The receiver now uses the "configkafka" package which consolidates
common configuration structures and default values. As a result of
this change, we update the default client_id value to "otel-collector",
and deprecate "refresh_frequency" in favour of "metadata.refresh_interval".

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
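
For illustration only (this note and snippet are editorial, not part of the changelog file): a user of the deprecated key would migrate their collector configuration roughly as follows, where the `5m` value is a placeholder.

```yaml
# Before (deprecated as of this change):
receivers:
  kafkametrics:
    refresh_frequency: 5m

# After:
receivers:
  kafkametrics:
    metadata:
      refresh_interval: 5m
```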
10 changes: 10 additions & 0 deletions internal/kafka/client.go
@@ -20,6 +20,15 @@ var saramaCompressionCodecs = map[string]sarama.CompressionCodec{
"zstd": sarama.CompressionZSTD,
}

// NewSaramaClient returns a new Kafka client with the given configuration.
func NewSaramaClient(ctx context.Context, config configkafka.ClientConfig) (sarama.Client, error) {
saramaConfig, err := NewSaramaClientConfig(ctx, config)
if err != nil {
return nil, err
}
return sarama.NewClient(config.Brokers, saramaConfig)
}

// NewSaramaClusterAdminClient returns a new Kafka cluster admin client with the given configuration.
func NewSaramaClusterAdminClient(ctx context.Context, config configkafka.ClientConfig) (sarama.ClusterAdmin, error) {
saramaConfig, err := NewSaramaClientConfig(ctx, config)
@@ -60,6 +69,7 @@ func NewSaramaSyncProducer(
func NewSaramaClientConfig(ctx context.Context, config configkafka.ClientConfig) (*sarama.Config, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Metadata.Full = config.Metadata.Full
saramaConfig.Metadata.RefreshFrequency = config.Metadata.RefreshInterval
saramaConfig.Metadata.Retry.Max = config.Metadata.Retry.Max
saramaConfig.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
if config.ResolveCanonicalBootstrapServersOnly {
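
A minimal usage sketch of the new `NewSaramaClient` helper (editorial illustration, not part of the diff). It assumes code running inside this module, since `internal/kafka` cannot be imported externally, and it abbreviates error handling.

```go
package main

import (
	"context"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func main() {
	// Start from the shared defaults (client_id "otel-collector",
	// metadata.refresh_interval 10m) and point at a local broker.
	cfg := configkafka.NewDefaultClientConfig()
	cfg.Brokers = []string{"localhost:9092"}

	client, err := kafka.NewSaramaClient(context.Background(), cfg)
	if err != nil {
		panic(err) // illustrative only; real callers propagate the error
	}
	defer client.Close()
}
```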
7 changes: 6 additions & 1 deletion internal/kafka/configkafka/config.go
@@ -227,6 +227,10 @@ type MetadataConfig struct {
// memory if you have many topics and partitions. Defaults to true.
Full bool `mapstructure:"full"`

// RefreshInterval controls the frequency at which cluster metadata is
// refreshed. Defaults to 10 minutes.
RefreshInterval time.Duration `mapstructure:"refresh_interval"`

// Retry configuration for metadata.
// This configuration is useful to avoid race conditions when broker
// is starting at the same time as collector.
@@ -245,7 +249,8 @@ type MetadataRetryConfig struct {

func NewDefaultMetadataConfig() MetadataConfig {
return MetadataConfig{
Full: true,
Full: true,
RefreshInterval: 10 * time.Minute,
Retry: MetadataRetryConfig{
Max: 3,
Backoff: time.Millisecond * 250,
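
In the kafkametrics receiver, which squashes `configkafka.ClientConfig` into its own config (see the config.go change later in this diff), the new field surfaces under a top-level `metadata` block. A hedged example follows; values are placeholders, with defaults noted in comments.

```yaml
receivers:
  kafkametrics:
    brokers: ["10.10.10.10:9092"]
    scrapers:
      - brokers
    metadata:
      full: true             # default: true
      refresh_interval: 5m   # default: 10m; supersedes the deprecated refresh_frequency
      retry:
        max: 3               # default: 3
        backoff: 250ms       # default: 250ms
```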
3 changes: 2 additions & 1 deletion internal/kafka/configkafka/config_test.go
@@ -39,7 +39,8 @@ func TestClientConfig(t *testing.T) {
},
},
Metadata: MetadataConfig{
Full: false,
Full: false,
RefreshInterval: 10 * time.Minute,
Retry: MetadataRetryConfig{
Max: 10,
Backoff: 5 * time.Second,
18 changes: 13 additions & 5 deletions receiver/kafkametricsreceiver/README.md
@@ -25,7 +25,6 @@ This receiver supports Kafka versions:

Required settings (no defaults):

- `protocol_version`: Kafka protocol version
- `scrapers`: any combination of the following scrapers can be enabled.
- `topics`
- `consumers`
@@ -36,11 +35,12 @@ Metrics collected by the associated scraper are listed in [metadata.yaml](metada
Optional Settings (with defaults):

- `cluster_alias`: Alias name of the cluster. Adds `kafka.cluster.alias` resource attribute.
- `protocol_version` (default = 2.1.0): Kafka protocol version
- `brokers` (default = localhost:9092): the list of brokers to read from.
- `resolve_canonical_bootstrap_servers_only` (default = false): whether to resolve then reverse-lookup broker IPs during startup.
- `topic_match` (default = ^[^_].*$): regex pattern of topics to filter on metrics collection. The default filter excludes internal topics (starting with `_`).
- `group_match` (default = .*): regex pattern of consumer groups to filter on for metrics.
- `client_id` (default = otel-metrics-receiver): consumer client id
- `client_id` (default = otel-collector): consumer client id
- `collection_interval` (default = 1m): frequency of metric collection/scraping.
- `initial_delay` (default = `1s`): defines how long this receiver waits before starting.
- `auth` (default none)
@@ -68,6 +68,11 @@ Optional Settings (with defaults):
- `config_file`: Path to Kerberos configuration. i.e /etc/krb5.conf
- `keytab_file`: Path to keytab file. i.e /etc/security/kafka.keytab
- `disable_fast_negotiation`: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. This is set to `false` by default.
- `metadata`
- `full` (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
- `retry`
- `max` (default = 3): The number of retries to get metadata
- `backoff` (default = 250ms): How long to wait between metadata retries

## Examples:

@@ -76,7 +81,6 @@ Optional Settings (with defaults):
```yaml
receivers:
kafkametrics:
protocol_version: 2.0.0
scrapers:
- brokers
- topics
@@ -86,14 +90,18 @@ receivers:
2) Configuration with more optional settings:

For this example:
- A non-default broker is specified
- cluster alias is set to "kafka-prod"
- collection interval is 5 secs.
- Kafka protocol version is 3.0.0
- mTLS is configured

```yaml
receivers:
kafkametrics:
cluster_alias: kafka-prod
brokers: 10.10.10.10:9092
protocol_version: 2.0.0
brokers: ["10.10.10.10:9092"]
protocol_version: 3.0.0
scrapers:
- brokers
- topics
14 changes: 5 additions & 9 deletions receiver/kafkametricsreceiver/broker_scraper.go
@@ -25,7 +25,6 @@ type brokerScraper struct {
client sarama.Client
settings receiver.Settings
config Config
saramaConfig *sarama.Config
clusterAdmin sarama.ClusterAdmin
mb *metadata.MetricsBuilder
}
@@ -50,7 +49,7 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
scrapeErrors := scrapererror.ScrapeErrors{}

if s.client == nil {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
client, err := newSaramaClient(context.Background(), s.config.ClientConfig)
if err != nil {
return pmetric.Metrics{}, fmt.Errorf("failed to create client in brokers scraper: %w", err)
}
@@ -68,7 +67,7 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
}

if s.clusterAdmin == nil {
admin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
admin, err := newClusterAdmin(s.client)
if err != nil {
s.settings.Logger.Error("Error creating kafka client with admin privileges", zap.Error(err))
return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrors.Combine()
@@ -102,13 +101,10 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrors.Combine()
}

func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
settings receiver.Settings,
) (scraper.Metrics, error) {
func createBrokerScraper(_ context.Context, cfg Config, settings receiver.Settings) (scraper.Metrics, error) {
s := brokerScraper{
settings: settings,
config: cfg,
saramaConfig: saramaConfig,
settings: settings,
config: cfg,
}
return scraper.NewMetrics(
s.scrape,
20 changes: 8 additions & 12 deletions receiver/kafkametricsreceiver/broker_scraper_test.go
@@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)

@@ -48,40 +49,36 @@ func TestBrokerShutdown_closed(t *testing.T) {
}

func TestBrokerScraper_createBrokerScraper(t *testing.T) {
sc := sarama.NewConfig()
newSaramaClient = mockNewSaramaClient
bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
assert.NoError(t, err)
assert.NotNil(t, bs)
}

func TestBrokerScraperStart(t *testing.T) {
newSaramaClient = mockNewSaramaClient
sc := sarama.NewConfig()
bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
assert.NoError(t, err)
assert.NotNil(t, bs)
assert.NoError(t, bs.Start(context.Background(), nil))
}

func TestBrokerScraper_scrape_handles_client_error(t *testing.T) {
newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) {
newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) {
return nil, errors.New("new client failed")
}
sc := sarama.NewConfig()
bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
assert.NoError(t, err)
assert.NotNil(t, bs)
_, err = bs.ScrapeMetrics(context.Background())
assert.Error(t, err)
}

func TestBrokerScraper_shutdown_handles_nil_client(t *testing.T) {
newSaramaClient = func([]string, *sarama.Config) (sarama.Client, error) {
newSaramaClient = func(context.Context, configkafka.ClientConfig) (sarama.Client, error) {
return nil, errors.New("new client failed")
}
sc := sarama.NewConfig()
bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
assert.NoError(t, err)
assert.NotNil(t, bs)
err = bs.Shutdown(context.Background())
@@ -141,9 +138,8 @@ func TestBrokerScraper_scrape(t *testing.T) {
}

func TestBrokersScraper_createBrokerScraper(t *testing.T) {
sc := sarama.NewConfig()
newSaramaClient = mockNewSaramaClient
bs, err := createBrokerScraper(context.Background(), Config{}, sc, receivertest.NewNopSettings(metadata.Type))
bs, err := createBrokerScraper(context.Background(), Config{}, receivertest.NewNopSettings(metadata.Type))
assert.NoError(t, err)
assert.NotNil(t, bs)
}
46 changes: 28 additions & 18 deletions receiver/kafkametricsreceiver/config.go
@@ -6,6 +6,7 @@ package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-
import (
"time"

"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/scraper/scraperhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
@@ -15,42 +16,51 @@ import (
// Config represents user settings for kafkametrics receiver
type Config struct {
scraperhelper.ControllerConfig `mapstructure:",squash"`
configkafka.ClientConfig `mapstructure:",squash"`

// Alias name of the kafka cluster
ClusterAlias string `mapstructure:"cluster_alias"`

// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`

// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
// each of the provided brokers. It will then do a PTR lookup for each
// returned IP, and that set of names becomes the broker list. This can be
// required in SASL environments.
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`

// ProtocolVersion Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`

// TopicMatch topics to collect metrics on
TopicMatch string `mapstructure:"topic_match"`

// GroupMatch consumer groups to collect on
GroupMatch string `mapstructure:"group_match"`

// Authentication data
Authentication configkafka.AuthenticationConfig `mapstructure:"auth"`

// Cluster metadata refresh frequency
// Configures the refresh frequency to update cached cluster metadata
// Defaults to 10 minutes from Sarama library
//
// If Metadata.RefreshInterval is set, this will be ignored.
//
// Deprecated [v0.122.0]: use Metadata.RefreshInterval instead.
RefreshFrequency time.Duration `mapstructure:"refresh_frequency"`

// Scrapers defines which metric data points to be captured from kafka
Scrapers []string `mapstructure:"scrapers"`

// ClientID is the id associated with the consumer that reads from topics in kafka.
ClientID string `mapstructure:"client_id"`

// MetricsBuilderConfig allows customizing scraped metrics/attributes representation.
metadata.MetricsBuilderConfig `mapstructure:",squash"`
}

func (c *Config) Unmarshal(conf *confmap.Conf) error {
if refreshFrequency := conf.Get("refresh_frequency"); refreshFrequency != nil {
metadataConf, err := conf.Sub("metadata")
if err != nil {
return err
}
if !metadataConf.IsSet("refresh_interval") {
// User has not explicitly set metadata.refresh_interval,
// but they have set the (deprecated) refresh_frequency,
// so use that.
if err := conf.Merge(confmap.NewFromStringMap(map[string]any{
"metadata": map[string]any{
"refresh_interval": refreshFrequency,
},
})); err != nil {
return err
}
}
}
return conf.Unmarshal(c)
}
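
A hypothetical test sketch (not among this PR's test changes) of how the shim behaves: when only the deprecated key is set, its value is copied into `metadata.refresh_interval`; an explicitly set `metadata.refresh_interval` would take precedence.

```go
package kafkametricsreceiver

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/confmap"
)

func TestRefreshFrequencyShim(t *testing.T) {
	// Only the deprecated key is present; Config.Unmarshal should copy it
	// into metadata.refresh_interval before the normal decode runs.
	conf := confmap.NewFromStringMap(map[string]any{
		"refresh_frequency": "5m",
	})
	cfg := &Config{}
	require.NoError(t, cfg.Unmarshal(conf))
	assert.Equal(t, 5*time.Minute, cfg.Metadata.RefreshInterval)
}
```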
29 changes: 12 additions & 17 deletions receiver/kafkametricsreceiver/config_test.go
@@ -6,11 +6,11 @@ package kafkametricsreceiver
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/scraper/scraperhelper"

@@ -28,24 +28,19 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

expectedClientConfig := configkafka.NewDefaultClientConfig()
expectedClientConfig.Brokers = []string{"10.10.10.10:9092"}
expectedClientConfig.Metadata.Full = false
expectedClientConfig.Metadata.RefreshInterval = time.Nanosecond // set by refresh_frequency

assert.Equal(t, &Config{
ControllerConfig: scraperhelper.NewDefaultControllerConfig(),
ClusterAlias: "kafka-test",
Brokers: []string{"10.10.10.10:9092"},
ProtocolVersion: "2.0.0",
TopicMatch: "test_\\w+",
GroupMatch: "test_\\w+",
Authentication: configkafka.AuthenticationConfig{
TLS: &configtls.ClientConfig{
Config: configtls.Config{
CAFile: "ca.pem",
CertFile: "cert.pem",
KeyFile: "key.pem",
},
},
},
RefreshFrequency: 1,
ClientID: defaultClientID,
ClientConfig: expectedClientConfig,

ClusterAlias: "kafka-test",
TopicMatch: "test_\\w+",
GroupMatch: "test_\\w+",
RefreshFrequency: time.Nanosecond,
Scrapers: []string{"brokers", "topics", "consumers"},
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
}, cfg)