
Commit dcf3c51

kafkareceiver: add signal-specific topic/encoding
Add signal-specific configuration for topic and encoding. The topics are already signal-specific by default; it just hasn't been possible to explicitly configure a different topic for each signal. Thus if you set `topic: foo`, it would be used for all signals, which cannot work with the receiver, since only one telemetry type may be used for a given topic.

Similarly, while the default encoding is the same for all signals (i.e. otlp_proto), some encodings are available only for certain signals, e.g. azure_resource_logs is (obviously) only available for logs. This means you could not use the same receiver for multiple signals unless they all used the same encoding.

To address both of these issues we introduce signal-specific configuration: `logs::topic`, `metrics::topic`, `traces::topic`, `logs::encoding`, `metrics::encoding`, and `traces::encoding`. The existing `topic` and `encoding` settings have been deprecated. If the new fields are set, they take precedence; otherwise, if the deprecated fields are set, they are used. The defaults have not changed.

Fixes open-telemetry#32735
1 parent 603b51d commit dcf3c51

11 files changed (+359 −170 lines)
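For context, a configuration using the new signal-specific fields might look like the following sketch (topic names are illustrative, not taken from the diff):

```yaml
receivers:
  kafka:
    logs:
      topic: azure_logs             # hypothetical topic name
      encoding: azure_resource_logs # a logs-only encoding
    traces:
      topic: app_spans              # hypothetical topic name
      encoding: otlp_proto
```

With the old flat `topic`/`encoding` fields this combination was impossible, since a single encoding applied to every signal.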
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: deprecation
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: kafkareceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add signal-specific topic and encoding config, deprecate existing topic/encoding config.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [32735]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

receiver/kafkareceiver/README.md

+58-44
@@ -13,9 +13,7 @@
 [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
 <!-- end autogenerated section -->

-Kafka receiver receives traces, metrics, and logs from Kafka. Message payload encoding is configurable.
-
-Note that metrics and logs only support OTLP.
+Kafka receiver receives telemetry data from Kafka, with configurable topics and encodings.

 ## Getting Started

@@ -26,20 +24,19 @@ The following settings can be optionally configured:
 - `brokers` (default = localhost:9092): The list of kafka brokers.
 - `protocol_version` (default = 2.1.0): Kafka protocol version.
 - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
-- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from.
-  Only one telemetry type may be used for a given topic.
-- `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded. Available internal encodings:
-  - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively.
-  - `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest` `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively using JSON encoding.
-  - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
-  - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
-  - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
-  - `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
-  - `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
-  - `raw`: (logs only) the payload's bytes are inserted as the body of a log record.
-  - `text`: (logs only) the payload are decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<ENCODING>`, like `text_utf-8`, `text_shift_jis`, etc., to customize this behavior.
-  - `json`: (logs only) the payload is decoded as JSON and inserted as the body of a log record.
-  - `azure_resource_logs`: (logs only) the payload is converted from Azure Resource Logs format to OTel format.
+- `logs`
+  - `topic` (default = otlp_logs): The name of the Kafka topic from which to consume logs.
+  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See below for supported encodings.
+- `metrics`
+  - `topic` (default = otlp_metrics): The name of the Kafka topic from which to consume metrics.
+  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See below for supported encodings.
+- `traces`
+  - `topic` (default = otlp_spans): The name of the Kafka topic from which to consume traces.
+  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See below for supported encodings.
+- `topic` (Deprecated [v0.123.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`).
+  If this is set, it will take precedence over the default value for those fields.
+- `encoding` (Deprecated [v0.123.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`).
+  If this is set, it will take precedence over the default value for those fields.
 - `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
 - `client_id` (default = otel-collector): The consumer client ID that receiver will use
 - `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
@@ -104,14 +101,40 @@
 - `randomization_factor`: A random factor used to calculate next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor)
 - `max_elapsed_time`: The maximum amount of time trying to backoff before giving up. If set to 0, the retries are never stopped.

-Example:
+### Supported encodings
+
+The Kafka receiver supports encoding extensions, as well as the following built-in encodings:
+
+- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest`, or `ExportMetricsServiceRequest` respectively.
+- `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest`, or `ExportMetricsServiceRequest` respectively using JSON encoding.
+- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
+- `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
+- `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
+- `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
+- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
+- `raw`: (logs only) the payload's bytes are inserted as the body of a log record.
+- `text`: (logs only) the payload is decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<ENCODING>`, like `text_utf-8`, `text_shift_jis`, etc., to customize this behavior.
+- `json`: (logs only) the payload is decoded as JSON and inserted as the body of a log record.
+- `azure_resource_logs`: (logs only) the payload is converted from Azure Resource Logs format to OTel format.
+
+### Example configurations
+
+#### Minimal configuration
+
+By default, the receiver does not require any configuration. With the following configuration,
+the receiver will consume messages from the default topics on localhost:9092 using the
+`otlp_proto` encoding:

 ```yaml
 receivers:
   kafka:
-    protocol_version: 2.0.0
 ```
-Example of connecting to kafka using sasl and TLS:
+
+#### TLS and authentication
+
+In this example the receiver is configured to connect to Kafka using TLS for encryption,
+and SASL/SCRAM for authentication:

 ```yaml
 receivers:
@@ -124,39 +147,30 @@ receivers:
       tls:
         insecure: false
 ```
-Example of header extraction:
+
+#### Header extraction
+
+By default, the receiver will ignore Kafka message headers. It is possible to extract
+specific headers and attach them as resource attributes to decoded data.

 ```yaml
 receivers:
   kafka:
-    topic: test
     header_extraction:
       extract_headers: true
       headers: ["header1", "header2"]
 ```

-- If we feed following kafka record to `test` topic and use above configs:
-```yaml
-{
-  event: Hello,
-  headers: {
-    header1: value1,
-    header2: value2,
-  }
-}
+If we produce a Kafka message with headers "header1: value1" and "header2: value2"
+with the above configuration, the receiver will attach these headers as resource
+attributes with the prefix "kafka.header.", i.e.
+
 ```
-we will get a log record in collector similar to:
-```yaml
-{
-  ...
-  body: Hello,
-  resource: {
-    kafka.header.header1: value1,
-    kafka.header.header2: value2,
-  },
-  ...
+"resource": {
+  "attributes": {
+    "kafka.header.header1": "value1",
+    "kafka.header.header2": "value2",
+  }
 }
+...
 ```
-
-- Here you can see the kafka record header `header1` and `header2` being added to resource attribute.
-- Every **matching** kafka header key is prefixed with `kafka.header` string and attached to resource attributes.
receiver/kafkareceiver/config.go

+81-5
@@ -6,6 +6,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect
 import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/configretry"
+	"go.opentelemetry.io/collector/confmap"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 )
@@ -17,22 +18,97 @@ type Config struct {
 	configkafka.ClientConfig   `mapstructure:",squash"`
 	configkafka.ConsumerConfig `mapstructure:",squash"`

-	// The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs)
+	// Logs holds configuration about how logs should be consumed.
+	Logs TopicEncodingConfig `mapstructure:"logs"`
+
+	// Metrics holds configuration about how metrics should be consumed.
+	Metrics TopicEncodingConfig `mapstructure:"metrics"`
+
+	// Traces holds configuration about how traces should be consumed.
+	Traces TopicEncodingConfig `mapstructure:"traces"`
+
+	// Topic holds the name of the Kafka topic from which to consume data.
+	//
+	// Topic has no default. If explicitly specified, it will take precedence
+	// over the default values of Logs.Topic, Traces.Topic, and Metrics.Topic.
+	//
+	// Deprecated [v0.123.0]: Use Logs.Topic, Traces.Topic, and Metrics.Topic.
 	Topic string `mapstructure:"topic"`

-	// Encoding of the messages (default "otlp_proto")
+	// Encoding holds the expected encoding of messages.
+	//
+	// Encoding has no default. If explicitly specified, it will take precedence
+	// over the default values of Logs.Encoding, Traces.Encoding, and
+	// Metrics.Encoding.
+	//
+	// Deprecated [v0.123.0]: Use Logs.Encoding, Traces.Encoding, and
+	// Metrics.Encoding.
 	Encoding string `mapstructure:"encoding"`

-	// Controls the way the messages are marked as consumed
+	// MessageMarking controls the way the messages are marked as consumed.
 	MessageMarking MessageMarking `mapstructure:"message_marking"`

-	// Extract headers from kafka records
+	// HeaderExtraction controls extraction of headers from Kafka records.
 	HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`

-	// In case of some errors returned by the next consumer, the receiver will wait and retry the failed message
+	// ErrorBackOff controls backoff/retry behavior when the next consumer
+	// returns an error.
 	ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
 }

+func (c *Config) Unmarshal(conf *confmap.Conf) error {
+	if err := conf.Unmarshal(c); err != nil {
+		return err
+	}
+	// Check if deprecated fields have been explicitly set,
+	// in which case they should be used instead of signal-
+	// specific defaults.
+	var zeroConfig Config
+	if err := conf.Unmarshal(&zeroConfig); err != nil {
+		return err
+	}
+	if c.Topic != "" {
+		if zeroConfig.Logs.Topic == "" {
+			c.Logs.Topic = c.Topic
+		}
+		if zeroConfig.Metrics.Topic == "" {
+			c.Metrics.Topic = c.Topic
+		}
+		if zeroConfig.Traces.Topic == "" {
+			c.Traces.Topic = c.Topic
+		}
+	}
+	if c.Encoding != "" {
+		if zeroConfig.Logs.Encoding == "" {
+			c.Logs.Encoding = c.Encoding
+		}
+		if zeroConfig.Metrics.Encoding == "" {
+			c.Metrics.Encoding = c.Encoding
+		}
+		if zeroConfig.Traces.Encoding == "" {
+			c.Traces.Encoding = c.Encoding
+		}
+	}
+	return conf.Unmarshal(c)
+}
+
+// TopicEncodingConfig holds signal-specific topic and encoding configuration.
+type TopicEncodingConfig struct {
+	// Topic holds the name of the Kafka topic from which messages of the
+	// signal type should be consumed.
+	//
+	// The default depends on the signal type:
+	//  - "otlp_spans" for traces
+	//  - "otlp_metrics" for metrics
+	//  - "otlp_logs" for logs
+	Topic string `mapstructure:"topic"`
+
+	// Encoding holds the expected encoding of messages for the signal type.
+	//
+	// Defaults to "otlp_proto".
+	Encoding string `mapstructure:"encoding"`
+}
+
 type MessageMarking struct {
 	// If true, the messages are marked after the pipeline execution
 	After bool `mapstructure:"after"`
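A minimal sketch of how this `Unmarshal` method behaves, assuming the package context above (`confmap.NewFromStringMap` is the collector's standard helper for building a `*confmap.Conf`; the values are illustrative):

```go
// Sketch: mix the deprecated `topic` with an explicit signal-specific override.
conf := confmap.NewFromStringMap(map[string]any{
	"topic": "shared_topic",
	"logs":  map[string]any{"topic": "logs_topic"},
})
var cfg Config // in practice the factory's default config would be unmarshaled into
if err := cfg.Unmarshal(conf); err != nil {
	panic(err) // illustrative error handling only
}
// cfg.Logs.Topic == "logs_topic"      (explicitly set, so it wins)
// cfg.Metrics.Topic == "shared_topic" (deprecated value fills the gap)
// cfg.Traces.Topic == "shared_topic"
```

The second `conf.Unmarshal(&zeroConfig)` in the method is what distinguishes "explicitly set" from "defaulted": a field that is empty in the zero-value unmarshal was not present in the user's configuration.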

receiver/kafkareceiver/config_test.go

+71-4
@@ -46,8 +46,65 @@ func TestLoadConfig(t *testing.T) {
 				config.GroupID = "the_group_id"
 				return config
 			}(),
-			Topic:    "spans",
-			Encoding: "otlp_proto",
+			Logs: TopicEncodingConfig{
+				Topic:    "spans",
+				Encoding: "otlp_proto",
+			},
+			Metrics: TopicEncodingConfig{
+				Topic:    "spans",
+				Encoding: "otlp_proto",
+			},
+			Traces: TopicEncodingConfig{
+				Topic:    "spans",
+				Encoding: "otlp_proto",
+			},
+			Topic: "spans",
+			ErrorBackOff: configretry.BackOffConfig{
+				Enabled: false,
+			},
+		},
+	},
+	{
+		id: component.NewIDWithName(metadata.Type, "legacy_topic"),
+		expected: &Config{
+			ClientConfig:   configkafka.NewDefaultClientConfig(),
+			ConsumerConfig: configkafka.NewDefaultConsumerConfig(),
+			Logs: TopicEncodingConfig{
+				Topic:    "legacy_topic",
+				Encoding: "otlp_proto",
+			},
+			Metrics: TopicEncodingConfig{
+				Topic:    "metrics_topic",
+				Encoding: "otlp_proto",
+			},
+			Traces: TopicEncodingConfig{
+				Topic:    "legacy_topic",
+				Encoding: "otlp_proto",
+			},
+			Topic: "legacy_topic",
+			ErrorBackOff: configretry.BackOffConfig{
+				Enabled: false,
+			},
+		},
+	},
+	{
+		id: component.NewIDWithName(metadata.Type, "legacy_encoding"),
+		expected: &Config{
+			ClientConfig:   configkafka.NewDefaultClientConfig(),
+			ConsumerConfig: configkafka.NewDefaultConsumerConfig(),
+			Logs: TopicEncodingConfig{
+				Topic:    "otlp_logs",
+				Encoding: "legacy_encoding",
+			},
+			Metrics: TopicEncodingConfig{
+				Topic:    "otlp_metrics",
+				Encoding: "metrics_encoding",
+			},
+			Traces: TopicEncodingConfig{
+				Topic:    "otlp_spans",
+				Encoding: "legacy_encoding",
+			},
+			Encoding: "legacy_encoding",
 			ErrorBackOff: configretry.BackOffConfig{
 				Enabled: false,
 			},
@@ -77,8 +134,18 @@ func TestLoadConfig(t *testing.T) {
 				config.HeartbeatInterval = 15 * time.Second
 				return config
 			}(),
-			Topic:    "logs",
-			Encoding: "direct",
+			Logs: TopicEncodingConfig{
+				Topic:    "logs",
+				Encoding: "direct",
+			},
+			Metrics: TopicEncodingConfig{
+				Topic:    "otlp_metrics",
+				Encoding: "otlp_proto",
+			},
+			Traces: TopicEncodingConfig{
+				Topic:    "otlp_spans",
+				Encoding: "otlp_proto",
+			},
 			ErrorBackOff: configretry.BackOffConfig{
 				Enabled:         true,
 				InitialInterval: 1 * time.Second,
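The new `legacy_topic` and `legacy_encoding` cases are presumably backed by entries along these lines in the test's `testdata/config.yaml` (an assumption inferred from the expected values above; the testdata file is not shown in this excerpt):

```yaml
kafka/legacy_topic:
  topic: legacy_topic # deprecated field; fans out to logs and traces
  metrics:
    topic: metrics_topic
kafka/legacy_encoding:
  encoding: legacy_encoding # deprecated field; fans out to logs and traces
  metrics:
    encoding: metrics_encoding
```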
