Skip to content

Commit d6dc135

Browse files
authored
[exporterhelper][queuebatcher] partitioner as the interface to get partition key (#12803)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This RP implements a type `partitioner` that serves as the interface to get partition key. <!-- Issue number if applicable --> #### Link to tracking issue #12795 <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 9958693 commit d6dc135

File tree

9 files changed

+91
-2
lines changed

9 files changed

+91
-2
lines changed

exporter/debugexporter/go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,5 @@ replace go.opentelemetry.io/collector/featuregate => ../../featuregate
124124
replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xextension
125125

126126
replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry
127+
128+
replace go.opentelemetry.io/collector/client => ../../client

exporter/exporterhelper/internal/queue_sender.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020

2121
// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
2222
type QueueBatchSettings[T any] struct {
23-
Encoding queuebatch.Encoding[T]
24-
Sizers map[request.SizerType]request.Sizer[T]
23+
Encoding queuebatch.Encoding[T]
24+
Sizers map[request.SizerType]request.Sizer[T]
25+
Partitioner queuebatch.Partitioner[T]
2526
}
2627

2728
// NewDefaultQueueConfig returns the default config for queuebatch.Config.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
10+
)
11+
12+
// Partitioner is an interface that returns the the partition key of the given element.
13+
type Partitioner[T any] interface {
14+
GetKey(context.Context, T) string
15+
}
16+
17+
type GetKeyFunc[T any] func(context.Context, T) string
18+
19+
func (f GetKeyFunc[T]) GetKey(ctx context.Context, t T) string {
20+
return f(ctx, t)
21+
}
22+
23+
type basePartitioner struct {
24+
GetKeyFunc[request.Request]
25+
}
26+
27+
func NewPartitioner(
28+
getKeyFunc GetKeyFunc[request.Request],
29+
) Partitioner[request.Request] {
30+
return &basePartitioner{
31+
GetKeyFunc: getKeyFunc,
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch
5+
6+
import (
7+
"context"
8+
"strconv"
9+
"testing"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"go.opentelemetry.io/collector/client"
14+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
15+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
16+
)
17+
18+
func TestPartitioner_GetKeyFromRequest(t *testing.T) {
19+
partitioner := NewPartitioner(func(_ context.Context, req request.Request) string {
20+
return strconv.Itoa(req.(*requesttest.FakeRequest).ItemsCount())
21+
})
22+
23+
require.Equal(t, "2", partitioner.GetKey(context.Background(), &requesttest.FakeRequest{Items: 2}))
24+
require.Equal(t, "3", partitioner.GetKey(context.Background(), &requesttest.FakeRequest{Items: 3}))
25+
require.Equal(t, "4", partitioner.GetKey(context.Background(), &requesttest.FakeRequest{Items: 4}))
26+
}
27+
28+
func TestPartitioner_GetKeyFromContext(t *testing.T) {
29+
partitioner := NewPartitioner(func(ctx context.Context, _ request.Request) string {
30+
return client.FromContext(ctx).Metadata.Get("metadata_key")[0]
31+
})
32+
33+
ctx1 := client.NewContext(context.Background(), client.Info{
34+
Metadata: client.NewMetadata(map[string][]string{"metadata_key": {"partition1"}}),
35+
})
36+
require.Equal(t, "partition1", partitioner.GetKey(ctx1, &requesttest.FakeRequest{Items: 2}))
37+
38+
ctx2 := client.NewContext(context.Background(), client.Info{
39+
Metadata: client.NewMetadata(map[string][]string{"metadata_key": {"partition2"}}),
40+
})
41+
require.Equal(t, "partition2", partitioner.GetKey(ctx2, &requesttest.FakeRequest{Items: 2}))
42+
}

exporter/exporterhelper/xexporterhelper/go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,5 @@ replace go.opentelemetry.io/collector/featuregate => ../../../featuregate
117117
replace go.opentelemetry.io/collector/confmap => ../../../confmap
118118

119119
replace go.opentelemetry.io/collector/internal/telemetry => ../../../internal/telemetry
120+
121+
replace go.opentelemetry.io/collector/client => ../../../client

exporter/exportertest/go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,5 @@ replace go.opentelemetry.io/collector/featuregate => ../../featuregate
107107
replace go.opentelemetry.io/collector/confmap => ../../confmap
108108

109109
replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry
110+
111+
replace go.opentelemetry.io/collector/client => ../../client

exporter/go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.23.0
55
require (
66
github.com/cenkalti/backoff/v5 v5.0.2
77
github.com/stretchr/testify v1.10.0
8+
go.opentelemetry.io/collector/client v1.29.0
89
go.opentelemetry.io/collector/component v1.30.0
910
go.opentelemetry.io/collector/component/componenttest v0.124.0
1011
go.opentelemetry.io/collector/config/configretry v1.30.0
@@ -112,3 +113,5 @@ replace go.opentelemetry.io/collector/featuregate => ../featuregate
112113
replace go.opentelemetry.io/collector/confmap => ../confmap
113114

114115
replace go.opentelemetry.io/collector/internal/telemetry => ../internal/telemetry
116+
117+
replace go.opentelemetry.io/collector/client => ../client

exporter/nopexporter/go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,5 @@ replace go.opentelemetry.io/collector/featuregate => ../../featuregate
107107
replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xextension
108108

109109
replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry
110+
111+
replace go.opentelemetry.io/collector/client => ../../client

exporter/xexporter/go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,5 @@ replace go.opentelemetry.io/collector/featuregate => ../../featuregate
8888
replace go.opentelemetry.io/collector/confmap => ../../confmap
8989

9090
replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry
91+
92+
replace go.opentelemetry.io/collector/client => ../../client

0 commit comments

Comments
 (0)