Skip to content

Commit 914d5a7

Browse files
authored
Merge branch 'main' into s3exporter-canned-acl
2 parents 125f594 + ad684d6 commit 914d5a7

16 files changed

+501
-72
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: new_component
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkatopicsobserver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Adding implementation and tests of the component's logic."
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37665]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

extension/observer/endpoints.go

+13
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const (
3636
HostPortType EndpointType = "hostport"
3737
// ContainerType is a container endpoint.
3838
ContainerType EndpointType = "container"
39+
// KafkaTopicType is a kafka topic endpoint
40+
KafkaTopicType EndpointType = "kafka.topics"
3941
)
4042

4143
var (
@@ -45,6 +47,7 @@ var (
4547
_ EndpointDetails = (*K8sNode)(nil)
4648
_ EndpointDetails = (*HostPort)(nil)
4749
_ EndpointDetails = (*Container)(nil)
50+
_ EndpointDetails = (*KafkaTopic)(nil)
4851
)
4952

5053
// EndpointDetails provides additional context about an endpoint such as a Pod or Port.
@@ -387,3 +390,13 @@ func (n *K8sNode) Env() EndpointEnv {
387390
func (n *K8sNode) Type() EndpointType {
388391
return K8sNodeType
389392
}
393+
394+
type KafkaTopic struct{}
395+
396+
func (k *KafkaTopic) Env() EndpointEnv {
397+
return map[string]any{}
398+
}
399+
400+
func (k *KafkaTopic) Type() EndpointType {
401+
return KafkaTopicType
402+
}

extension/observer/endpoints_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,20 @@ func TestEndpointEnv(t *testing.T) {
282282
"host": "192.68.73.2",
283283
},
284284
},
285+
{
286+
name: "Kafka topic",
287+
endpoint: Endpoint{
288+
ID: EndpointID("topic1"),
289+
Target: "topic1",
290+
Details: &KafkaTopic{},
291+
},
292+
want: EndpointEnv{
293+
"id": "topic1",
294+
"type": "kafka.topics",
295+
"host": "topic1",
296+
"endpoint": "topic1",
297+
},
298+
},
285299
}
286300
for _, tt := range tests {
287301
t.Run(tt.name, func(t *testing.T) {

extension/observer/kafkatopicsobserver/README.md

+3-4
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
<!-- status autogenerated section -->
33
| Status | |
44
| ------------- |-----------|
5-
| Stability | [beta] |
6-
| Distributions | [contrib] |
5+
| Stability | [development] |
6+
| Distributions | [] |
77
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aextension%2Fkafkatopicsobserver%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aextension%2Fkafkatopicsobserver) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aextension%2Fkafkatopicsobserver%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aextension%2Fkafkatopicsobserver) |
88
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |
99

10-
[beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta
11-
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
10+
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
1211
<!-- end autogenerated section -->
1312

1413
The Kafka topics observer extension is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report

extension/observer/kafkatopicsobserver/config.go

+27-5
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
package kafkatopicsobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver"
55

66
import (
7+
"fmt"
78
"time"
89

10+
"go.uber.org/multierr"
11+
912
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
1013
)
1114

@@ -23,11 +26,30 @@ type Config struct {
2326
// Session interval for the Kafka consumer
2427
SessionTimeout time.Duration `mapstructure:"session_timeout"`
2528
// Heartbeat interval for the Kafka consumer
26-
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
27-
Authentication kafka.Authentication `mapstructure:"auth"`
28-
TopicRegex string `mapstructure:"topic_regex"`
29+
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
30+
Authentication kafka.Authentication `mapstructure:"auth"`
31+
TopicRegex string `mapstructure:"topic_regex"`
32+
TopicsSyncInterval time.Duration `mapstructure:"topics_sync_interval"`
2933
}
3034

31-
func (config Config) Validate() error {
32-
return nil
35+
func (config *Config) Validate() (errs error) {
36+
if len(config.Brokers) == 0 {
37+
errs = multierr.Append(errs, fmt.Errorf("brokers list must be specified"))
38+
}
39+
if len(config.ProtocolVersion) == 0 {
40+
errs = multierr.Append(errs, fmt.Errorf("protocol_version must be specified"))
41+
}
42+
if len(config.TopicRegex) == 0 {
43+
errs = multierr.Append(errs, fmt.Errorf("topic_regex must be specified"))
44+
}
45+
if config.TopicsSyncInterval <= 0 {
46+
errs = multierr.Append(errs, fmt.Errorf("topics_sync_interval must be greater than 0"))
47+
}
48+
if config.SessionTimeout <= 0 {
49+
errs = multierr.Append(errs, fmt.Errorf("session_timeout must be greater than 0"))
50+
}
51+
if config.HeartbeatInterval <= 0 {
52+
errs = multierr.Append(errs, fmt.Errorf("heartbeat_interval must be greater than 0"))
53+
}
54+
return errs
3355
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kafkatopicsobserver
5+
6+
import (
7+
"path/filepath"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/confmap"
15+
"go.opentelemetry.io/collector/confmap/confmaptest"
16+
"go.opentelemetry.io/collector/confmap/xconfmap"
17+
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver/internal/metadata"
19+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
20+
)
21+
22+
func TestLoadConfig(t *testing.T) {
23+
t.Parallel()
24+
25+
tests := []struct {
26+
id component.ID
27+
expected component.Config
28+
expectedError string
29+
}{
30+
{
31+
id: component.NewID(metadata.Type),
32+
expected: NewFactory().CreateDefaultConfig(),
33+
expectedError: "protocol_version must be specified; topic_regex must be specified",
34+
},
35+
{
36+
id: component.NewIDWithName(metadata.Type, "all_settings"),
37+
expected: &Config{
38+
ProtocolVersion: "3.7.0",
39+
Brokers: []string{"1.2.3.4:9092", "2.3.4.5:9092"},
40+
TopicRegex: "^topic[0-9]$",
41+
TopicsSyncInterval: 5 * time.Second,
42+
ResolveCanonicalBootstrapServersOnly: false,
43+
SessionTimeout: 30 * time.Second,
44+
HeartbeatInterval: 20 * time.Second,
45+
Authentication: kafka.Authentication{
46+
PlainText: &kafka.PlainTextConfig{
47+
Username: "fooUser",
48+
Password: "fooPassword",
49+
},
50+
},
51+
},
52+
},
53+
}
54+
for _, tt := range tests {
55+
t.Run(tt.id.String(), func(t *testing.T) {
56+
cfg := loadConfig(t, tt.id)
57+
if tt.expectedError != "" {
58+
assert.EqualError(t, xconfmap.Validate(cfg), tt.expectedError)
59+
} else {
60+
assert.NoError(t, xconfmap.Validate(cfg))
61+
}
62+
assert.Equal(t, tt.expected, cfg)
63+
})
64+
}
65+
}
66+
67+
func TestValidateConfig(t *testing.T) {
68+
cfg := &Config{
69+
Brokers: []string{},
70+
ProtocolVersion: "3.7.0",
71+
TopicRegex: "^test[0-9]$",
72+
}
73+
assert.Equal(t, "brokers list must be specified; topics_sync_interval must be greater than 0; session_timeout must be greater than 0; heartbeat_interval must be greater than 0", xconfmap.Validate(cfg).Error())
74+
75+
cfg = &Config{
76+
Brokers: []string{"1.2.3.4:9092"},
77+
ProtocolVersion: "",
78+
TopicRegex: "^topic[0-9]$",
79+
TopicsSyncInterval: 1 * time.Second,
80+
SessionTimeout: 1 * time.Second,
81+
HeartbeatInterval: 1 * time.Second,
82+
}
83+
assert.Equal(t, "protocol_version must be specified", xconfmap.Validate(cfg).Error())
84+
85+
cfg = &Config{
86+
Brokers: []string{"1.2.3.4:9092"},
87+
ProtocolVersion: "3.7.0",
88+
TopicRegex: "",
89+
TopicsSyncInterval: 1 * time.Second,
90+
SessionTimeout: 1 * time.Second,
91+
HeartbeatInterval: 1 * time.Second,
92+
}
93+
assert.Equal(t, "topic_regex must be specified", xconfmap.Validate(cfg).Error())
94+
95+
cfg = &Config{
96+
Brokers: []string{"1.2.3.4:9092"},
97+
ProtocolVersion: "3.7.0",
98+
TopicRegex: "^topic[0-9]$",
99+
TopicsSyncInterval: 0 * time.Second,
100+
SessionTimeout: 1 * time.Second,
101+
HeartbeatInterval: 1 * time.Second,
102+
}
103+
assert.Equal(t, "topics_sync_interval must be greater than 0", xconfmap.Validate(cfg).Error())
104+
105+
cfg = &Config{
106+
Brokers: []string{"1.2.3.4:9092"},
107+
ProtocolVersion: "3.7.0",
108+
TopicRegex: "^topic[0-9]$",
109+
TopicsSyncInterval: 1 * time.Second,
110+
SessionTimeout: 0 * time.Second,
111+
HeartbeatInterval: 1 * time.Second,
112+
}
113+
assert.Equal(t, "session_timeout must be greater than 0", xconfmap.Validate(cfg).Error())
114+
115+
cfg = &Config{
116+
Brokers: []string{"1.2.3.4:9092"},
117+
ProtocolVersion: "3.7.0",
118+
TopicRegex: "^topic[0-9]$",
119+
TopicsSyncInterval: 1 * time.Second,
120+
SessionTimeout: 1 * time.Second,
121+
HeartbeatInterval: 0 * time.Second,
122+
}
123+
assert.Equal(t, "heartbeat_interval must be greater than 0", xconfmap.Validate(cfg).Error())
124+
125+
cfg = &Config{
126+
Brokers: []string{"1.2.3.4:9092"},
127+
ProtocolVersion: "3.7.0",
128+
TopicRegex: "^topic[0-9]$",
129+
TopicsSyncInterval: 1 * time.Second,
130+
SessionTimeout: 1 * time.Second,
131+
HeartbeatInterval: 1 * time.Second,
132+
}
133+
assert.NoError(t, xconfmap.Validate(cfg))
134+
}
135+
136+
func loadConf(tb testing.TB, path string, id component.ID) *confmap.Conf {
137+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", path))
138+
require.NoError(tb, err)
139+
sub, err := cm.Sub(id.String())
140+
require.NoError(tb, err)
141+
return sub
142+
}
143+
144+
func loadConfig(tb testing.TB, id component.ID) *Config {
145+
factory := NewFactory()
146+
cfg := factory.CreateDefaultConfig()
147+
sub := loadConf(tb, "config.yaml", id)
148+
require.NoError(tb, sub.Unmarshal(cfg))
149+
return cfg.(*Config)
150+
}

0 commit comments

Comments
 (0)