Skip to content

Commit 964a652

Browse files
authored
[exporter/loadbalancing] Add return_hostnames option to k8s resolver (#35411)
**Description:** Adds an optional configuration option to the k8s resolver which allows for hostnames to be returned instead of IPs. This allows certain scenarios like using istio in sidecar mode. Requirements have been added to the documentation. **Link to tracking Issue:** #18412 **Testing:** Added corresponding hostname-based tests for adding backends/endpoints as well as deleting them. There were also tests missing for the k8s handler and so some tests were added as well there. Specifically failing if you want hostnames, but endpoints are returned that do not have hostnames. Aside from unit tests, also ran this in our lab cluster and deleted pods or performed rollouts to our statefulset. Somewhat tangential to the PR itself, but istio now reports mTLS traffic with zero workarounds required which was the motivation for the issue. **Documentation:** Added documentation explaining the new option and the requirements needed to use it. Also added an additional "important" note object specifically calling out that the k8s resolver needs RBAC to work.
1 parent ab0f6a2 commit 964a652

8 files changed

+259
-33
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: loadbalancingexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Adds a an optional configuration to the k8s resolver which returns hostnames instead of IPs for headless services pointing at statefulsets
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [18412]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/loadbalancingexporter/README.md

+3
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
9696
* `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
9797
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
9898
* `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used.
99+
* `return_hostnames` will return hostnames instead of IPs. This is useful in certain situations like using istio in sidecar mode. To use this feature, the `service` must be a headless `Service`, pointing at a `StatefulSet`, and the `service` must be what is specified under `.spec.serviceName` in the `StatefulSet`.
99100
* The `aws_cloud_map` node accepts the following properties:
100101
* `namespace` The CloudMap namespace where the service is register, e.g. `cloudmap`. If no `namespace` is specified, this will fail to start the Load Balancer exporter.
101102
* `service_name` The name of the service that you specified when you registered the instance, e.g. `otelcollectors`. If no `service_name` is specified, this will fail to start the Load Balancer exporter.
@@ -231,6 +232,8 @@ service:
231232
```
232233
233234
Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))
235+
> [!IMPORTANT]
236+
> The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information.
234237
235238
```yaml
236239
receivers:

exporter/loadbalancingexporter/config.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ type DNSResolver struct {
6969

7070
// K8sSvcResolver defines the configuration for the DNS resolver
7171
type K8sSvcResolver struct {
72-
Service string `mapstructure:"service"`
73-
Ports []int32 `mapstructure:"ports"`
74-
Timeout time.Duration `mapstructure:"timeout"`
72+
Service string `mapstructure:"service"`
73+
Ports []int32 `mapstructure:"ports"`
74+
Timeout time.Duration `mapstructure:"timeout"`
75+
ReturnHostnames bool `mapstructure:"return_hostnames"`
7576
}
7677

7778
type AWSCloudMapResolver struct {

exporter/loadbalancingexporter/loadbalancer.go

+1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory component
102102
oCfg.Resolver.K8sSvc.Service,
103103
oCfg.Resolver.K8sSvc.Ports,
104104
oCfg.Resolver.K8sSvc.Timeout,
105+
oCfg.Resolver.K8sSvc.ReturnHostnames,
105106
telemetry,
106107
)
107108
if err != nil {

exporter/loadbalancingexporter/resolver_k8s.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type k8sResolver struct {
6161

6262
endpoints []string
6363
onChangeCallbacks []func([]string)
64+
returnNames bool
6465

6566
stopCh chan struct{}
6667
updateLock sync.RWMutex
@@ -75,6 +76,7 @@ func newK8sResolver(clt kubernetes.Interface,
7576
service string,
7677
ports []int32,
7778
timeout time.Duration,
79+
returnNames bool,
7880
tb *metadata.TelemetryBuilder,
7981
) (*k8sResolver, error) {
8082
if len(service) == 0 {
@@ -115,9 +117,10 @@ func newK8sResolver(clt kubernetes.Interface,
115117

116118
epsStore := &sync.Map{}
117119
h := &handler{
118-
endpoints: epsStore,
119-
logger: logger,
120-
telemetry: tb,
120+
endpoints: epsStore,
121+
logger: logger,
122+
telemetry: tb,
123+
returnNames: returnNames,
121124
}
122125
r := &k8sResolver{
123126
logger: logger,
@@ -131,6 +134,7 @@ func newK8sResolver(clt kubernetes.Interface,
131134
stopCh: make(chan struct{}),
132135
lwTimeout: timeout,
133136
telemetry: tb,
137+
returnNames: returnNames,
134138
}
135139
h.callback = r.resolve
136140

@@ -187,13 +191,19 @@ func (r *k8sResolver) resolve(ctx context.Context) ([]string, error) {
187191
defer r.shutdownWg.Done()
188192

189193
var backends []string
190-
r.endpointsStore.Range(func(address, _ any) bool {
191-
addr := address.(string)
194+
var ep string
195+
r.endpointsStore.Range(func(host, _ any) bool {
196+
switch r.returnNames {
197+
case true:
198+
ep = fmt.Sprintf("%s.%s.%s", host, r.svcName, r.svcNs)
199+
default:
200+
ep = host.(string)
201+
}
192202
if len(r.port) == 0 {
193-
backends = append(backends, addr)
203+
backends = append(backends, ep)
194204
} else {
195205
for _, port := range r.port {
196-
backends = append(backends, net.JoinHostPort(addr, strconv.FormatInt(int64(port), 10)))
206+
backends = append(backends, net.JoinHostPort(ep, strconv.FormatInt(int64(port), 10)))
197207
}
198208
}
199209
return true

exporter/loadbalancingexporter/resolver_k8s_handler.go

+45-12
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,31 @@ import (
1717

1818
var _ cache.ResourceEventHandler = (*handler)(nil)
1919

20+
const (
21+
epMissingHostnamesMsg = "Endpoints object missing hostnames"
22+
)
23+
2024
type handler struct {
21-
endpoints *sync.Map
22-
callback func(ctx context.Context) ([]string, error)
23-
logger *zap.Logger
24-
telemetry *metadata.TelemetryBuilder
25+
endpoints *sync.Map
26+
callback func(ctx context.Context) ([]string, error)
27+
logger *zap.Logger
28+
telemetry *metadata.TelemetryBuilder
29+
returnNames bool
2530
}
2631

2732
func (h handler) OnAdd(obj any, _ bool) {
2833
var endpoints map[string]bool
34+
var ok bool
2935

3036
switch object := obj.(type) {
3137
case *corev1.Endpoints:
32-
endpoints = convertToEndpoints(object)
38+
ok, endpoints = convertToEndpoints(h.returnNames, object)
39+
if !ok {
40+
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
41+
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
42+
return
43+
}
44+
3345
default: // unsupported
3446
h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj))
3547
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
@@ -56,8 +68,14 @@ func (h handler) OnUpdate(oldObj, newObj any) {
5668
return
5769
}
5870

59-
oldEndpoints := convertToEndpoints(oldEps)
60-
newEndpoints := convertToEndpoints(newEps)
71+
_, oldEndpoints := convertToEndpoints(h.returnNames, oldEps)
72+
hostnameOk, newEndpoints := convertToEndpoints(h.returnNames, newEps)
73+
if !hostnameOk {
74+
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", newEps))
75+
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
76+
return
77+
}
78+
6179
changed := false
6280

6381
// Iterate through old endpoints and remove those that are not in the new list.
@@ -80,6 +98,7 @@ func (h handler) OnUpdate(oldObj, newObj any) {
8098
} else {
8199
h.logger.Debug("No changes detected in the endpoints for the service", zap.Any("old", oldEps), zap.Any("new", newEps))
82100
}
101+
83102
default: // unsupported
84103
h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj))
85104
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
@@ -89,13 +108,20 @@ func (h handler) OnUpdate(oldObj, newObj any) {
89108

90109
func (h handler) OnDelete(obj any) {
91110
var endpoints map[string]bool
111+
var ok bool
112+
92113
switch object := obj.(type) {
93114
case *cache.DeletedFinalStateUnknown:
94115
h.OnDelete(object.Obj)
95116
return
96117
case *corev1.Endpoints:
97118
if object != nil {
98-
endpoints = convertToEndpoints(object)
119+
ok, endpoints = convertToEndpoints(h.returnNames, object)
120+
if !ok {
121+
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
122+
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
123+
return
124+
}
99125
}
100126
default: // unsupported
101127
h.logger.Warn("Got an unexpected Kubernetes data type during the removal of the pods for a service", zap.Any("obj", obj))
@@ -110,14 +136,21 @@ func (h handler) OnDelete(obj any) {
110136
}
111137
}
112138

113-
func convertToEndpoints(eps ...*corev1.Endpoints) map[string]bool {
114-
ipAddress := map[string]bool{}
139+
func convertToEndpoints(retNames bool, eps ...*corev1.Endpoints) (bool, map[string]bool) {
140+
res := map[string]bool{}
115141
for _, ep := range eps {
116142
for _, subsets := range ep.Subsets {
117143
for _, addr := range subsets.Addresses {
118-
ipAddress[addr.IP] = true
144+
if retNames {
145+
if addr.Hostname == "" {
146+
return false, nil
147+
}
148+
res[addr.Hostname] = true
149+
} else {
150+
res[addr.IP] = true
151+
}
119152
}
120153
}
121154
}
122-
return ipAddress
155+
return true, res
123156
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package loadbalancingexporter
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
corev1 "k8s.io/api/core/v1"
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
)
13+
14+
func TestConvertToEndpoints(tst *testing.T) {
15+
// Create dummy Endpoints objects
16+
endpoints1 := &corev1.Endpoints{
17+
ObjectMeta: metav1.ObjectMeta{
18+
Name: "test-endpoints-1",
19+
Namespace: "test-namespace",
20+
},
21+
Subsets: []corev1.EndpointSubset{
22+
{
23+
Addresses: []corev1.EndpointAddress{
24+
{
25+
Hostname: "pod-1",
26+
IP: "192.168.10.101",
27+
},
28+
},
29+
},
30+
},
31+
}
32+
endpoints2 := &corev1.Endpoints{
33+
ObjectMeta: metav1.ObjectMeta{
34+
Name: "test-endpoints-2",
35+
Namespace: "test-namespace",
36+
},
37+
Subsets: []corev1.EndpointSubset{
38+
{
39+
Addresses: []corev1.EndpointAddress{
40+
{
41+
Hostname: "pod-2",
42+
IP: "192.168.10.102",
43+
},
44+
},
45+
},
46+
},
47+
}
48+
endpoints3 := &corev1.Endpoints{
49+
ObjectMeta: metav1.ObjectMeta{
50+
Name: "test-endpoints-3",
51+
Namespace: "test-namespace",
52+
},
53+
Subsets: []corev1.EndpointSubset{
54+
{
55+
Addresses: []corev1.EndpointAddress{
56+
{
57+
IP: "192.168.10.103",
58+
},
59+
},
60+
},
61+
},
62+
}
63+
64+
tests := []struct {
65+
name string
66+
returnNames bool
67+
includedEndpoints []*corev1.Endpoints
68+
expectedEndpoints map[string]bool
69+
wantNil bool
70+
}{
71+
{
72+
name: "return hostnames",
73+
returnNames: true,
74+
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2},
75+
expectedEndpoints: map[string]bool{"pod-1": true, "pod-2": true},
76+
wantNil: false,
77+
},
78+
{
79+
name: "return IPs",
80+
returnNames: false,
81+
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2, endpoints3},
82+
expectedEndpoints: map[string]bool{"192.168.10.101": true, "192.168.10.102": true, "192.168.10.103": true},
83+
wantNil: false,
84+
},
85+
{
86+
name: "missing hostname",
87+
returnNames: true,
88+
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints3},
89+
expectedEndpoints: nil,
90+
wantNil: true,
91+
},
92+
}
93+
94+
for _, tt := range tests {
95+
tst.Run(tt.name, func(tst *testing.T) {
96+
ok, res := convertToEndpoints(tt.returnNames, tt.includedEndpoints...)
97+
if tt.wantNil {
98+
assert.Nil(tst, res)
99+
} else {
100+
assert.Equal(tst, tt.expectedEndpoints, res)
101+
}
102+
assert.Equal(tst, !tt.wantNil, ok)
103+
})
104+
}
105+
}

0 commit comments

Comments
 (0)