Skip to content

Commit 9317964

Browse files
committed
kafkareceiver: refactor to make consumer generic
Remove code that is repeated for every signal, and implement a generic consumer that calls out to a signal-specific message handler. We also update all the tests to use kfake rather than mocks so we're exercising all of the receiver. These tests are parallelised, so all tests run faster than before, and with greater coverage. Finally, we add `max_fetch_wait` (defaults to 250ms) to the receiver's configuration. This was added for speeding up the tests, but may also be useful for users of the receiver.
1 parent 3e50dcd commit 9317964

16 files changed

+845
-2005
lines changed

.chloggen/kafkareceiver-generic.yaml

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add `max_fetch_wait` config setting
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39360]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
This setting allows you to specify the maximum time that the broker will wait for
20+
min_fetch_size bytes of data to be available before sending a response to the client.
21+
Defaults to 250ms.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

internal/kafka/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func NewSaramaConsumerGroup(
5959
saramaConfig.Consumer.Fetch.Min = consumerConfig.MinFetchSize
6060
saramaConfig.Consumer.Fetch.Default = consumerConfig.DefaultFetchSize
6161
saramaConfig.Consumer.Fetch.Max = consumerConfig.MaxFetchSize
62+
saramaConfig.Consumer.MaxWaitTime = consumerConfig.MaxFetchWait
6263
saramaConfig.Consumer.Offsets.AutoCommit.Enable = consumerConfig.AutoCommit.Enable
6364
saramaConfig.Consumer.Offsets.AutoCommit.Interval = consumerConfig.AutoCommit.Interval
6465
saramaConfig.Consumer.Offsets.Initial = saramaInitialOffsets[consumerConfig.InitialOffset]

internal/kafka/configkafka/config.go

+5
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ type ConsumerConfig struct {
9898

9999
// The maximum bytes per fetch from Kafka (default "0", no limit)
100100
MaxFetchSize int32 `mapstructure:"max_fetch_size"`
101+
102+
// The maximum amount of time to wait for MinFetchSize bytes to be
103+
// available before the broker returns a response (default 250ms)
104+
MaxFetchWait time.Duration `mapstructure:"max_fetch_wait"`
101105
}
102106

103107
func NewDefaultConsumerConfig() ConsumerConfig {
@@ -112,6 +116,7 @@ func NewDefaultConsumerConfig() ConsumerConfig {
112116
},
113117
MinFetchSize: 1,
114118
MaxFetchSize: 0,
119+
MaxFetchWait: 250 * time.Millisecond,
115120
DefaultFetchSize: 1048576,
116121
}
117122
}

internal/kafka/configkafka/config_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ func TestConsumerConfig(t *testing.T) {
141141
MinFetchSize: 10,
142142
DefaultFetchSize: 1024,
143143
MaxFetchSize: 4096,
144+
MaxFetchWait: time.Second,
144145
},
145146
},
146147

internal/kafka/configkafka/testdata/consumer_config.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ kafka/full:
1010
min_fetch_size: 10
1111
default_fetch_size: 1024
1212
max_fetch_size: 4096
13+
max_fetch_wait: 1s
1314

1415
# Invalid configurations
1516
kafka/invalid_initial_offset:

internal/kafka/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/aws/aws-sdk-go-v2/credentials v1.17.66
1010
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
1111
github.com/stretchr/testify v1.10.0
12+
github.com/twmb/franz-go/pkg/kadm v1.15.0
1213
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250320172111-35ab5e5f5327
1314
github.com/xdg-go/scram v1.1.2
1415
go.opentelemetry.io/collector/component v1.29.1-0.20250411074447-4fb7c24ebecc

internal/kafka/go.sum

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/kafka/kafkatest/cluster.go

+2
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,7 @@ func NewCluster(tb testing.TB, opts ...kfake.Opt) (*kfake.Cluster, configkafka.C
2121

2222
cfg := configkafka.NewDefaultClientConfig()
2323
cfg.Brokers = cluster.ListenAddrs()
24+
// We need to set the protocol version to 2.3.0 to make Sarama happy.
25+
cfg.ProtocolVersion = "2.3.0"
2426
return cluster, cfg
2527
}

receiver/kafkareceiver/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ The following settings can be optionally configured:
4747
- `min_fetch_size` (default = `1`): The minimum number of message bytes to fetch in a request, defaults to 1 byte.
4848
- `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request, defaults to 1MB.
4949
- `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request, defaults to unlimited.
50+
- `max_fetch_wait` (default = `250ms`): The maximum amount of time the broker should wait for `min_fetch_size` bytes to be available before returning anyway.
5051
- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options.
5152
- `auth`
5253
- `plain_text` (Deprecated in v0.123.0: use sasl with mechanism set to PLAIN instead.)

