Skip to content

Commit ec50ae6

Browse files
axwFiery-Fenix
authored andcommitted
exporter/kafkaexporter: tidy up; support partitioning in all encodings (open-telemetry#39001)
#### Description - Move marshaler implementations to internal package - Simplify code to obtain a marshaler given an encoding name - Make marshalers independent of Sarama (Kafka client) - Implement a generic Kafka exporter with type parameter for signal-specific parts - Create more tightly scoped unit tests for encodings, independent of the general functionality of the exporter - Added tests for partitioning logic #### Link to tracking issue Fixes open-telemetry#38999 #### Testing Unit tests added. #### Documentation N/A
1 parent 6fa4ddc commit ec50ae6

18 files changed

+1095
-1505
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkaexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: enable partitioning for all encodings
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39001, 38999]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
With the exception of Jaeger encodings which have their own partitioning logic,
20+
partitioning is now independent of the encoding used. This means that all encodings
21+
now support partitioning.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

exporter/kafkaexporter/config.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,18 @@ type Config struct {
3131
Encoding string `mapstructure:"encoding"`
3232

3333
// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
34-
// Please note: does not have any effect on Jaeger encoding exporters since Jaeger exporters include
35-
// trace ID as the message key by default.
34+
//
35+
// NOTE: this does not have any effect for Jaeger encodings. Jaeger encodings always use
36+
// use the trace ID for the message key.
3637
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`
3738

39+
// PartitionMetricsByResourceAttributes controls the partitioning of metrics messages by
40+
// resource. If this is true, then the message key will be set to a hash of the resource's
41+
// identifying attributes.
3842
PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`
3943

44+
// PartitionLogsByResourceAttributes controls the partitioning of logs messages by resource.
45+
// If this is true, then the message key will be set to a hash of the resource's identifying
46+
// attributes.
4047
PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
4148
}

exporter/kafkaexporter/factory.go

+12-9
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,16 @@ func createTracesExporter(
7070
ctx,
7171
set,
7272
&oCfg,
73-
exp.tracesPusher,
73+
exp.exportData,
7474
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
7575
// Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
7676
// and will rely on the sarama Producer Timeout logic.
7777
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
7878
exporterhelper.WithRetry(oCfg.BackOffConfig),
7979
exporterhelper.WithQueue(oCfg.QueueSettings),
80-
exporterhelper.WithStart(exp.start),
81-
exporterhelper.WithShutdown(exp.Close))
80+
exporterhelper.WithStart(exp.Start),
81+
exporterhelper.WithShutdown(exp.Close),
82+
)
8283
}
8384

8485
func createMetricsExporter(
@@ -98,15 +99,16 @@ func createMetricsExporter(
9899
ctx,
99100
set,
100101
&oCfg,
101-
exp.metricsDataPusher,
102+
exp.exportData,
102103
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
103104
// Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
104105
// and will rely on the sarama Producer Timeout logic.
105106
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
106107
exporterhelper.WithRetry(oCfg.BackOffConfig),
107108
exporterhelper.WithQueue(oCfg.QueueSettings),
108-
exporterhelper.WithStart(exp.start),
109-
exporterhelper.WithShutdown(exp.Close))
109+
exporterhelper.WithStart(exp.Start),
110+
exporterhelper.WithShutdown(exp.Close),
111+
)
110112
}
111113

112114
func createLogsExporter(
@@ -126,13 +128,14 @@ func createLogsExporter(
126128
ctx,
127129
set,
128130
&oCfg,
129-
exp.logsDataPusher,
131+
exp.exportData,
130132
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
131133
// Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
132134
// and will rely on the sarama Producer Timeout logic.
133135
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
134136
exporterhelper.WithRetry(oCfg.BackOffConfig),
135137
exporterhelper.WithQueue(oCfg.QueueSettings),
136-
exporterhelper.WithStart(exp.start),
137-
exporterhelper.WithShutdown(exp.Close))
138+
exporterhelper.WithStart(exp.Start),
139+
exporterhelper.WithShutdown(exp.Close),
140+
)
138141
}

exporter/kafkaexporter/factory_test.go

+14-22
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,9 @@ func TestCreateMetricExporter(t *testing.T) {
3838
t.Parallel()
3939

4040
tests := []struct {
41-
name string
42-
conf *Config
43-
marshalers []MetricsMarshaler
44-
err *net.DNSError
41+
name string
42+
conf *Config
43+
err *net.DNSError
4544
}{
4645
{
4746
name: "valid config (no validating broker)",
@@ -69,8 +68,7 @@ func TestCreateMetricExporter(t *testing.T) {
6968
conf.Metadata.Full = false
7069
conf.Encoding = defaultEncoding
7170
}),
72-
marshalers: nil,
73-
err: nil,
71+
err: nil,
7472
},
7573
}
7674

