Skip to content

Commit f0c9700

Browse files
committed
[chore] exporter/kafkaexporter: tidy up
- Move marshaler implementations to internal package - Simplify code to obtain a marshaler given an encoding name - Make marshalers independent of Sarama (Kafka client) - Extract partitioning logic out of marshalers, with the exception of Jaeger marshalers - Implement a generic Kafka exporter with type parameter for signal-specific parts - Create more tightly scoped unit tests for encodings, independent of the general functionality of the exporter
1 parent 49ccc5d commit f0c9700

17 files changed

+1041
-1502
lines changed

exporter/kafkaexporter/config.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,18 @@ type Config struct {
3131
Encoding string `mapstructure:"encoding"`
3232

3333
// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
34-
// Please note: does not have any effect on Jaeger encoding exporters since Jaeger exporters include
35-
// trace ID as the message key by default.
34+
//
35+
// NOTE: this does not have any effect for Jaeger encodings. Jaeger encodings always use
36+
// use the trace ID for the message key.
3637
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`
3738

39+
// PartitionMetricsByResourceAttributes controls the partitioning of metrics messages by
40+
// resource. If this is true, then the message key will be set to a hash of the resource's
41+
// identifying attributes.
3842
PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`
3943

44+
// PartitionLogsByResourceAttributes controls the partitioning of logs messages by resource.
45+
// If this is true, then the message key will be set to a hash of the resource's identifying
46+
// attributes.
4047
PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
4148
}

exporter/kafkaexporter/factory.go

+12-9
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,16 @@ func createTracesExporter(
7070
ctx,
7171
set,
7272
&oCfg,
73-
exp.tracesPusher,
73+
exp.exportData,
7474
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
7575
// Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
7676
// and will rely on the sarama Producer Timeout logic.
7777
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
7878
exporterhelper.WithRetry(oCfg.BackOffConfig),
7979
exporterhelper.WithQueue(oCfg.QueueSettings),
80-
exporterhelper.WithStart(exp.start),
81-
exporterhelper.WithShutdown(exp.Close))
80+
exporterhelper.WithStart(exp.Start),
81+
exporterhelper.WithShutdown(exp.Close),
82+
)
8283
}
8384

8485
func createMetricsExporter(
@@ -98,15 +99,16 @@ func createMetricsExporter(
9899
ctx,
99100
set,
100101
&oCfg,
101-
exp.metricsDataPusher,
102+
exp.exportData,
102103
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
103104
// Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
104105
// and will rely on the sarama Producer Timeout logic.
105106
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
106107
exporterhelper.WithRetry(oCfg.BackOffConfig),
107108
exporterhelper.WithQueue(oCfg.QueueSettings),
108-
exporterhelper.WithStart(exp.start),
109-
exporterhelper.WithShutdown(exp.Close))
109+
exporterhelper.WithStart(exp.Start),
110+
exporterhelper.WithShutdown(exp.Close),
111+
)
110112
}
111113

112114
func createLogsExporter(
@@ -126,13 +128,14 @@ func createLogsExporter(
126128
ctx,
127129
set,
128130
&oCfg,
129-
exp.logsDataPusher,
131+
exp.exportData,
130132
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
131133
// Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
132134
// and will rely on the sarama Producer Timeout logic.
133135
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
134136
exporterhelper.WithRetry(oCfg.BackOffConfig),
135137
exporterhelper.WithQueue(oCfg.QueueSettings),
136-
exporterhelper.WithStart(exp.start),
137-
exporterhelper.WithShutdown(exp.Close))
138+
exporterhelper.WithStart(exp.Start),
139+
exporterhelper.WithShutdown(exp.Close),
140+
)
138141
}

exporter/kafkaexporter/factory_test.go

+14-22
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,9 @@ func TestCreateMetricExporter(t *testing.T) {
3838
t.Parallel()
3939

4040
tests := []struct {
41-
name string
42-
conf *Config
43-
marshalers []MetricsMarshaler
44-
err *net.DNSError
41+
name string
42+
conf *Config
43+
err *net.DNSError
4544
}{
4645
{
4746
name: "valid config (no validating broker)",
@@ -69,8 +68,7 @@ func TestCreateMetricExporter(t *testing.T) {
6968
conf.Metadata.Full = false
7069
conf.Encoding = defaultEncoding
7170
}),
72-
marshalers: nil,
73-
err: nil,
71+
err: nil,
7472
},
7573
}
7674

@@ -102,10 +100,9 @@ func TestCreateLogExporter(t *testing.T) {
102100
t.Parallel()
103101

104102
tests := []struct {
105-
name string
106-
conf *Config
107-
marshalers []LogsMarshaler
108-
err *net.DNSError
103+
name string
104+
conf *Config
105+
err *net.DNSError
109106
}{
110107
{
111108
name: "valid config (no validating broker)",
@@ -133,8 +130,7 @@ func TestCreateLogExporter(t *testing.T) {
133130
conf.Metadata.Full = false
134131
conf.Encoding = defaultEncoding
135132
}),
136-
marshalers: nil,
137-
err: nil,
133+
err: nil,
138134
},
139135
}
140136

