Skip to content

Commit 93a3ded

Browse files
[extension/kafkatopicsobserver] add kafka topics observer extension (#37669)
#### Description Initial PR for adding kafka topics observer extension. Continuation of #37372 after renaming a branch. #### Link to tracking issue New component #37665 Co-authored-by: Sean Marciniak <[email protected]>
1 parent 51773c7 commit 93a3ded

22 files changed

+662
-1
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: new_component
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkatopicsobserver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Adding new kafka topics observer extension
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37665]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

.github/CODEOWNERS

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ extension/observer/dockerobserver/ @open-telemetry
114114
extension/observer/ecsobserver/ @open-telemetry/collector-contrib-approvers @dmitryax
115115
extension/observer/hostobserver/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy
116116
extension/observer/k8sobserver/ @open-telemetry/collector-contrib-approvers @dmitryax @ChrsMark
117+
extension/observer/kafkatopicsobserver/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy
117118
extension/oidcauthextension/ @open-telemetry/collector-contrib-approvers @jpkrohling
118119
extension/opampcustommessages/ @open-telemetry/collector-contrib-approvers @evan-bradley
119120
extension/opampextension/ @open-telemetry/collector-contrib-approvers @portertech @evan-bradley @tigrannajaryan

.github/ISSUE_TEMPLATE/bug_report.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ body:
117117
- extension/observer/ecstaskobserver
118118
- extension/observer/hostobserver
119119
- extension/observer/k8sobserver
120+
- extension/observer/kafkatopicsobserver
120121
- extension/oidcauth
121122
- extension/opamp
122123
- extension/opampcustommessages

.github/ISSUE_TEMPLATE/feature_request.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ body:
111111
- extension/observer/ecstaskobserver
112112
- extension/observer/hostobserver
113113
- extension/observer/k8sobserver
114+
- extension/observer/kafkatopicsobserver
114115
- extension/oidcauth
115116
- extension/opamp
116117
- extension/opampcustommessages

.github/ISSUE_TEMPLATE/other.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ body:
111111
- extension/observer/ecstaskobserver
112112
- extension/observer/hostobserver
113113
- extension/observer/k8sobserver
114+
- extension/observer/kafkatopicsobserver
114115
- extension/oidcauth
115116
- extension/opamp
116117
- extension/opampcustommessages

.github/ISSUE_TEMPLATE/unmaintained.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ body:
116116
- extension/observer/ecstaskobserver
117117
- extension/observer/hostobserver
118118
- extension/observer/k8sobserver
119+
- extension/observer/kafkatopicsobserver
119120
- extension/oidcauth
120121
- extension/opamp
121122
- extension/opampcustommessages