receiver/kafkareceiver/factory_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestWithTracesUnmarshalers(t *testing.T) {
4141
cfg := createDefaultConfig()
4242
cfg.Traces.Encoding = "custom"
4343
receiver, err := f.CreateTraces(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
44-
tracesConsumer, ok := receiver.(*kafkaTracesConsumer)
44+
tracesConsumer, ok := receiver.(*kafkaConsumer)
4545
require.True(t, ok)
4646
require.Equal(t, "custom", tracesConsumer.config.Traces.Encoding)
4747
require.NoError(t, err)
@@ -50,7 +50,7 @@ func TestWithTracesUnmarshalers(t *testing.T) {
5050
t.Run("default_encoding", func(t *testing.T) {
5151
cfg := createDefaultConfig()
5252
receiver, err := f.CreateTraces(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
53-
tracesConsumer, ok := receiver.(*kafkaTracesConsumer)
53+
tracesConsumer, ok := receiver.(*kafkaConsumer)
5454
require.True(t, ok)
5555
require.Equal(t, defaultTracesEncoding, tracesConsumer.config.Traces.Encoding)
5656
require.NoError(t, err)
@@ -75,7 +75,7 @@ func TestWithMetricsUnmarshalers(t *testing.T) {
7575
cfg := createDefaultConfig()
7676
cfg.Metrics.Encoding = "custom"
7777
receiver, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
78-
metricsConsumer, ok := receiver.(*kafkaMetricsConsumer)
78+
metricsConsumer, ok := receiver.(*kafkaConsumer)
7979
require.True(t, ok)
8080
require.Equal(t, "custom", metricsConsumer.config.Metrics.Encoding)
8181
require.NoError(t, err)
@@ -84,7 +84,7 @@ func TestWithMetricsUnmarshalers(t *testing.T) {
8484
t.Run("default_encoding", func(t *testing.T) {
8585
cfg := createDefaultConfig()
8686
receiver, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
87-
metricsConsumer, ok := receiver.(*kafkaMetricsConsumer)
87+
metricsConsumer, ok := receiver.(*kafkaConsumer)
8888
require.True(t, ok)
8989
require.Equal(t, defaultMetricsEncoding, metricsConsumer.config.Metrics.Encoding)
9090
require.NoError(t, err)
@@ -109,7 +109,7 @@ func TestWithLogsUnmarshalers(t *testing.T) {
109109
cfg := createDefaultConfig()
110110
cfg.Logs.Encoding = "custom"
111111
receiver, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
112-
logsConsumer, ok := receiver.(*kafkaLogsConsumer)
112+
logsConsumer, ok := receiver.(*kafkaConsumer)
113113
require.True(t, ok)
114114
require.Equal(t, "custom", logsConsumer.config.Logs.Encoding)
115115
require.NoError(t, err)
@@ -118,7 +118,7 @@ func TestWithLogsUnmarshalers(t *testing.T) {
118118
t.Run("default_encoding", func(t *testing.T) {
119119
cfg := createDefaultConfig()
120120
receiver, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
121-
logsConsumer, ok := receiver.(*kafkaLogsConsumer)
121+
logsConsumer, ok := receiver.(*kafkaConsumer)
122122
require.True(t, ok)
123123
require.Equal(t, defaultLogsEncoding, logsConsumer.config.Logs.Encoding)
124124
require.NoError(t, err)

receiver/kafkareceiver/go.mod

+5-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ require (
1414
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.123.0
1515
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.123.0
1616
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.123.0
17+
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
1718
github.com/stretchr/testify v1.10.0
19+
github.com/twmb/franz-go v1.18.1
20+
github.com/twmb/franz-go/pkg/kadm v1.15.0
21+
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250320172111-35ab5e5f5327
1822
go.opentelemetry.io/collector/client v1.29.1-0.20250411074447-4fb7c24ebecc
1923
go.opentelemetry.io/collector/component v1.29.1-0.20250411074447-4fb7c24ebecc
2024
go.opentelemetry.io/collector/component/componenttest v0.123.1-0.20250411074447-4fb7c24ebecc
@@ -88,8 +92,8 @@ require (
8892
github.com/openzipkin/zipkin-go v0.4.3 // indirect
8993
github.com/pierrec/lz4/v4 v4.1.22 // indirect
9094
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
91-
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
9295
github.com/relvacode/iso8601 v1.6.0 // indirect
96+
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
9397
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
9498
github.com/xdg-go/scram v1.1.2 // indirect
9599
github.com/xdg-go/stringprep v1.0.4 // indirect

receiver/kafkareceiver/go.sum

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/kafkareceiver/header_extraction.go

-92
This file was deleted.

0 commit comments

Comments
 (0)