@@ -166,10 +162,9 @@ func TestCreateTraceExporter(t *testing.T) {
166162
t.Parallel()
167163

168164
tests := []struct {
169-
name string
170-
conf *Config
171-
marshalers []TracesMarshaler
172-
err *net.DNSError
165+
name string
166+
conf *Config
167+
err *net.DNSError
173168
}{
174169
{
175170
name: "valid config (no validating brokers)",
@@ -178,17 +173,15 @@ func TestCreateTraceExporter(t *testing.T) {
178173
conf.Brokers = []string{"invalid:9092"}
179174
conf.ProtocolVersion = "2.0.0"
180175
}),
181-
marshalers: nil,
182-
err: nil,
176+
err: nil,
183177
},
184178
{
185179
name: "invalid config (validating brokers)",
186180
conf: applyConfigOption(func(conf *Config) {
187181
conf.Brokers = []string{"invalid:9092"}
188182
conf.ProtocolVersion = "2.0.0"
189183
}),
190-
marshalers: nil,
191-
err: &net.DNSError{},
184+
err: &net.DNSError{},
192185
},
193186
{
194187
name: "default_encoding",
@@ -197,8 +190,7 @@ func TestCreateTraceExporter(t *testing.T) {
197190
conf.Metadata.Full = false
198191
conf.Encoding = defaultEncoding
199192
}),
200-
marshalers: nil,
201-
err: nil,
193+
err: nil,
202194
},
203195
}
204196

