Skip to content

Commit 397c6fc

Browse files
committed
exporter/kafkaexporter: add signal-specific config
Deprecate `topic` and `encoding`, and introduce signal-specific equivalents: - `logs::topic`, `metrics::topic`, and `traces::topic` - `logs::encoding`, `metrics::encoding`, and `traces::encoding` This enables users to explicitly define a configuration equivalent to the default configuration, or some variation thereof. It also enables specifying different encodings for each signal type, which may be important due to the fact that some encodings only support a subset of signals. Closes open-telemetry#35432
1 parent 27dabac commit 397c6fc

File tree

8 files changed

+198
-39
lines changed

8 files changed

+198
-39
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkaexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: deprecate `topic` and `encoding`, introduce signal-specific configuration
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35432]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/config.go

+75-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect
66
import (
77
"go.opentelemetry.io/collector/component"
88
"go.opentelemetry.io/collector/config/configretry"
9+
"go.opentelemetry.io/collector/confmap"
910
"go.opentelemetry.io/collector/exporter/exporterhelper"
1011

1112
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
@@ -21,13 +22,32 @@ type Config struct {
2122
configkafka.ClientConfig `mapstructure:",squash"`
2223
Producer configkafka.ProducerConfig `mapstructure:"producer"`
2324

24-
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
25+
// Logs holds configuration about how logs should be sent to Kafka.
26+
Logs SignalConfig `mapstructure:"logs"`
27+
28+
// Metrics holds configuration about how metrics should be sent to Kafka.
29+
Metrics SignalConfig `mapstructure:"metrics"`
30+
31+
// Traces holds configuration about how traces should be sent to Kafka.
32+
Traces SignalConfig `mapstructure:"traces"`
33+
34+
// Topic holds the name of the Kafka topic to which data should be exported.
35+
//
36+
// Topic has no default. If explicitly specified, it will take precedence over
37+
// the default values of logs::topic, metrics::topic, and traces::topic.
38+
//
39+
// Deprecated [v0.124.0]: use logs::topic, metrics::topic, and traces::topic instead.
2540
Topic string `mapstructure:"topic"`
2641

2742
// TopicFromAttribute is the name of the attribute to use as the topic name.
2843
TopicFromAttribute string `mapstructure:"topic_from_attribute"`
2944

30-
// Encoding of messages (default "otlp_proto")
45+
// Encoding holds the encoding of Kafka message values.
46+
//
47+
// Encoding has no default. If explicitly specified, it will take precedence over
48+
// the default values of logs::encoding, metrics::encoding, and traces::encoding.
49+
//
50+
// Deprecated [v0.124.0]: use logs::encoding, metrics::encoding, and traces::encoding instead.
3151
Encoding string `mapstructure:"encoding"`
3252

3353
// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
@@ -46,3 +66,56 @@ type Config struct {
4666
// attributes.
4767
PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
4868
}
69+
70+
func (c *Config) Unmarshal(conf *confmap.Conf) error {
71+
if err := conf.Unmarshal(c); err != nil {
72+
return err
73+
}
74+
// Check if deprecated fields have been explicitly set,
75+
// in which case they should be used instead of signal-
76+
// specific defaults.
77+
var zeroConfig Config
78+
if err := conf.Unmarshal(&zeroConfig); err != nil {
79+
return err
80+
}
81+
if c.Topic != "" {
82+
if zeroConfig.Logs.Topic == "" {
83+
c.Logs.Topic = c.Topic
84+
}
85+
if zeroConfig.Metrics.Topic == "" {
86+
c.Metrics.Topic = c.Topic
87+
}
88+
if zeroConfig.Traces.Topic == "" {
89+
c.Traces.Topic = c.Topic
90+
}
91+
}
92+
if c.Encoding != "" {
93+
if zeroConfig.Logs.Encoding == "" {
94+
c.Logs.Encoding = c.Encoding
95+
}
96+
if zeroConfig.Metrics.Encoding == "" {
97+
c.Metrics.Encoding = c.Encoding
98+
}
99+
if zeroConfig.Traces.Encoding == "" {
100+
c.Traces.Encoding = c.Encoding
101+
}
102+
}
103+
return conf.Unmarshal(c)
104+
}
105+
106+
// SignalConfig holds signal-specific configuration for the Kafka exporter.
107+
type SignalConfig struct {
108+
// Topic holds the name of the Kafka topic to which messages of the
109+
// signal type should be produced.
110+
//
111+
// The default depends on the signal type:
112+
// - "otlp_spans" for traces
113+
// - "otlp_metrics" for metrics
114+
// - "otlp_logs" for logs
115+
Topic string `mapstructure:"topic"`
116+
117+
// Encoding holds the encoding of messages for the signal type.
118+
//
119+
// Defaults to "otlp_proto".
120+
Encoding string `mapstructure:"encoding"`
121+
}

exporter/kafkaexporter/config_test.go

+58-1
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,70 @@ func TestLoadConfig(t *testing.T) {
6060
config.RequiredAcks = configkafka.WaitForAll
6161
return config
6262
}(),
63+
Logs: SignalConfig{
64+
Topic: "spans",
65+
Encoding: "otlp_proto",
66+
},
67+
Metrics: SignalConfig{
68+
Topic: "spans",
69+
Encoding: "otlp_proto",
70+
},
71+
Traces: SignalConfig{
72+
Topic: "spans",
73+
Encoding: "otlp_proto",
74+
},
6375
Topic: "spans",
64-
Encoding: "otlp_proto",
6576
PartitionTracesByID: true,
6677
PartitionMetricsByResourceAttributes: true,
6778
PartitionLogsByResourceAttributes: true,
6879
},
6980
},
81+
{
82+
id: component.NewIDWithName(metadata.Type, "legacy_topic"),
83+
expected: &Config{
84+
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
85+
BackOffConfig: configretry.NewDefaultBackOffConfig(),
86+
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
87+
ClientConfig: configkafka.NewDefaultClientConfig(),
88+
Producer: configkafka.NewDefaultProducerConfig(),
89+
Logs: SignalConfig{
90+
Topic: "legacy_topic",
91+
Encoding: "otlp_proto",
92+
},
93+
Metrics: SignalConfig{
94+
Topic: "metrics_topic",
95+
Encoding: "otlp_proto",
96+
},
97+
Traces: SignalConfig{
98+
Topic: "legacy_topic",
99+
Encoding: "otlp_proto",
100+
},
101+
Topic: "legacy_topic",
102+
},
103+
},
104+
{
105+
id: component.NewIDWithName(metadata.Type, "legacy_encoding"),
106+
expected: &Config{
107+
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
108+
BackOffConfig: configretry.NewDefaultBackOffConfig(),
109+
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
110+
ClientConfig: configkafka.NewDefaultClientConfig(),
111+
Producer: configkafka.NewDefaultProducerConfig(),
112+
Logs: SignalConfig{
113+
Topic: "otlp_logs",
114+
Encoding: "legacy_encoding",
115+
},
116+
Metrics: SignalConfig{
117+
Topic: "otlp_metrics",
118+
Encoding: "metrics_encoding",
119+
},
120+
Traces: SignalConfig{
121+
Topic: "otlp_spans",
122+
Encoding: "legacy_encoding",
123+
},
124+
Encoding: "legacy_encoding",
125+
},
126+
},
70127
}
71128

