Skip to content

Commit a47a6ed

Browse files
marclopFiery-Fenix
authored andcommitted
kafkareceiver: propagate Kafka headers as metadata (open-telemetry#39227)
#### Description Similar to open-telemetry#39132, but for the Kafka receiver, updates the receiver to propagate Kafka headers as client.Info (metadata). Allowing downstream processors and exporters to access the values via the enriched context. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Closes open-telemetry#39129 <!--Describe what testing was performed and which tests were added.--> #### Testing Unit tests <!--Describe the documentation added.--> #### Documentation `Readme.md` to point out how it can be used with the kafka receiver. Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent ba7764e commit a47a6ed

File tree

6 files changed

+233
-21
lines changed

6 files changed

+233
-21
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Propagate Kafka headers as metadata
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39129]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Allows the Kafka receiver to propagate Kafka headers as client.Info (metadata), allowing downstream processors and exporters to access the values via the enriched context.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/kafkareceiver/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ Kafka receiver receives traces, metrics, and logs from Kafka. Message payload en
1717

1818
Note that metrics and logs only support OTLP.
1919

20+
If used in conjunction with the `kafkaexporter` configured with `include_metadata_keys`, the Kafka receiver will also propagate the Kafka headers to the downstream pipeline, giving the rest of the pipeline access to arbitrary metadata keys and values.
21+
2022
## Getting Started
2123

2224
There are no required settings.

receiver/kafkareceiver/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.123.0
1616
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.123.0
1717
github.com/stretchr/testify v1.10.0
18+
go.opentelemetry.io/collector/client v1.29.1-0.20250402200755-cb5c3f4fb9dc
1819
go.opentelemetry.io/collector/component v1.29.1-0.20250402200755-cb5c3f4fb9dc
1920
go.opentelemetry.io/collector/component/componenttest v0.123.1-0.20250402200755-cb5c3f4fb9dc
2021
go.opentelemetry.io/collector/config/configretry v1.29.1-0.20250402200755-cb5c3f4fb9dc

receiver/kafkareceiver/go.sum

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/kafkareceiver/kafka_receiver.go

+31-6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/IBM/sarama"
1414
"github.com/cenkalti/backoff/v4"
15+
"go.opentelemetry.io/collector/client"
1516
"go.opentelemetry.io/collector/component"
1617
"go.opentelemetry.io/collector/config/configretry"
1718
"go.opentelemetry.io/collector/consumer"
@@ -514,7 +515,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
514515
session.MarkMessage(message, "")
515516
}
516517

