Skip to content

Commit 6efe1fc

Browse files
committed
exporter/kafkaexporter: add signal-specific config
Deprecate `topic` and `encoding`, and introduce signal-specific equivalents: - `logs::topic`, `metrics::topic`, and `traces::topic` - `logs::encoding`, `metrics::encoding`, and `traces::encoding` This enables users to explicitly define a configuration equivalent to the default configuration, or some variation thereof. It also enables specifying different encodings for each signal type, which may be important due to the fact that some encodings only support a subset of signals. Closes open-telemetry#35432
1 parent 27dabac commit 6efe1fc

File tree

9 files changed

+233
-51
lines changed

9 files changed

+233
-51
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkaexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: deprecate `topic` and `encoding`, introduce signal-specific configuration
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35432]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/README.md

+35-12
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,27 @@ Kafka exporter exports logs, metrics, and traces to Kafka. This exporter uses a
1717
that blocks and does not batch messages, therefore it should be used with batch and queued retry
1818
processors for higher throughput and resiliency. Message payload encoding is configurable.
1919

20+
## Configuration settings
21+
2022
There are no required settings.
2123

2224
The following settings can be optionally configured:
2325
- `brokers` (default = localhost:9092): The list of kafka brokers.
2426
- `protocol_version` (default = 2.1.0): Kafka protocol version.
2527
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
2628
- `client_id` (default = "otel-collector"): The client ID to configure the Kafka client with. The client ID will be used for all produce requests.
27-
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the default kafka topic to export to. See [Destination Topic](#destination-topic) below for more details.
29+
- `logs`
30+
- `topic` (default = otlp\_logs): The name of the Kafka topic to which logs will be exported.
31+
- `encoding` (default = otlp\_proto): The encoding for logs. See [Supported encodings](#supported-encodings).
32+
- `metrics`
33+
- `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics.
34+
- `encoding` (default = otlp\_proto): The encoding for metrics. See [Supported encodings](#supported-encodings).
35+
- `traces`
36+
- `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces.
37+
- `encoding` (default = otlp\_proto): The encoding for traces. See [Supported encodings](#supported-encodings).
38+
- `topic` (Deprecated in v0.124.0: use `logs::topic`, `metrics::topic`, and `traces::topic`) If specified, this is used as the default topic, but will be overridden by signal-specific configuration. See [Destination Topic](#destination-topic) below for more details.
2839
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
29-
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
30-
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
31-
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
32-
- The following encodings are valid *only* for **traces**.
33-
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
34-
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
35-
- `zipkin_proto`: the payload is serialized to Zipkin v2 proto Span.
36-
- `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span.
37-
- The following encodings are valid *only* for **logs**.
38-
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
40+
- `encoding` (Deprecated in v0.124.0: use `logs::encoding`, `metrics::encoding`, and `traces::encoding`) If specified, this is used as the default encoding, but will be overridden by signal-specific configuration. See [Supported encodings](#supported-encodings) below for more details.
3941
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
4042
- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
4143
- `partition_logs_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
@@ -96,6 +98,25 @@ The following settings can be optionally configured:
9698
- `compression` (default = 'none') the compression used when producing messages to kafka. The options are: `none`, `gzip`, `snappy`, `lz4`, and `zstd` https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#compression-type
9799
- `flush_max_messages` (default = 0) The maximum number of messages the producer will send in a single broker request.
98100

101+
### Supported encodings
102+
103+
The Kafka exporter supports encoding extensions, as well as the following built-in encodings.
104+
105+
Available for all signals:
106+
- `otlp_proto`: data is encoded as OTLP Protobuf Protobuf
107+
- `otlp_json`: data is encoded as OTLP JSON
108+
109+
Available only for traces:
110+
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
111+
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
112+
- `zipkin_proto`: the payload is serialized to Zipkin v2 proto Span.
113+
- `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span.
114+
115+
Available only for logs:
116+
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
117+
118+
### Example configuration
119+
99120
Example configuration:
100121

101122
```yaml
@@ -106,7 +127,9 @@ exporters:
106127
```
107128
108129
## Destination Topic
130+
109131
The destination topic can be defined in a few different ways and takes priority in the following order:
132+
110133
1. When `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
111134
2. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used.
112-
3. Finally, the `topic` configuration is used as a default/fallback destination.
135+
3. Finally, the `<signal>::topic` configuration is used for the signal-specific destination topic. If this is not explicitly configured, the `topic` configuration (deprecated in v0.124.0) is used as a fallback for all signals.

exporter/kafkaexporter/config.go

+75-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect
66
import (
77
"go.opentelemetry.io/collector/component"
88
"go.opentelemetry.io/collector/config/configretry"
9+
"go.opentelemetry.io/collector/confmap"
910
"go.opentelemetry.io/collector/exporter/exporterhelper"
1011

1112
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
@@ -21,13 +22,32 @@ type Config struct {
2122
configkafka.ClientConfig `mapstructure:",squash"`
2223
Producer configkafka.ProducerConfig `mapstructure:"producer"`
2324

24-
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
25+
// Logs holds configuration about how logs should be sent to Kafka.
26+
Logs SignalConfig `mapstructure:"logs"`
27+
28+
// Metrics holds configuration about how metrics should be sent to Kafka.
29+
Metrics SignalConfig `mapstructure:"metrics"`
30+
31+
// Traces holds configuration about how traces should be sent to Kafka.
32+
Traces SignalConfig `mapstructure:"traces"`
33+
34+
// Topic holds the name of the Kafka topic to which data should be exported.
35+
//
36+
// Topic has no default. If explicitly specified, it will take precedence over
37+
// the default values of logs::topic, metrics::topic, and traces::topic.
38+
//
39+
// Deprecated [v0.124.0]: use logs::topic, metrics::topic, and traces::topic instead.
2540
Topic string `mapstructure:"topic"`
2641

2742
// TopicFromAttribute is the name of the attribute to use as the topic name.
2843
TopicFromAttribute string `mapstructure:"topic_from_attribute"`
2944

30-
// Encoding of messages (default "otlp_proto")
45+
// Encoding holds the encoding of Kafka message values.
46+
//
47+
// Encoding has no default. If explicitly specified, it will take precedence over
48+
// the default values of logs::encoding, metrics::encoding, and traces::encoding.
49+
//
50+
// Deprecated [v0.124.0]: use logs::encoding, metrics::encoding, and traces::encoding instead.
3151
Encoding string `mapstructure:"encoding"`
3252

3353
// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
@@ -46,3 +66,56 @@ type Config struct {
4666
// attributes.
4767
PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
4868
}
69+
70+
func (c *Config) Unmarshal(conf *confmap.Conf) error {
71+
if err := conf.Unmarshal(c); err != nil {
72+
return err
73+
}
74+
// Check if deprecated fields have been explicitly set,
75+
// in which case they should be used instead of signal-
76+
// specific defaults.
77+
var zeroConfig Config
78+
if err := conf.Unmarshal(&zeroConfig); err != nil {
79+
return err
80+
}
81+
if c.Topic != "" {
82+
if zeroConfig.Logs.Topic == "" {
83+
c.Logs.Topic = c.Topic
84+
}
85+
if zeroConfig.Metrics.Topic == "" {
86+
c.Metrics.Topic = c.Topic
87+
}
88+
if zeroConfig.Traces.Topic == "" {
89+
c.Traces.Topic = c.Topic
90+
}
91+
}
92+
if c.Encoding != "" {
93+
if zeroConfig.Logs.Encoding == "" {
94+
c.Logs.Encoding = c.Encoding
95+
}
96+
if zeroConfig.Metrics.Encoding == "" {
97+
c.Metrics.Encoding = c.Encoding
98+
}
99+
if zeroConfig.Traces.Encoding == "" {
100+
c.Traces.Encoding = c.Encoding
101+
}
102+
}
103+
return conf.Unmarshal(c)
104+
}
105+
106+
// SignalConfig holds signal-specific configuration for the Kafka exporter.
107+
type SignalConfig struct {
108+
// Topic holds the name of the Kafka topic to which messages of the
109+
// signal type should be produced.
110+
//
111+
// The default depends on the signal type:
112+
// - "otlp_spans" for traces
113+
// - "otlp_metrics" for metrics
114+
// - "otlp_logs" for logs
115+
Topic string `mapstructure:"topic"`
116+
117+
// Encoding holds the encoding of messages for the signal type.
118+
//
119+
// Defaults to "otlp_proto".
120+
Encoding string `mapstructure:"encoding"`
121+
}

exporter/kafkaexporter/config_test.go

+58-1
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,70 @@ func TestLoadConfig(t *testing.T) {
6060
config.RequiredAcks = configkafka.WaitForAll
6161
return config
6262
}(),
63+
Logs: SignalConfig{
64+
Topic: "spans",
65+
Encoding: "otlp_proto",
66+
},
67+
Metrics: SignalConfig{
68+
Topic: "spans",
69+
Encoding: "otlp_proto",
70+
},
71+
Traces: SignalConfig{
72+
Topic: "spans",
73+
Encoding: "otlp_proto",
74+
},
6375
Topic: "spans",
64-
Encoding: "otlp_proto",
6576
PartitionTracesByID: true,
6677
PartitionMetricsByResourceAttributes: true,
6778
PartitionLogsByResourceAttributes: true,
6879
},
6980
},
81+
{
82+
id: component.NewIDWithName(metadata.Type, "legacy_topic"),
83+
expected: &Config{
84+
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
85+
BackOffConfig: configretry.NewDefaultBackOffConfig(),
86+
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
87+
ClientConfig: configkafka.NewDefaultClientConfig(),
88+
Producer: configkafka.NewDefaultProducerConfig(),
89+
Logs: SignalConfig{
90+
Topic: "legacy_topic",
91+
Encoding: "otlp_proto",
92+
},
93+
Metrics: SignalConfig{
94+
Topic: "metrics_topic",
95+
Encoding: "otlp_proto",
96+
},
97+
Traces: SignalConfig{
98+
Topic: "legacy_topic",
99+
Encoding: "otlp_proto",
100+
},
101+
Topic: "legacy_topic",
102+
},
103+
},
104+
{
105+
id: component.NewIDWithName(metadata.Type, "legacy_encoding"),
106+
expected: &Config{
107+
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
108+
BackOffConfig: configretry.NewDefaultBackOffConfig(),
109+
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
110+
ClientConfig: configkafka.NewDefaultClientConfig(),
111+
Producer: configkafka.NewDefaultProducerConfig(),
112+
Logs: SignalConfig{
113+
Topic: "otlp_logs",
114+
Encoding: "legacy_encoding",
115+
},
116+
Metrics: SignalConfig{
117+
Topic: "otlp_metrics",
118+
Encoding: "metrics_encoding",
119+
},
120+
Traces: SignalConfig{
121+
Topic: "otlp_spans",
122+
Encoding: "legacy_encoding",
123+
},
124+
Encoding: "legacy_encoding",
125+
},
126+
},
70127
}
71128

72129
for _, tt := range tests {

exporter/kafkaexporter/factory.go

+19-25
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ import (
1717
)
1818

1919
const (
20-
defaultTracesTopic = "otlp_spans"
21-
defaultMetricsTopic = "otlp_metrics"
22-
defaultLogsTopic = "otlp_logs"
23-
defaultEncoding = "otlp_proto"
20+
defaultLogsTopic = "otlp_logs"
21+
defaultLogsEncoding = "otlp_proto"
22+
defaultMetricsTopic = "otlp_metrics"
23+
defaultMetricsEncoding = "otlp_proto"
24+
defaultTracesTopic = "otlp_spans"
25+
defaultTracesEncoding = "otlp_proto"
26+
2427
// partitioning metrics by resource attributes is disabled by default
2528
defaultPartitionMetricsByResourceAttributesEnabled = false
2629
// partitioning logs by resource attributes is disabled by default
@@ -45,9 +48,18 @@ func createDefaultConfig() component.Config {
4548
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
4649
ClientConfig: configkafka.NewDefaultClientConfig(),
4750
Producer: configkafka.NewDefaultProducerConfig(),
48-
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
49-
Topic: "",
50-
Encoding: defaultEncoding,
51+
Logs: SignalConfig{
52+
Topic: defaultLogsTopic,
53+
Encoding: defaultLogsEncoding,
54+
},
55+
Metrics: SignalConfig{
56+
Topic: defaultMetricsTopic,
57+
Encoding: defaultMetricsEncoding,
58+
},
59+
Traces: SignalConfig{
60+
Topic: defaultTracesTopic,
61+
Encoding: defaultTracesEncoding,
62+
},
5163
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
5264
PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled,
5365
}
@@ -59,12 +71,6 @@ func createTracesExporter(
5971
cfg component.Config,
6072
) (exporter.Traces, error) {
6173
oCfg := *(cfg.(*Config)) // Clone the config
62-
if oCfg.Topic == "" {
63-
oCfg.Topic = defaultTracesTopic
64-
}
65-
if oCfg.Encoding == "otlp_json" {
66-
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
67-
}
6874
exp := newTracesExporter(oCfg, set)
6975
return exporterhelper.NewTraces(
7076
ctx,
@@ -88,12 +94,6 @@ func createMetricsExporter(
8894
cfg component.Config,
8995
) (exporter.Metrics, error) {
9096
oCfg := *(cfg.(*Config)) // Clone the config
91-
if oCfg.Topic == "" {
92-
oCfg.Topic = defaultMetricsTopic
93-
}
94-
if oCfg.Encoding == "otlp_json" {
95-
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
96-
}
9797
exp := newMetricsExporter(oCfg, set)
9898
return exporterhelper.NewMetrics(
9999
ctx,
@@ -117,12 +117,6 @@ func createLogsExporter(
117117
cfg component.Config,
118118
) (exporter.Logs, error) {
119119
oCfg := *(cfg.(*Config)) // Clone the config
120-
if oCfg.Topic == "" {
121-
oCfg.Topic = defaultLogsTopic
122-
}
123-
if oCfg.Encoding == "otlp_json" {
124-
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
125-
}
126120
exp := newLogsExporter(oCfg, set)
127121
return exporterhelper.NewLogs(
128122
ctx,

exporter/kafkaexporter/factory_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestCreateMetricExporter(t *testing.T) {
6666
conf: applyConfigOption(func(conf *Config) {
6767
// Disabling broker check to ensure encoding work
6868
conf.Metadata.Full = false
69-
conf.Encoding = defaultEncoding
69+
conf.Encoding = "otlp_proto"
7070
}),
7171
err: nil,
7272
},
@@ -128,7 +128,7 @@ func TestCreateLogExporter(t *testing.T) {
128128
conf: applyConfigOption(func(conf *Config) {
129129
// Disabling broker check to ensure encoding work
130130
conf.Metadata.Full = false
131-
conf.Encoding = defaultEncoding
131+
conf.Encoding = "otlp_proto"
132132
}),
133133
err: nil,
134134
},
@@ -188,7 +188,7 @@ func TestCreateTraceExporter(t *testing.T) {
188188
conf: applyConfigOption(func(conf *Config) {
189189
// Disabling broker check to ensure encoding work
190190
conf.Metadata.Full = false
191-
conf.Encoding = defaultEncoding
191+
conf.Encoding = "otlp_proto"
192192
}),
193193
err: nil,
194194
},

0 commit comments

Comments
 (0)