
Commit da097ef

kafkaexporter: use configkafka, extract producer (#38665)
#### Description

Use configkafka and extract a function for constructing a `sarama.SyncProducer` given `configkafka.ClientConfig` and `configkafka.ProducerConfig`.

As part of the change, the default `client_id` has been updated from "sarama" to "otel-collector". This is a breaking change for anyone relying on the client ID for monitoring or quota management.

#### Link to tracking issue

Part of #38411

#### Testing

Updated unit tests. Manually tested against Redpanda:

```
$ docker run --publish=9093:9093 \
    --health-cmd "rpk cluster health | grep 'Healthy:.*true'" \
    docker.redpanda.com/redpandadata/redpanda:v23.1.11 \
    redpanda start --kafka-addr=internal://0.0.0.0:9092,external://0.0.0.0:9093 \
    --smp=1 --memory=1G --mode=dev-container
```

Ran the collector with `kafkametrics -> kafkaexporter -> Redpanda -> kafkareceiver -> debugexporter`:

```yaml
receivers:
  kafka:
    brokers: [localhost:9093]
  kafkametrics:
    brokers: [localhost:9093]
    collection_interval: 10s
    scrapers: [topics, consumers]

exporters:
  debug:
    verbosity: detailed
  kafka:
    brokers: [localhost:9093]

service:
  pipelines:
    metrics/kafka:
      receivers: [kafka]
      exporters: [debug]
    metrics:
      receivers: [kafkametrics]
      exporters: [kafka]
```

#### Documentation

Updated README.
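The extracted constructor itself is not shown in this diff. As a rough sketch of the shape such a helper takes (the function name, package location, and exact configkafka field set below are assumptions for illustration, not the PR's actual code):

```go
// Hypothetical sketch, not the PR's actual code: build a sarama.SyncProducer
// from the shared configkafka structs. Name, location, and field set assumed.
package kafka // assumed home: an internal kafka package

import (
	"github.com/IBM/sarama"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func NewSaramaSyncProducer(clientCfg configkafka.ClientConfig, producerCfg configkafka.ProducerConfig) (sarama.SyncProducer, error) {
	c := sarama.NewConfig()
	c.ClientID = clientCfg.ClientID // per this PR, defaults to "otel-collector"
	if clientCfg.ProtocolVersion != "" {
		v, err := sarama.ParseKafkaVersion(clientCfg.ProtocolVersion)
		if err != nil {
			return nil, err
		}
		c.Version = v
	}
	// sarama.NewSyncProducer rejects configs where these are not enabled.
	c.Producer.Return.Successes = true
	c.Producer.Return.Errors = true
	c.Producer.MaxMessageBytes = producerCfg.MaxMessageBytes
	c.Producer.RequiredAcks = sarama.RequiredAcks(producerCfg.RequiredAcks)
	c.Producer.Flush.MaxMessages = producerCfg.FlushMaxMessages
	return sarama.NewSyncProducer(clientCfg.Brokers, c)
}
```

Extracting the producer construction this way lets the exporter share client and producer defaults with the receiver via configkafka instead of duplicating sarama setup.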
1 parent aa8f718 commit da097ef

17 files changed, +108 -493 lines

.chloggen/… (new changelog entry, +30)
````diff
@@ -0,0 +1,30 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: breaking
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: kafkaexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: change default client_id to "otel-collector"
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [38411]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  The exporter now uses the "configkafka" package which consolidates
+  common configuration structures and default values. As a result of
+  this change, we update the default client_id value to "otel-collector".
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
````

exporter/kafkaexporter/README.md (+5 -6)
````diff
@@ -17,13 +17,13 @@ Kafka exporter exports logs, metrics, and traces to Kafka. This exporter uses a
 that blocks and does not batch messages, therefore it should be used with batch and queued retry
 processors for higher throughput and resiliency. Message payload encoding is configurable.
 
-The following settings are required:
-- `protocol_version` (no default): Kafka protocol version e.g. `2.0.0`.
+There are no required settings.
 
 The following settings can be optionally configured:
 - `brokers` (default = localhost:9092): The list of kafka brokers.
+- `protocol_version` (default = 2.1.0): Kafka protocol version.
 - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
-- `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
+- `client_id` (default = "otel-collector"): The client ID to configure the Kafka client with. The client ID will be used for all produce requests.
 - `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the default kafka topic to export to. See [Destination Topic](#destination-topic) below for more details.
 - `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
 - `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
@@ -92,8 +92,8 @@ The following settings can be optionally configured:
   - `requests_per_second` is the average number of requests per second.
 - `producer`
   - `max_message_bytes` (default = 1000000) the maximum permitted size of a message in bytes
-  - `required_acks` (default = 1) controls when a message is regarded as transmitted. https://pkg.go.dev/github.com/IBM/[email protected]#RequiredAcks
-  - `compression` (default = 'none') the compression used when producing messages to kafka. The options are: `none`, `gzip`, `snappy`, `lz4`, and `zstd` https://pkg.go.dev/github.com/IBM/[email protected]#CompressionCodec
+  - `required_acks` (default = 1) controls when a message is regarded as transmitted. https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks
+  - `compression` (default = 'none') the compression used when producing messages to kafka. The options are: `none`, `gzip`, `snappy`, `lz4`, and `zstd` https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#compression-type
   - `flush_max_messages` (default = 0) The maximum number of messages the producer will send in a single broker request.
 
 Example configuration:
@@ -103,7 +103,6 @@ exporters:
   kafka:
     brokers:
       - localhost:9092
-    protocol_version: 2.0.0
 ```
 
 ## Destination Topic
````
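Because the default `client_id` changes here, deployments whose broker-side quotas or ACLs match on the old ID can pin it explicitly. A minimal sketch, in the same style as the README's example configuration:

```yaml
exporters:
  kafka:
    brokers:
      - localhost:9092
    # Restore the pre-change default if quotas/ACLs key off the client ID.
    client_id: sarama
```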

exporter/kafkaexporter/config.go (+4 -106)
````diff
@@ -4,38 +4,22 @@
 package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
 
 import (
-    "fmt"
-    "time"
-
-    "github.com/IBM/sarama"
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/config/configretry"
     "go.opentelemetry.io/collector/exporter/exporterhelper"
 
     "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 )
 
+var _ component.Config = (*Config)(nil)
+
 // Config defines configuration for Kafka exporter.
 type Config struct {
     TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
     QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
     configretry.BackOffConfig `mapstructure:"retry_on_failure"`
-
-    // The list of kafka brokers (default localhost:9092)
-    Brokers []string `mapstructure:"brokers"`
-
-    // ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
-    // each of the provided brokers. It will then do a PTR lookup for each
-    // returned IP, and that set of names becomes the broker list. This can be
-    // required in SASL environments.
-    ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
-
-    // Kafka protocol version
-    ProtocolVersion string `mapstructure:"protocol_version"`
-
-    // ClientID to configure the Kafka client with. This can be leveraged by
-    // Kafka to enforce ACLs, throttling quotas, and more.
-    ClientID string `mapstructure:"client_id"`
+    configkafka.ClientConfig `mapstructure:",squash"`
+    Producer configkafka.ProducerConfig `mapstructure:"producer"`
 
     // The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
     Topic string `mapstructure:"topic"`
@@ -54,90 +38,4 @@ type Config struct {
     PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`
 
     PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
-
-    // Metadata is the namespace for metadata management properties used by the
-    // Client, and shared by the Producer/Consumer.
-    Metadata Metadata `mapstructure:"metadata"`
-
-    // Producer is the namespace for producer properties used only by the Producer.
-    Producer Producer `mapstructure:"producer"`
-
-    // Authentication defines the authentication mechanism used.
-    Authentication configkafka.AuthenticationConfig `mapstructure:"auth"`
-}
-
-// Metadata defines configuration for retrieving metadata from the broker.
-type Metadata struct {
-    // Whether to maintain a full set of metadata for all topics, or just
-    // the minimal set that has been necessary so far. The full set is simpler
-    // and usually more convenient, but can take up a substantial amount of
-    // memory if you have many topics and partitions. Defaults to true.
-    Full bool `mapstructure:"full"`
-
-    // Retry configuration for metadata.
-    // This configuration is useful to avoid race conditions when the broker
-    // is starting at the same time as the collector.
-    Retry MetadataRetry `mapstructure:"retry"`
-}
-
-// Producer defines configuration for the producer.
-type Producer struct {
-    // Maximum message bytes the producer will accept to produce.
-    MaxMessageBytes int `mapstructure:"max_message_bytes"`
-
-    // RequiredAcks is the number of acknowledgements required to assume that a message has been sent.
-    // https://pkg.go.dev/github.com/IBM/[email protected]#RequiredAcks
-    // The options are:
-    //    0 -> NoResponse. doesn't send any response
-    //    1 -> WaitForLocal. waits for only the local commit to succeed before responding (default)
-    //   -1 -> WaitForAll. waits for all in-sync replicas to commit before responding.
-    RequiredAcks sarama.RequiredAcks `mapstructure:"required_acks"`
-
-    // Compression codec used to produce messages.
-    // https://pkg.go.dev/github.com/IBM/[email protected]#CompressionCodec
-    // The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd'
-    Compression string `mapstructure:"compression"`
-
-    // The maximum number of messages the producer will send in a single
-    // broker request. Defaults to 0 for unlimited. Similar to
-    // `queue.buffering.max.messages` in the JVM producer.
-    FlushMaxMessages int `mapstructure:"flush_max_messages"`
-}
-
-// MetadataRetry defines retry configuration for Metadata.
-type MetadataRetry struct {
-    // The total number of times to retry a metadata request when the
-    // cluster is in the middle of a leader election or at startup (default 3).
-    Max int `mapstructure:"max"`
-    // How long to wait for leader election to occur before retrying
-    // (default 250ms). Similar to the JVM's `retry.backoff.ms`.
-    Backoff time.Duration `mapstructure:"backoff"`
-}
-
-var _ component.Config = (*Config)(nil)
-
-// Validate checks if the exporter configuration is valid.
-func (cfg *Config) Validate() error {
-    if cfg.Producer.RequiredAcks < -1 || cfg.Producer.RequiredAcks > 1 {
-        return fmt.Errorf("producer.required_acks has to be between -1 and 1. configured value %v", cfg.Producer.RequiredAcks)
-    }
-    _, err := saramaProducerCompressionCodec(cfg.Producer.Compression)
-    return err
-}
-
-func saramaProducerCompressionCodec(compression string) (sarama.CompressionCodec, error) {
-    switch compression {
-    case "none":
-        return sarama.CompressionNone, nil
-    case "gzip":
-        return sarama.CompressionGZIP, nil
-    case "snappy":
-        return sarama.CompressionSnappy, nil
-    case "lz4":
-        return sarama.CompressionLZ4, nil
-    case "zstd":
-        return sarama.CompressionZSTD, nil
-    default:
-        return sarama.CompressionNone, fmt.Errorf("producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value %v", compression)
-    }
 }
````
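The `mapstructure:",squash"` tag on the embedded `configkafka.ClientConfig` is what keeps `brokers`, `client_id`, and the other client settings as top-level keys in the exporter's YAML. A self-contained sketch of the mechanism, using stand-in structs rather than the real configkafka types and the standalone mapstructure library (the collector's confmap uses a fork of it under the hood):

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Stand-in for configkafka.ClientConfig; the real struct has more fields.
type ClientConfig struct {
	Brokers  []string `mapstructure:"brokers"`
	ClientID string   `mapstructure:"client_id"`
}

// Stand-in for the exporter Config: squash lifts the embedded struct's
// keys to the top level of the decoded map.
type Config struct {
	ClientConfig `mapstructure:",squash"`
	Topic        string `mapstructure:"topic"`
}

func main() {
	raw := map[string]any{
		"brokers":   []string{"localhost:9092"},
		"client_id": "otel-collector",
		"topic":     "otlp_spans",
	}
	var cfg Config
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	// {ClientConfig:{Brokers:[localhost:9092] ClientID:otel-collector} Topic:otlp_spans}
	fmt.Printf("%+v\n", cfg)
}
```

This is why the config schema is unchanged for users apart from the new `client_id` default: the same keys now simply decode into the shared configkafka structs.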