517-
ctx := c.obsrecv.StartTracesOp(session.Context())
518+
// If the Kafka exporter has propagated headers in the message,
519+
// create a new context with client.Info in it.
520+
ctx := newContextWithHeaders(session.Context(), message.Headers)
521+
ctx = c.obsrecv.StartTracesOp(ctx)
518522
attrs := attribute.NewSet(
519523
attribute.String(attrInstanceName, c.id.String()),
520524
attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
@@ -535,7 +539,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
535539

536540
c.headerExtractor.extractHeadersTraces(traces, message)
537541
spanCount := traces.SpanCount()
538-
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
542+
err = c.nextConsumer.ConsumeTraces(ctx, traces)
539543
c.obsrecv.EndTracesOp(ctx, c.encoding, spanCount, err)
540544
if err != nil {
541545
if errorRequiresBackoff(err) && c.backOff != nil {
@@ -628,7 +632,10 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
628632
session.MarkMessage(message, "")
629633
}
630634

631-
ctx := c.obsrecv.StartMetricsOp(session.Context())
635+
// If the Kafka exporter has propagated headers in the message,
636+
// create a new context with client.Info in it.
637+
ctx := newContextWithHeaders(session.Context(), message.Headers)
638+
ctx = c.obsrecv.StartMetricsOp(ctx)
632639
attrs := attribute.NewSet(
633640
attribute.String(attrInstanceName, c.id.String()),
634641
attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
@@ -649,7 +656,7 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
649656
c.headerExtractor.extractHeadersMetrics(metrics, message)
650657

651658
dataPointCount := metrics.DataPointCount()
652-
err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
659+
err = c.nextConsumer.ConsumeMetrics(ctx, metrics)
653660
c.obsrecv.EndMetricsOp(ctx, c.encoding, dataPointCount, err)
654661
if err != nil {
655662
if errorRequiresBackoff(err) && c.backOff != nil {
@@ -742,7 +749,10 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
742749
session.MarkMessage(message, "")
743750
}
744751

745-
ctx := c.obsrecv.StartLogsOp(session.Context())
752+
// If the Kafka exporter has propagated headers in the message,
753+
// create a new context with client.Info in it.
754+
ctx := newContextWithHeaders(session.Context(), message.Headers)
755+
ctx = c.obsrecv.StartLogsOp(ctx)
746756
attrs := attribute.NewSet(
747757
attribute.String(attrInstanceName, c.id.String()),
748758
attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
@@ -762,7 +772,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
762772
}
763773
c.headerExtractor.extractHeadersLogs(logs, message)
764774
logRecordCount := logs.LogRecordCount()
765-
err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
775+
err = c.nextConsumer.ConsumeLogs(ctx, logs)
766776
c.obsrecv.EndLogsOp(ctx, c.encoding, logRecordCount, err)
767777
if err != nil {
768778
if errorRequiresBackoff(err) && c.backOff != nil {
@@ -840,3 +850,18 @@ func newExponentialBackOff(config configretry.BackOffConfig) *backoff.Exponentia
840850
func errorRequiresBackoff(err error) bool {
841851
return err.Error() == errMemoryLimiterDataRefused.Error()
842852
}
853+
854+
func newContextWithHeaders(ctx context.Context,
855+
headers []*sarama.RecordHeader,
856+
) context.Context {
857+
if len(headers) == 0 {
858+
return ctx
859+
}
860+
m := make(map[string][]string, len(headers))
861+
for _, header := range headers {
862+
key := string(header.Key)
863+
value := string(header.Value)
864+
m[key] = append(m[key], value)
865+
}
866+
return client.NewContext(ctx, client.Info{Metadata: client.NewMetadata(m)})
867+
}

receiver/kafkareceiver/kafka_receiver_test.go

+170-15
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@ import (
77
"context"
88
"errors"
99
"sync"
10+
"sync/atomic"
1011
"testing"
1112
"time"
1213

1314
"github.com/IBM/sarama"
1415
"github.com/cenkalti/backoff/v4"
1516
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/require"
18+
"go.opentelemetry.io/collector/client"
1719
"go.opentelemetry.io/collector/component/componenttest"
20+
"go.opentelemetry.io/collector/consumer"
1821
"go.opentelemetry.io/collector/consumer/consumertest"
1922
"go.opentelemetry.io/collector/pdata/plog"
2023
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -98,11 +101,22 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
98101

99102
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)})
100103
require.NoError(t, err)
104+
var called atomic.Bool
101105
c := tracesConsumerGroupHandler{
102-
unmarshaler: &ptrace.ProtoUnmarshaler{},
103-
logger: zap.NewNop(),
104-
ready: make(chan bool),
105-
nextConsumer: consumertest.NewNop(),
106+
unmarshaler: &ptrace.ProtoUnmarshaler{},
107+
logger: zap.NewNop(),
108+
ready: make(chan bool),
109+
nextConsumer: func() consumer.Traces {
110+
c, err := consumer.NewTraces(func(ctx context.Context, _ ptrace.Traces) error {
111+
defer called.Store(true)
112+
info := client.FromContext(ctx)
113+
assert.Equal(t, []string{"abcdefg"}, info.Metadata.Get("x-tenant-id"))
114+
assert.Equal(t, []string{"1234", "5678"}, info.Metadata.Get("x-request-ids"))
115+
return nil
116+
})
117+
require.NoError(t, err)
118+
return c
119+
}(),
106120
obsrecv: obsrecv,
107121
headerExtractor: &nopHeaderExtractor{},
108122
telemetryBuilder: telemetryBuilder,
@@ -128,9 +142,25 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
128142
wg.Done()
129143
}()
130144

131-
groupClaim.messageChan <- &sarama.ConsumerMessage{}
145+
groupClaim.messageChan <- &sarama.ConsumerMessage{
146+
Headers: []*sarama.RecordHeader{
147+
{
148+
Key: []byte("x-tenant-id"),
149+
Value: []byte("abcdefg"),
150+
},
151+
{
152+
Key: []byte("x-request-ids"),
153+
Value: []byte("1234"),
154+
},
155+
{
156+
Key: []byte("x-request-ids"),
157+
Value: []byte("5678"),
158+
},
159+
},
160+
}
132161
close(groupClaim.messageChan)
133162
wg.Wait()
163+
assert.True(t, called.Load()) // Ensure nextConsumer was called.
134164
}
135165

136166
func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
@@ -363,11 +393,22 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
363393

364394
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)})
365395
require.NoError(t, err)
396+
var called atomic.Bool
366397
c := metricsConsumerGroupHandler{
367-
unmarshaler: &pmetric.ProtoUnmarshaler{},
368-
logger: zap.NewNop(),
369-
ready: make(chan bool),
370-
nextConsumer: consumertest.NewNop(),
398+
unmarshaler: &pmetric.ProtoUnmarshaler{},
399+
logger: zap.NewNop(),
400+
ready: make(chan bool),
401+
nextConsumer: func() consumer.Metrics {
402+
c, err := consumer.NewMetrics(func(ctx context.Context, _ pmetric.Metrics) error {
403+
defer called.Store(true)
404+
info := client.FromContext(ctx)
405+
assert.Equal(t, []string{"abcdefg"}, info.Metadata.Get("x-tenant-id"))
406+
assert.Equal(t, []string{"1234", "5678"}, info.Metadata.Get("x-request-ids"))
407+
return nil
408+
})
409+
require.NoError(t, err)
410+
return c
411+
}(),
371412
obsrecv: obsrecv,
372413
headerExtractor: &nopHeaderExtractor{},
373414
telemetryBuilder: telemetryBuilder,
@@ -393,9 +434,25 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
393434
wg.Done()
394435
}()
395436