72129
for _, tt := range tests {

exporter/kafkaexporter/factory.go

+19-25
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ import (
1717
)
1818

1919
const (
20-
defaultTracesTopic = "otlp_spans"
21-
defaultMetricsTopic = "otlp_metrics"
22-
defaultLogsTopic = "otlp_logs"
23-
defaultEncoding = "otlp_proto"
20+
defaultLogsTopic = "otlp_logs"
21+
defaultLogsEncoding = "otlp_proto"
22+
defaultMetricsTopic = "otlp_metrics"
23+
defaultMetricsEncoding = "otlp_proto"
24+
defaultTracesTopic = "otlp_spans"
25+
defaultTracesEncoding = "otlp_proto"
26+
2427
// partitioning metrics by resource attributes is disabled by default
2528
defaultPartitionMetricsByResourceAttributesEnabled = false
2629
// partitioning logs by resource attributes is disabled by default
@@ -45,9 +48,18 @@ func createDefaultConfig() component.Config {
4548
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
4649
ClientConfig: configkafka.NewDefaultClientConfig(),
4750
Producer: configkafka.NewDefaultProducerConfig(),
48-
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
49-
Topic: "",
50-
Encoding: defaultEncoding,
51+
Logs: SignalConfig{
52+
Topic: defaultLogsTopic,
53+
Encoding: defaultLogsEncoding,
54+
},
55+
Metrics: SignalConfig{
56+
Topic: defaultMetricsTopic,
57+
Encoding: defaultMetricsEncoding,
58+
},
59+
Traces: SignalConfig{
60+
Topic: defaultTracesTopic,
61+
Encoding: defaultTracesEncoding,
62+
},
5163
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
5264
PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled,
5365
}
@@ -59,12 +71,6 @@ func createTracesExporter(
5971
cfg component.Config,
6072
) (exporter.Traces, error) {
6173
oCfg := *(cfg.(*Config)) // Clone the config
62-
if oCfg.Topic == "" {
63-
oCfg.Topic = defaultTracesTopic
64-
}
65-
if oCfg.Encoding == "otlp_json" {
66-
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
67-
}
6874
exp := newTracesExporter(oCfg, set)
6975
return exporterhelper.NewTraces(
7076
ctx,
@@ -88,12 +94,6 @@ func createMetricsExporter(
8894
cfg component.Config,
8995
) (exporter.Metrics, error) {
9096
oCfg := *(cfg.(*Config)) // Clone the config
91-
if oCfg.Topic == "" {
92-
oCfg.Topic = defaultMetricsTopic
93-
}
94-
if oCfg.Encoding == "otlp_json" {
95-
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
96-
}
9797
exp := newMetricsExporter(oCfg, set)
9898
return exporterhelper.NewMetrics(
9999
ctx,
@@ -117,12 +117,6 @@ func createLogsExporter(
117117
cfg component.Config,
118118
) (exporter.Logs, error) {
119119
oCfg := *(cfg.(*Config)) // Clone the config
120-
if oCfg.Topic == "" {
121-
oCfg.Topic = defaultLogsTopic
122-
}
123-
if oCfg.Encoding == "otlp_json" {
124-
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
125-
}
126120
exp := newLogsExporter(oCfg, set)
127121
return exporterhelper.NewLogs(
128122
ctx,

exporter/kafkaexporter/factory_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestCreateMetricExporter(t *testing.T) {
6666
conf: applyConfigOption(func(conf *Config) {
6767
// Disabling broker check to ensure encoding work
6868
conf.Metadata.Full = false
69-
conf.Encoding = defaultEncoding
69+
conf.Encoding = "otlp_proto"
7070
}),
7171
err: nil,
7272
},
@@ -128,7 +128,7 @@ func TestCreateLogExporter(t *testing.T) {
128128
conf: applyConfigOption(func(conf *Config) {
129129
// Disabling broker check to ensure encoding work
130130
conf.Metadata.Full = false
131-
conf.Encoding = defaultEncoding
131+
conf.Encoding = "otlp_proto"
132132
}),
133133
err: nil,
134134
},
@@ -188,7 +188,7 @@ func TestCreateTraceExporter(t *testing.T) {
188188
conf: applyConfigOption(func(conf *Config) {
189189
// Disabling broker check to ensure encoding work
190190
conf.Metadata.Full = false
191-
conf.Encoding = defaultEncoding
191+
conf.Encoding = "otlp_proto"
192192
}),
193193
err: nil,
194194
},

exporter/kafkaexporter/kafka_exporter.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,12 @@ func (e *kafkaExporter[T]) exportData(ctx context.Context, data T) error {
127127
func newTracesExporter(config Config, set exporter.Settings) *kafkaExporter[ptrace.Traces] {
128128
// Jaeger encodings do their own partitioning, so disable trace ID
129129
// partitioning when they are configured.
130-
switch config.Encoding {
130+
switch config.Traces.Encoding {
131131
case "jaeger_proto", "jaeger_json":
132132
config.PartitionTracesByID = false
133133
}
134134
return newKafkaExporter(config, set, func(host component.Host) (kafkaMessager[ptrace.Traces], error) {
135-
marshaler, err := getTracesMarshaler(config.Encoding, host)
135+
marshaler, err := getTracesMarshaler(config.Traces.Encoding, host)
136136
if err != nil {
137137
return nil, err
138138
}
@@ -177,7 +177,7 @@ func (e *kafkaTracesMessager) partitionData(td ptrace.Traces) iter.Seq2[[]byte,
177177

178178
func newLogsExporter(config Config, set exporter.Settings) *kafkaExporter[plog.Logs] {
179179
return newKafkaExporter(config, set, func(host component.Host) (kafkaMessager[plog.Logs], error) {
180-
marshaler, err := getLogsMarshaler(config.Encoding, host)
180+
marshaler, err := getLogsMarshaler(config.Logs.Encoding, host)
181181
if err != nil {
182182
return nil, err
183183
}
@@ -220,7 +220,7 @@ func (e *kafkaLogsMessager) partitionData(ld plog.Logs) iter.Seq2[[]byte, plog.L
220220

221221
func newMetricsExporter(config Config, set exporter.Settings) *kafkaExporter[pmetric.Metrics] {
222222
return newKafkaExporter(config, set, func(host component.Host) (kafkaMessager[pmetric.Metrics], error) {
223-
marshaler, err := getMetricsMarshaler(config.Encoding, host)
223+
marshaler, err := getMetricsMarshaler(config.Metrics.Encoding, host)
224224
if err != nil {
225225
return nil, err
226226
}

exporter/kafkaexporter/kafka_exporter_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func TestTracesPusher_marshal_error(t *testing.T) {
7676
}),
7777
}
7878
config := createDefaultConfig().(*Config)
79-
config.Encoding = "trace_encoding"
79+
config.Traces.Encoding = "trace_encoding"
8080
exp, _ := newMockTracesExporter(t, *config, host)
8181

8282
err := exp.exportData(context.Background(), testdata.GenerateTraces(2))
@@ -108,7 +108,7 @@ func TestTracesPusher_partitioning(t *testing.T) {
108108
})
109109
t.Run("jaeger_partitioning", func(t *testing.T) {
110110
config := createDefaultConfig().(*Config)
111-
config.Encoding = "jaeger_json"
111+
config.Traces.Encoding = "jaeger_json"
112112
exp, producer := newMockTracesExporter(t, *config, componenttest.NewNopHost())
113113

114114
// Jaeger encodings produce one message per span,
@@ -241,7 +241,7 @@ func TestMetricsPusher_marshal_error(t *testing.T) {
241241
}),
242242
}
243243
config := createDefaultConfig().(*Config)
244-
config.Encoding = "metric_encoding"
244+
config.Metrics.Encoding = "metric_encoding"
245245
exp, _ := newMockMetricsExporter(t, *config, host)
246246

247247
err := exp.exportData(context.Background(), testdata.GenerateMetrics(2))
@@ -352,7 +352,7 @@ func TestLogsPusher_marshal_error(t *testing.T) {
352352
}),
353353
}
354354
config := createDefaultConfig().(*Config)
355-
config.Encoding = "log_encoding"
355+
config.Logs.Encoding = "log_encoding"
356356
exp, _ := newMockLogsExporter(t, *config, host)
357357

358358
err := exp.exportData(context.Background(), testdata.GenerateLogs(2))

exporter/kafkaexporter/testdata/config.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,11 @@ kafka:
1919
initial_interval: 10s
2020
max_interval: 60s
2121
max_elapsed_time: 10m
22+
kafka/legacy_topic:
23+
topic: legacy_topic
24+
metrics:
25+
topic: metrics_topic
26+
kafka/legacy_encoding:
27+
encoding: legacy_encoding
28+
metrics:
29+
encoding: metrics_encoding

0 commit comments

Comments
 (0)