Skip to content

Commit e29a2ec

Browse files
wojtekzylaatoulme
authored andcommitted
[receiver/receivercreator] Ffeat/add kafkatopicsobserver to receivercreator (open-telemetry#39086)
#### Description Add kafkatopicsobserver to the receivercreator configuration <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue New component open-telemetry#37665 #### Testing Manual and unit test #### Documentation Added new sections in README file --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent 04fd3a2 commit e29a2ec

File tree

8 files changed

+69
-4
lines changed

8 files changed

+69
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receivercreator
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add kafkatopicsobserver to the receivercreator configuration
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37665]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/receivercreator/README.md

+31-2
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ None
153153
|--------------------|-------------------|
154154
| k8s.namespace.name | \`namespace\` |
155155

156+
`type == "kafka.topics"`
157+
158+
None
159+
156160
See `redis/2` in [examples](#examples).
157161

158162

@@ -289,6 +293,12 @@ targeting it will have different variables available.
289293
| labels | A key-value map of user-specified node metadata | Map with String key and value |
290294
| kubelet_endpoint_port | The node Status object's DaemonEndpoints.KubeletEndpoint.Port value | Integer |
291295

296+
### Kafka Topics
297+
| Variable | Description | Data Type |
298+
|-----------------------|----------------------------------------------------------------------|-------------------------------|
299+
| type | `"kafka.topics"` | String |
300+
| id | ID of source endpoint | String |
301+
292302
## Examples
293303

294304
```yaml
@@ -299,6 +309,11 @@ extensions:
299309
observe_services: true
300310
observe_ingresses: true
301311
host_observer:
312+
kafkatopics_observer:
313+
brokers: ["1.2.3.4:9093"]
314+
protocol_version: 3.9.0
315+
topic_regex: "^foo_topic[0-9]$"
316+
topics_sync_interval: 5s
302317
303318
receivers:
304319
receiver_creator/1:
@@ -417,6 +432,20 @@ receivers:
417432
- type: add
418433
field: attributes.log.template
419434
value: lazybox
435+
receiver_creator/kafka:
436+
watch_observers: [kafkatopics_observer]
437+
receivers:
438+
kafka:
439+
rule: type == "kafka.topics"
440+
config:
441+
protocol_version: 3.9.0
442+
topic: '`endpoint`'
443+
encoding: text
444+
brokers: ["1.2.3.4:9093"]
445+
initial_offset: earliest
446+
header_extraction:
447+
extract_headers: true
448+
headers: ["index", "source", "sourcetype", "host"]
420449

421450
processors:
422451
exampleprocessor:
@@ -431,10 +460,10 @@ service:
431460
processors: [exampleprocessor]
432461
exporters: [exampleexporter]
433462
logs:
434-
receivers: [receiver_creator/logs]
463+
receivers: [receiver_creator/logs, receiver_creator/kafka]
435464
processors: [exampleprocessor]
436465
exporters: [exampleexporter]
437-
extensions: [k8s_observer, host_observer]
466+
extensions: [k8s_observer, host_observer, kafkatopics_observer]
438467
```
439468
440469
The full list of settings exposed for this receiver are documented in [config.go](./config.go)

receiver/receivercreator/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
106106

107107
for endpointType := range cfg.ResourceAttributes {
108108
switch endpointType {
109-
case observer.ContainerType, observer.K8sServiceType, observer.K8sIngressType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType, observer.PodContainerType:
109+
case observer.ContainerType, observer.K8sServiceType, observer.K8sIngressType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType, observer.PodContainerType, observer.KafkaTopicType:
110110
default:
111111
return fmt.Errorf("resource attributes for unsupported endpoint type %q", endpointType)
112112
}

receiver/receivercreator/config_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ func TestLoadConfig(t *testing.T) {
120120
observer.K8sServiceType: {"k8s.service.key": "k8s.service.value"},
121121
observer.K8sIngressType: {"k8s.ingress.key": "k8s.ingress.value"},
122122
observer.K8sNodeType: {"k8s.node.key": "k8s.node.value"},
123+
observer.KafkaTopicType: {},
123124
},
124125
},
125126
},

receiver/receivercreator/factory.go

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func createDefaultConfig() component.Config {
6666
conventions.AttributeK8SNodeName: "`name`",
6767
conventions.AttributeK8SNodeUID: "`uid`",
6868
},
69+
observer.KafkaTopicType: map[string]string{},
6970
},
7071
receiverTemplates: map[string]receiverTemplate{},
7172
}

receiver/receivercreator/fixtures_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ var k8sNodeEndpoint = observer.Endpoint{
150150
},
151151
}
152152

153+
var kafkaTopicsEndpoint = observer.Endpoint{
154+
ID: "topic1",
155+
Target: "topic1",
156+
Details: &observer.KafkaTopic{},
157+
}
158+
153159
var unsupportedEndpoint = observer.Endpoint{
154160
ID: "endpoint-1",
155161
Target: "localhost:1234",

receiver/receivercreator/rules.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type rule struct {
2222

2323
// ruleRe is used to verify the rule starts type check.
2424
var ruleRe = regexp.MustCompile(
25-
fmt.Sprintf(`^type\s*==\s*(%q|%q|%q|%q|%q|%q|%q|%q)`, observer.PodType, observer.K8sServiceType, observer.K8sIngressType, observer.PortType, observer.PodContainerType, observer.HostPortType, observer.ContainerType, observer.K8sNodeType),
25+
fmt.Sprintf(`^type\s*==\s*(%q|%q|%q|%q|%q|%q|%q|%q|%q)`, observer.PodType, observer.K8sServiceType, observer.K8sIngressType, observer.PortType, observer.PodContainerType, observer.HostPortType, observer.ContainerType, observer.K8sNodeType, observer.KafkaTopicType),
2626
)
2727

2828
// newRule creates a new rule instance.

receiver/receivercreator/rules_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func Test_ruleEval(t *testing.T) {
3434
{"basic k8s.node", args{`type == "k8s.node" && kubelet_endpoint_port == 10250`, k8sNodeEndpoint}, true, false},
3535
{"relocated type builtin", args{`type == "k8s.node" && typeOf("some string") == "string"`, k8sNodeEndpoint}, true, false},
3636
{"pod container", args{`type == "pod.container" and container_image matches "redis"`, podContainerEndpointWithHints}, true, false},
37+
{"kafka topics", args{`type == "kafka.topics"`, kafkaTopicsEndpoint}, true, false},
3738
}
3839
for _, tt := range tests {
3940
t.Run(tt.name, func(t *testing.T) {

0 commit comments

Comments
 (0)