396-
groupClaim.messageChan <- &sarama.ConsumerMessage{}
437+
groupClaim.messageChan <- &sarama.ConsumerMessage{
438+
Headers: []*sarama.RecordHeader{
439+
{
440+
Key: []byte("x-tenant-id"),
441+
Value: []byte("abcdefg"),
442+
},
443+
{
444+
Key: []byte("x-request-ids"),
445+
Value: []byte("1234"),
446+
},
447+
{
448+
Key: []byte("x-request-ids"),
449+
Value: []byte("5678"),
450+
},
451+
},
452+
}
397453
close(groupClaim.messageChan)
398454
wg.Wait()
455+
assert.True(t, called.Load()) // Ensure nextConsumer was called.
399456
}
400457

401458
func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
@@ -640,11 +697,22 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
640697

641698
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)})
642699
require.NoError(t, err)
700+
var called atomic.Bool
643701
c := logsConsumerGroupHandler{
644-
unmarshaler: &plog.ProtoUnmarshaler{},
645-
logger: zap.NewNop(),
646-
ready: make(chan bool),
647-
nextConsumer: consumertest.NewNop(),
702+
unmarshaler: &plog.ProtoUnmarshaler{},
703+
logger: zap.NewNop(),
704+
ready: make(chan bool),
705+
nextConsumer: func() consumer.Logs {
706+
c, err := consumer.NewLogs(func(ctx context.Context, _ plog.Logs) error {
707+
defer called.Store(true)
708+
info := client.FromContext(ctx)
709+
assert.Equal(t, []string{"abcdefg"}, info.Metadata.Get("x-tenant-id"))
710+
assert.Equal(t, []string{"1234", "5678"}, info.Metadata.Get("x-request-ids"))
711+
return nil
712+
})
713+
require.NoError(t, err)
714+
return c
715+
}(),
648716
obsrecv: obsrecv,
649717
headerExtractor: &nopHeaderExtractor{},
650718
telemetryBuilder: telemetryBuilder,
@@ -670,9 +738,25 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
670738
wg.Done()
671739
}()
672740