cmd/otelcontribcol/builder-config.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ receivers:
167167
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/haproxyreceiver v0.119.0
168168
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.119.0
169169
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/httpcheckreceiver v0.119.0
170-
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver v0.119.0
170+
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver v0.119.0
171171
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.119.0
172172
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/iisreceiver v0.119.0
173173
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.119.0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../../../Makefile.Common
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Kafka Topics Observer Extension
2+
<!-- status autogenerated section -->
3+
| Status | |
4+
| ------------- |-----------|
5+
| Stability | [beta] |
6+
| Distributions | [contrib] |
7+
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aextension%2Fkafkatopicsobserver%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aextension%2Fkafkatopicsobserver) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aextension%2Fkafkatopicsobserver%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aextension%2Fkafkatopicsobserver) |
8+
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |
9+
10+
[beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta
11+
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
12+
<!-- end autogenerated section -->
13+
14+
The Kafka topics observer extension is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report
15+
kafka topics in kafka cluster based on regex template. This observer watches available topics and matches them with the
16+
provided regex. If any change in available topics matching the regex is detected, the observer updates the endpoints list.
17+
18+
## Configuration
19+
20+
The following settings are required:
21+
22+
- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0
23+
24+
The following settings can be optionally configured:
25+
26+
- `brokers` (default = localhost:9092): The list of kafka brokers
27+
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
28+
- `topic_regex` regex pattern of the topic name to subscribe to.
29+
- `session_timeout` (default = `10s`): The request timeout for detecting client failures when using Kafka’s group management facilities.
30+
- `heartbeat_interval` (default = `3s`): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
31+
- `auth`
32+
- `plain_text`
33+
- `username`: The username to use.
34+
- `password`: The password to use
35+
- `sasl`
36+
- `username`: The username to use.
37+
- `password`: The password to use
38+
- `mechanism`: The sasl mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER or PLAIN)
39+
- `aws_msk.region`: AWS Region in case of AWS_MSK_IAM or AWS_MSK_IAM_OAUTHBEARER mechanism
40+
- `aws_msk.broker_addr`: MSK Broker address in case of AWS_MSK_IAM mechanism
41+
- `tls`
42+
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should
43+
only be used if `insecure` is set to false.
44+
- `cert_file`: path to the TLS cert to use for TLS required connections. Should
45+
only be used if `insecure` is set to false.
46+
- `key_file`: path to the TLS key to use for TLS required connections. Should
47+
only be used if `insecure` is set to false.
48+
- `insecure` (default = false): Disable verifying the server's certificate
49+
chain and host name (`InsecureSkipVerify` in the tls config)
50+
- `server_name_override`: ServerName indicates the name of the server requested by the client
51+
in order to support virtual hosting.
52+
- `kerberos`
53+
- `service_name`: Kerberos service name
54+
- `realm`: Kerberos realm
55+
- `use_keytab`: Use of keytab instead of password, if this is true, keytab file will be used instead of password
56+
- `username`: The Kerberos username used for authenticate with KDC
57+
- `password`: The Kerberos password used for authenticate with KDC
58+
- `config_file`: Path to Kerberos configuration. i.e /etc/krb5.conf
59+
- `keytab_file`: Path to keytab file. i.e /etc/security/kafka.keytab
60+
- `disable_fast_negotiation`: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. This is set to `false` by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kafkatopicsobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver"
5+
6+
import (
7+
"time"
8+
9+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
10+
)
11+
12+
// Config defines configuration for docker observer
13+
type Config struct {
14+
// The list of kafka brokers (default localhost:9092)
15+
Brokers []string `mapstructure:"brokers"`
16+
// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
17+
// each of the provided brokers. It will then do a PTR lookup for each
18+
// returned IP, and that set of names becomes the broker list. This can be
19+
// required in SASL environments.
20+
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
21+
// Kafka protocol version
22+
ProtocolVersion string `mapstructure:"protocol_version"`
23+
// Session interval for the Kafka consumer
24+
SessionTimeout time.Duration `mapstructure:"session_timeout"`
25+
// Heartbeat interval for the Kafka consumer
26+
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
27+
Authentication kafka.Authentication `mapstructure:"auth"`
28+
TopicRegex string `mapstructure:"topic_regex"`
29+
}
30+
31+
func (config Config) Validate() error {
32+
return nil
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//go:generate mdatagen metadata.yaml
5+
6+
package kafkatopicsobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kafkatopicsobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver"
5+
6+
import (
7+
"context"
8+
"sync"
9+
"time"
10+
11+
"go.opentelemetry.io/collector/component"
12+
"go.opentelemetry.io/collector/extension"
13+
"go.uber.org/zap"
14+
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
16+
)
17+
18+
// Mock implementation for the initial PR
19+
var (
20+
_ extension.Extension = (*kafkaTopicsObserver)(nil)
21+
_ observer.EndpointsLister = (*kafkaTopicsObserver)(nil)
22+
_ observer.Observable = (*kafkaTopicsObserver)(nil)
23+
)
24+
25+
type kafkaTopicsObserver struct {
26+
*observer.EndpointsWatcher
27+
logger *zap.Logger
28+
config *Config
29+
cancel func()
30+
once *sync.Once
31+
_ context.Context
32+
}
33+
34+
func newObserver(logger *zap.Logger, config *Config) (extension.Extension, error) {
35+
d := &kafkaTopicsObserver{
36+
logger: logger, config: config,
37+
once: &sync.Once{},
38+
cancel: func() {
39+
},
40+
}
41+
d.EndpointsWatcher = observer.NewEndpointsWatcher(d, time.Second, logger)
42+
return d, nil
43+
}
44+
45+
func (d *kafkaTopicsObserver) ListEndpoints() []observer.Endpoint {
46+
var endpoints []observer.Endpoint
47+
return endpoints
48+
}
49+
50+
func (d *kafkaTopicsObserver) Start(_ context.Context, _ component.Host) error {
51+
return nil
52+
}
53+
54+
func (d *kafkaTopicsObserver) Shutdown(_ context.Context) error {
55+
d.StopListAndWatch()
56+
d.cancel()
57+
return nil
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kafkatopicsobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver"
5+
6+
import (
7+
"context"
8+
"time"
9+
10+
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/extension"
12+
13+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver/internal/metadata"
14+
)
15+
16+
const (
17+
defaultBroker = "localhost:9092"
18+
defaultSessionTimeout = 10 * time.Second
19+
defaultHeartbeatInterval = 3 * time.Second
20+
)
21+
22+
// NewFactory should be called to create a factory with default values.
23+
func NewFactory() extension.Factory {
24+
return extension.NewFactory(
25+
metadata.Type,
26+
createDefaultConfig,
27+
createExtension,
28+
component.StabilityLevelBeta,
29+
)
30+
}
31+
32+
func createDefaultConfig() component.Config {
33+
return &Config{
34+
Brokers: []string{defaultBroker},
35+
SessionTimeout: defaultSessionTimeout,
36+
HeartbeatInterval: defaultHeartbeatInterval,
37+
}
38+
}
39+
40+
func createExtension(
41+
_ context.Context,
42+
settings extension.Settings,
43+
cfg component.Config,
44+
) (extension.Extension, error) {
45+
config := cfg.(*Config)
46+
return newObserver(settings.Logger, config)
47+
}

extension/observer/kafkatopicsobserver/generated_component_test.go

+52
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extension/observer/kafkatopicsobserver/generated_package_test.go

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)