@@ -102,10 +100,9 @@ func TestCreateLogExporter(t *testing.T) {
102100
t.Parallel()
103101

104102
tests := []struct {
105-
name string
106-
conf *Config
107-
marshalers []LogsMarshaler
108-
err *net.DNSError
103+
name string
104+
conf *Config
105+
err *net.DNSError
109106
}{
110107
{
111108
name: "valid config (no validating broker)",
@@ -133,8 +130,7 @@ func TestCreateLogExporter(t *testing.T) {
133130
conf.Metadata.Full = false
134131
conf.Encoding = defaultEncoding
135132
}),
136-
marshalers: nil,
137-
err: nil,
133+
err: nil,
138134
},
139135
}
140136

@@ -166,10 +162,9 @@ func TestCreateTraceExporter(t *testing.T) {
166162
t.Parallel()
167163

168164
tests := []struct {
169-
name string
170-
conf *Config
171-
marshalers []TracesMarshaler
172-
err *net.DNSError
165+
name string
166+
conf *Config
167+
err *net.DNSError
173168
}{
174169
{
175170
name: "valid config (no validating brokers)",
@@ -178,17 +173,15 @@ func TestCreateTraceExporter(t *testing.T) {
178173
conf.Brokers = []string{"invalid:9092"}
179174
conf.ProtocolVersion = "2.0.0"
180175
}),
181-
marshalers: nil,
182-
err: nil,
176+
err: nil,
183177
},
184178
{
185179
name: "invalid config (validating brokers)",
186180
conf: applyConfigOption(func(conf *Config) {
187181
conf.Brokers = []string{"invalid:9092"}
188182
conf.ProtocolVersion = "2.0.0"
189183
}),
190-
marshalers: nil,
191-
err: &net.DNSError{},
184+
err: &net.DNSError{},
192185
},
193186
{
194187
name: "default_encoding",
@@ -197,8 +190,7 @@ func TestCreateTraceExporter(t *testing.T) {
197190
conf.Metadata.Full = false
198191
conf.Encoding = defaultEncoding
199192
}),
200-
marshalers: nil,
201-
err: nil,
193+
err: nil,
202194
},
203195
}
204196