673-
groupClaim.messageChan <- &sarama.ConsumerMessage{}
741+
groupClaim.messageChan <- &sarama.ConsumerMessage{
742+
Headers: []*sarama.RecordHeader{
743+
{
744+
Key: []byte("x-tenant-id"),
745+
Value: []byte("abcdefg"),
746+
},
747+
{
748+
Key: []byte("x-request-ids"),
749+
Value: []byte("1234"),
750+
},
751+
{
752+
Key: []byte("x-request-ids"),
753+
Value: []byte("5678"),
754+
},
755+
},
756+
}
674757
close(groupClaim.messageChan)
675758
wg.Wait()
759+
assert.True(t, called.Load()) // Ensure nextConsumer was called.
676760
}
677761

678762
func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
@@ -980,3 +1064,74 @@ func nopTelemetryBuilder(t *testing.T) *metadata.TelemetryBuilder {
9801064
require.NoError(t, err)
9811065
return telemetryBuilder
9821066
}
1067+
1068+
func Test_newContextWithHeaders(t *testing.T) {
1069+
type args struct {
1070+
ctx context.Context
1071+
headers []*sarama.RecordHeader
1072+
}
1073+
tests := []struct {
1074+
name string
1075+
args args
1076+
want map[string][]string
1077+
}{
1078+
{
1079+
name: "no headers",
1080+
args: args{
1081+
ctx: context.Background(),
1082+
headers: []*sarama.RecordHeader{},
1083+
},
1084+
want: map[string][]string{},
1085+
},
1086+
{
1087+
name: "single header",
1088+
args: args{
1089+
ctx: context.Background(),
1090+
headers: []*sarama.RecordHeader{
1091+
{Key: []byte("key1"), Value: []byte("value1")},
1092+
},
1093+
},
1094+
want: map[string][]string{
1095+
"key1": {"value1"},
1096+
},
1097+
},
1098+
{
1099+
name: "multiple headers",
1100+
args: args{
1101+
ctx: context.Background(),
1102+
headers: []*sarama.RecordHeader{
1103+
{Key: []byte("key1"), Value: []byte("value1")},
1104+
{Key: []byte("key2"), Value: []byte("value2")},
1105+
},
1106+
},
1107+
want: map[string][]string{
1108+
"key1": {"value1"},
1109+
"key2": {"value2"},
1110+
},
1111+
},
1112+
{
1113+
name: "duplicate keys",
1114+
args: args{
1115+
ctx: context.Background(),
1116+
headers: []*sarama.RecordHeader{
1117+
{Key: []byte("key1"), Value: []byte("value1")},
1118+
{Key: []byte("key1"), Value: []byte("value2")},
1119+
},
1120+
},
1121+
want: map[string][]string{
1122+
"key1": {"value1", "value2"},
1123+
},
1124+
},
1125+
}
1126+
1127+
for _, tt := range tests {
1128+
t.Run(tt.name, func(t *testing.T) {
1129+
ctx := newContextWithHeaders(tt.args.ctx, tt.args.headers)
1130+
clientInfo := client.FromContext(ctx)
1131+
for k, wantVal := range tt.want {
1132+
val := clientInfo.Metadata.Get(k)
1133+
assert.Equal(t, wantVal, val)
1134+
}
1135+
})
1136+
}
1137+
}

0 commit comments

Comments
 (0)