exporter/kafkaexporter/go.mod

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ require (
1010
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.122.0
1111
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.122.0
1212
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.122.0
13+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.122.0
1314
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.122.0
1415
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.122.0
1516
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.122.0
16-
github.com/openzipkin/zipkin-go v0.4.3
1717
github.com/stretchr/testify v1.10.0
1818
go.opentelemetry.io/collector/component v1.28.2-0.20250319144947-41a9ea7f7402
1919
go.opentelemetry.io/collector/component/componenttest v0.122.2-0.20250319144947-41a9ea7f7402
@@ -26,7 +26,6 @@ require (
2626
go.opentelemetry.io/collector/exporter/exportertest v0.122.2-0.20250319144947-41a9ea7f7402
2727
go.opentelemetry.io/collector/pdata v1.28.2-0.20250319144947-41a9ea7f7402
2828
go.opentelemetry.io/collector/pdata/testdata v0.122.2-0.20250319144947-41a9ea7f7402
29-
go.opentelemetry.io/collector/semconv v0.122.2-0.20250319144947-41a9ea7f7402
3029
go.uber.org/goleak v1.3.0
3130
go.uber.org/multierr v1.11.0
3231
go.uber.org/zap v1.27.0
@@ -79,6 +78,7 @@ require (
7978
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
8079
github.com/modern-go/reflect2 v1.0.2 // indirect
8180
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils v0.122.0 // indirect
81+
github.com/openzipkin/zipkin-go v0.4.3 // indirect
8282
github.com/pierrec/lz4/v4 v4.1.22 // indirect
8383
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
8484
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
@@ -99,6 +99,7 @@ require (
9999
go.opentelemetry.io/collector/receiver v1.28.2-0.20250319144947-41a9ea7f7402 // indirect
100100
go.opentelemetry.io/collector/receiver/receivertest v0.122.2-0.20250319144947-41a9ea7f7402 // indirect
101101
go.opentelemetry.io/collector/receiver/xreceiver v0.122.2-0.20250319144947-41a9ea7f7402 // indirect
102+
go.opentelemetry.io/collector/semconv v0.122.2-0.20250319144947-41a9ea7f7402 // indirect
102103
go.opentelemetry.io/otel v1.35.0 // indirect
103104
go.opentelemetry.io/otel/metric v1.35.0 // indirect
104105
go.opentelemetry.io/otel/sdk v1.35.0 // indirect
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package marshaler // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/marshaler"
5+
6+
import (
7+
"bytes"
8+
9+
"github.com/gogo/protobuf/jsonpb"
10+
jaegerproto "github.com/jaegertracing/jaeger-idl/model/v1"
11+
"go.opentelemetry.io/collector/pdata/ptrace"
12+
"go.uber.org/multierr"
13+
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
15+
)
16+
17+
var (
18+
_ TracesMarshaler = JaegerProtoSpanMarshaler{}
19+
_ TracesMarshaler = JaegerJSONSpanMarshaler{}
20+
)
21+
22+
type JaegerProtoSpanMarshaler struct{}
23+
24+
type JaegerJSONSpanMarshaler struct{}
25+
26+
func (JaegerProtoSpanMarshaler) MarshalTraces(traces ptrace.Traces) ([]Message, error) {
27+
return marshalJaeger(traces, marshalJaegerSpanProto)
28+
}
29+
30+
func (JaegerJSONSpanMarshaler) MarshalTraces(traces ptrace.Traces) ([]Message, error) {
31+
return marshalJaeger(traces, marshalJaegerSpanJSON)
32+
}
33+
34+
func marshalJaeger(traces ptrace.Traces, marshal marshalJaegerSpanFunc) ([]Message, error) {
35+
batches := jaeger.ProtoFromTraces(traces)
36+
var messages []Message
37+
38+
var errs error
39+
for _, batch := range batches {
40+
for _, span := range batch.Spans {
41+
span.Process = batch.Process
42+
bts, err := marshal(span)
43+
// continue to process spans that can be serialized
44+
if err != nil {
45+
errs = multierr.Append(errs, err)
46+
continue
47+
}
48+
key := []byte(span.TraceID.String())
49+
messages = append(messages, Message{Key: key, Value: bts})
50+
}
51+
}
52+
return messages, errs
53+
}
54+
55+
type marshalJaegerSpanFunc func(*jaegerproto.Span) ([]byte, error)
56+
57+
func marshalJaegerSpanProto(span *jaegerproto.Span) ([]byte, error) {
58+
return span.Marshal()
59+
}
60+
61+
func marshalJaegerSpanJSON(span *jaegerproto.Span) ([]byte, error) {
62+
var m jsonpb.Marshaler
63+
out := new(bytes.Buffer)
64+
err := m.Marshal(out, span)
65+
return out.Bytes(), err
66+
}

exporter/kafkaexporter/jaeger_marshaler_test.go renamed to exporter/kafkaexporter/internal/marshaler/jaeger_marshaler_test.go

+9-19
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package kafkaexporter
4+
package marshaler
55

66
import (
77
"bytes"
88
"testing"
99

10-
"github.com/IBM/sarama"
1110
"github.com/gogo/protobuf/jsonpb"
1211
"github.com/stretchr/testify/assert"
1312
"github.com/stretchr/testify/require"
@@ -38,33 +37,24 @@ func TestJaegerMarshaler(t *testing.T) {
3837
require.NoError(t, jsonMarshaler.Marshal(jsonByteBuffer, batches[0].Spans[0]))
3938

4039
tests := []struct {
41-
unmarshaler TracesMarshaler
42-
encoding string
43-
messages []*sarama.ProducerMessage
40+
marshaler TracesMarshaler
41+
encoding string
42+
messages []Message
4443
}{
4544
{
46-
unmarshaler: jaegerMarshaler{
47-
marshaler: jaegerProtoSpanMarshaler{},
48-
},
49-
encoding: "jaeger_proto",
50-
messages: []*sarama.ProducerMessage{{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes), Key: sarama.ByteEncoder(messageKey)}},
45+
marshaler: JaegerProtoSpanMarshaler{},
46+
messages: []Message{{Value: jaegerProtoBytes, Key: messageKey}},
5147
},
5248
{
53-
unmarshaler: jaegerMarshaler{
54-
marshaler: jaegerJSONSpanMarshaler{
55-
pbMarshaler: &jsonpb.Marshaler{},
56-
},
57-
},
58-
encoding: "jaeger_json",
59-
messages: []*sarama.ProducerMessage{{Topic: "topic", Value: sarama.ByteEncoder(jsonByteBuffer.Bytes()), Key: sarama.ByteEncoder(messageKey)}},
49+
marshaler: JaegerJSONSpanMarshaler{},
50+
messages: []Message{{Value: jsonByteBuffer.Bytes(), Key: messageKey}},
6051
},
6152
}
6253
for _, test := range tests {
6354
t.Run(test.encoding, func(t *testing.T) {
64-
messages, err := test.unmarshaler.Marshal(td, "topic")
55+
messages, err := test.marshaler.MarshalTraces(td)
6556
require.NoError(t, err)
6657
assert.Equal(t, test.messages, messages)
67-
assert.Equal(t, test.encoding, test.unmarshaler.Encoding())
6858
})
6959
}
7060
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package marshaler // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/marshaler"
5+
6+
import (
7+
"go.opentelemetry.io/collector/pdata/plog"
8+
"go.opentelemetry.io/collector/pdata/pmetric"
9+
"go.opentelemetry.io/collector/pdata/ptrace"
10+
)
11+
12+
// Message represents a Kafka message.
13+
//
14+
// Note that the topic and message headers are set by the Kafka exporter
15+
// code, and not by the marshaler.
16+
type Message struct {
17+
// Key is an optional message key.
18+
//
19+
// Marshalers may set this, but it is generally expected that the
20+
// Kafka producer will set this based partition_* configuration.
21+
Key []byte
22+
23+
// Value is the message payload.
24+
Value []byte
25+
}
26+
27+
// TracesMarshaler marshals a ptrace.Traces into one or more Messages.
28+
type TracesMarshaler interface {
29+
MarshalTraces(traces ptrace.Traces) ([]Message, error)
30+
}
31+
32+
// MetricsMarshaler marshals a pmetric.Metrics into one or more Messages.
33+
type MetricsMarshaler interface {
34+
MarshalMetrics(metrics pmetric.Metrics) ([]Message, error)
35+
}
36+
37+
// LogsMarshaler marshals a plog.Logs into one or more Messages.
38+
type LogsMarshaler interface {
39+
MarshalLogs(logs plog.Logs) ([]Message, error)
40+
}

0 commit comments

Comments
 (0)