exporter/kafkaexporter/go.mod

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ require (
1010
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.123.0
1111
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.123.0
1212
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.123.0
13+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.123.0
1314
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.123.0
1415
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.123.0
1516
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.123.0
16-
github.com/openzipkin/zipkin-go v0.4.3
1717
github.com/stretchr/testify v1.10.0
1818
go.opentelemetry.io/collector/component v1.29.1-0.20250402200755-cb5c3f4fb9dc
1919
go.opentelemetry.io/collector/component/componenttest v0.123.1-0.20250402200755-cb5c3f4fb9dc
@@ -26,7 +26,6 @@ require (
2626
go.opentelemetry.io/collector/exporter/exportertest v0.123.1-0.20250402200755-cb5c3f4fb9dc
2727
go.opentelemetry.io/collector/pdata v1.29.1-0.20250402200755-cb5c3f4fb9dc
2828
go.opentelemetry.io/collector/pdata/testdata v0.123.1-0.20250402200755-cb5c3f4fb9dc
29-
go.opentelemetry.io/collector/semconv v0.123.1-0.20250402200755-cb5c3f4fb9dc
3029
go.uber.org/goleak v1.3.0
3130
go.uber.org/multierr v1.11.0
3231
go.uber.org/zap v1.27.0
@@ -79,6 +78,7 @@ require (
7978
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
8079
github.com/modern-go/reflect2 v1.0.2 // indirect
8180
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils v0.123.0 // indirect
81+
github.com/openzipkin/zipkin-go v0.4.3 // indirect
8282
github.com/pierrec/lz4/v4 v4.1.22 // indirect
8383
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
8484
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
@@ -100,6 +100,7 @@ require (
100100
go.opentelemetry.io/collector/receiver v1.29.1-0.20250402200755-cb5c3f4fb9dc // indirect
101101
go.opentelemetry.io/collector/receiver/receivertest v0.123.1-0.20250402200755-cb5c3f4fb9dc // indirect
102102
go.opentelemetry.io/collector/receiver/xreceiver v0.123.1-0.20250402200755-cb5c3f4fb9dc // indirect
103+
go.opentelemetry.io/collector/semconv v0.123.1-0.20250402200755-cb5c3f4fb9dc // indirect
103104
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect
104105
go.opentelemetry.io/otel v1.35.0 // indirect
105106
go.opentelemetry.io/otel/log v0.11.0 // indirect
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package marshaler // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/marshaler"
5+
6+
import (
7+
"bytes"
8+
9+
"github.com/gogo/protobuf/jsonpb"
10+
jaegerproto "github.com/jaegertracing/jaeger-idl/model/v1"
11+
"go.opentelemetry.io/collector/pdata/ptrace"
12+
"go.uber.org/multierr"
13+
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
15+
)
16+
17+
var (
18+
_ TracesMarshaler = JaegerProtoSpanMarshaler{}
19+
_ TracesMarshaler = JaegerJSONSpanMarshaler{}
20+
)
21+
22+
type JaegerProtoSpanMarshaler struct{}
23+
24+
type JaegerJSONSpanMarshaler struct{}
25+
26+
func (JaegerProtoSpanMarshaler) MarshalTraces(traces ptrace.Traces) ([]Message, error) {
27+
return marshalJaeger(traces, marshalJaegerSpanProto)
28+
}
29+
30+
func (JaegerJSONSpanMarshaler) MarshalTraces(traces ptrace.Traces) ([]Message, error) {
31+
return marshalJaeger(traces, marshalJaegerSpanJSON)
32+
}
33+
34+
func marshalJaeger(traces ptrace.Traces, marshal marshalJaegerSpanFunc) ([]Message, error) {
35+
batches := jaeger.ProtoFromTraces(traces)
36+
var messages []Message
37+
38+
var errs error
39+
for _, batch := range batches {
40+
for _, span := range batch.Spans {
41+
span.Process = batch.Process
42+
bts, err := marshal(span)
43+
// continue to process spans that can be serialized
44+
if err != nil {
45+
errs = multierr.Append(errs, err)
46+
continue
47+
}
48+
key := []byte(span.TraceID.String())
49+
messages = append(messages, Message{Key: key, Value: bts})
50+
}
51+
}
52+
return messages, errs
53+
}
54+
55+
type marshalJaegerSpanFunc func(*jaegerproto.Span) ([]byte, error)
56+
57+
func marshalJaegerSpanProto(span *jaegerproto.Span) ([]byte, error) {
58+
return span.Marshal()
59+
}
60+
61+
func marshalJaegerSpanJSON(span *jaegerproto.Span) ([]byte, error) {
62+
var m jsonpb.Marshaler
63+
out := new(bytes.Buffer)
64+
err := m.Marshal(out, span)
65+
return out.Bytes(), err
66+
}

exporter/kafkaexporter/jaeger_marshaler_test.go renamed to exporter/kafkaexporter/internal/marshaler/jaeger_marshaler_test.go

+9-19
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package kafkaexporter
4+
package marshaler
55

66
import (
77
"bytes"
88
"testing"
99

10-
"github.com/IBM/sarama"
1110
"github.com/gogo/protobuf/jsonpb"
1211
"github.com/stretchr/testify/assert"
1312
"github.com/stretchr/testify/require"
@@ -38,33 +37,24 @@ func TestJaegerMarshaler(t *testing.T) {
3837
require.NoError(t, jsonMarshaler.Marshal(jsonByteBuffer, batches[0].Spans[0]))
3938

4039
tests := []struct {
41-
unmarshaler TracesMarshaler
42-
encoding string
43-
messages []*sarama.ProducerMessage
40+
marshaler TracesMarshaler
41+
encoding string
42+
messages []Message
4443
}{
4544
{
46-
unmarshaler: jaegerMarshaler{
47-
marshaler: jaegerProtoSpanMarshaler{},
48-
},
49-
encoding: "jaeger_proto",
50-
messages: []*sarama.ProducerMessage{{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes), Key: sarama.ByteEncoder(messageKey)}},
45+
marshaler: JaegerProtoSpanMarshaler{},
46+
messages: []Message{{Value: jaegerProtoBytes, Key: messageKey}},
5147
},
5248
{
53-
unmarshaler: jaegerMarshaler{
54-
marshaler: jaegerJSONSpanMarshaler{
55-
pbMarshaler: &jsonpb.Marshaler{},
56-
},
57-
},
58-
encoding: "jaeger_json",
59-
messages: []*sarama.ProducerMessage{{Topic: "topic", Value: sarama.ByteEncoder(jsonByteBuffer.Bytes()), Key: sarama.ByteEncoder(messageKey)}},
49+
marshaler: JaegerJSONSpanMarshaler{},
50+
messages: []Message{{Value: jsonByteBuffer.Bytes(), Key: messageKey}},
6051
},
6152
}
6253
for _, test := range tests {
6354
t.Run(test.encoding, func(t *testing.T) {
64-
messages, err := test.unmarshaler.Marshal(td, "topic")
55+
messages, err := test.marshaler.MarshalTraces(td)
6556
require.NoError(t, err)
6657
assert.Equal(t, test.messages, messages)
67-
assert.Equal(t, test.encoding, test.unmarshaler.Encoding())
6858
})
6959
}
7060
}

0 commit comments

Comments
 (0)