-
Notifications
You must be signed in to change notification settings - Fork 2.7k
[exporter/loadbalancingexporter] support k8s service resolver #22776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@codeboten Hi 😄 I submitted a pull request a few days ago and have not heard back from you, do you need more information or have another request? Thank you for your time! |
This PR was marked stale due to lack of activity. It will be closed in 14 days. |
@jpkrohling |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @fyuan1316, pinging @jpkrohling as the code owner to review the code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks really cool, thank you! There are a few comments, and I couldn't test it manually as I couldn't build this due to:
go: open /home/jpkroehling/Projects/github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/go.mod: no such file or directory
go: open /home/jpkroehling/Projects/github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/go.mod: no such file or directory
go test -race -timeout 300s -parallel 4 --tags="" ./...
go: open /home/jpkroehling/Projects/github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/go.mod: no such file or directory
make: *** [Makefile.Common:98: test] Error 1
I'll give it another try once you rebase this.
var tests = []struct { | ||
name string | ||
cfg *Config | ||
want *loadBalancerImp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove this one
}, | ||
}, | ||
}, | ||
want: nil, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If both tests are using the same value(nil), you don't need it here.
t.Run(tt.name, func(t *testing.T) { | ||
p, err := newLoadBalancer(exportertest.NewNopCreateSettings(), tt.cfg, nil) | ||
|
||
if tt.wantErr != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can replace the if/else here with:
assert.Equal(t, tt.wantErr)
case *corev1.Endpoints: | ||
endpoints = convertToEndpoints(object) | ||
default: // unsupported | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this happens, you definitely want to know about. Log a warn here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done ( log warn in unsupported branch. )
} | ||
h.logger.Debug("onAdd check", zap.Bool("changed", changed)) | ||
if changed { | ||
h.callback(context.TODO()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either a Background(), or something with a timeout. Same on other similar places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
endpoints := convertToEndpoints(newEps) | ||
changed := false | ||
for _, ep := range endpoints { | ||
if _, loaded := h.endpoints.LoadOrStore(ep, ep); !loaded { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of storing a map[string]string, you could store a map[string]bool, and set this value here to true. You'd save some space with that, while still having the semantics you need.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -66,6 +66,12 @@ exporters: | |||
- backend-2:4317 | |||
- backend-3:4317 | |||
- backend-4:4317 | |||
## use k8s service resolver, if collector runs in kubernetes environment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new resolver deserves more information than that. There are other parts of the documentation that needs to be changed, especially line 31.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return true | ||
}, time.Second, 20*time.Millisecond) | ||
|
||
// step-3 delete, simulate deletion of backends |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer to have this on its own test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I didn't get your point here, you mean a separate test method for the delete operation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I meant that the step 3 should be on its own test function: instead of having one test doing everything, have smaller tests exercising smaller units of your code.
@jpkrohling |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good, I had just a couple more comments.
@@ -28,13 +28,16 @@ This also supports service name based exporting for traces. If you have two or m | |||
Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the processor. | |||
|
|||
* The `otlp` property configures the template used for building the OTLP exporter. Refer to the OTLP Exporter documentation for information on which options are available. Note that the `endpoint` property should not be set and will be overridden by this exporter with the backend endpoint. | |||
* The `resolver` accepts either a `static` node, or a `dns`. If both are specified, `dns` takes precedence. | |||
* The `resolver` accepts a `static` node, a `dns` or a `k8s` node. If all three are specified, `k8s` takes precedence. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* The `resolver` accepts a `static` node, a `dns` or a `k8s` node. If all three are specified, `k8s` takes precedence. | |
* The `resolver` accepts a `static` node, a `dns` or a `k8s` service. If all three are specified, `k8s` takes precedence. |
* The `hostname` property inside a `dns` node specifies the hostname to query in order to obtain the list of IP addresses. | ||
* The `dns` node also accepts the following optional properties: | ||
* `hostname` DNS hostname to resolve. | ||
* `port` port to be used for exporting the traces to the IP addresses resolved from `hostname`. If `port` is not specified, the default port 4317 is used. | ||
* `interval` resolver interval in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `5s` will be used. | ||
* `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used. | ||
* The `k8s` node accepts the following optional properties: | ||
* `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace. | ||
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. | |
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods. |
@@ -83,6 +86,49 @@ service: | |||
- loadbalancing | |||
``` | |||
|
|||
K8s resolver example |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
K8s resolver example | |
Kubernetes resolver example |
|
||
var ( | ||
errNoSvc = errors.New("no service specified to resolve the backends") | ||
k8sResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "k8s service") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use k8s
here, as the other ones use the config key as the resolver name.
k8sResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "k8s service") | |
k8sResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "k8s") |
if len(nAddr) > 1 { | ||
namespace = nAddr[1] | ||
} else { | ||
logger.Info("no namespace was provided, introspection firstly") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.Info("no namespace was provided, introspection firstly") | |
logger.Info("the namespace for the Kubernetes service wasn't provided, trying to determine the current namespace", zap.String("name", name)) |
case *corev1.Endpoints: | ||
endpoints = convertToEndpoints(object) | ||
default: // unsupported | ||
h.logger.Warn("onAdd unable to handle object", zap.Any("obj", obj)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be best to record the object's type, rather than the object itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The message could also be more user-focused: Got an unexpected Kubernetes data type during the inclusion of a new pods for the service
(and add the service).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, that's much better, will adjust accordingly.
h.callback(context.Background()) | ||
} | ||
default: // unsupported | ||
h.logger.Warn("onUpdate unable to handle object", zap.Any("obj", oldObj)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
h.logger.Warn("onUpdate unable to handle object", zap.Any("obj", oldObj)) | |
h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot, this makes the meaning very clear.
endpoints = convertToEndpoints(object) | ||
} | ||
default: // unsupported | ||
h.logger.Warn("onDelete unable to handle object", zap.Any("obj", obj)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here: improve the message, perhaps based on the previous suggestions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return true | ||
}, time.Second, 20*time.Millisecond) | ||
|
||
// step-3 delete, simulate deletion of backends |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I meant that the step 3 should be on its own test function: instead of having one test doing everything, have smaller tests exercising smaller units of your code.
@fyuan1316 hi, after resolving conflicts, I think we can push this PR to get merged. :-P |
refs:open-telemetry#22776 Signed-off-by: Jared Tan <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I resolved the go.mod conflicts, @jpkrohling can you resolve the conversations if they've been addressed and give this another reivew
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a changelog for the enhancement
refs:open-telemetry#22776 Signed-off-by: Jared Tan <[email protected]>
I think this configuration is short and clear, and it states precisely the minimum requirements to run the k8s service resolver. Can we put it in the readme as a documentation description? |
+1 for me, I used this way to test in my environment before, so I'm in favor of using those CR to describe it (maybe in the next separate PR). |
I created a new issue to follow up on document change #24287 . |
// Check whether the namespace file exists. | ||
// If not, we are not running in cluster so can't guess the namespace. | ||
if _, err := os.Stat(inClusterNamespacePath); os.IsNotExist(err) { | ||
return "", fmt.Errorf("not running in-cluster, please specify LeaderElectionNamespace") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LeaderElectionNamespace?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, maybe we just need to use namespaces and "LeaderElectionNamespace" has no role in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
endpoints = convertToEndpoints(object) | ||
default: // unsupported | ||
h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj)) | ||
_ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, create an issue to track this. Note that the same pattern repeats elsewhere.
Hi @jpkrohling : There are currently two follow-up issues: PTAL |
It looks like the CI is still failing:
|
0918ff2
to
c9e78df
Compare
Thanks ! @jpkrohling |
The tests are failing:
|
Sorry, there's still an error. It looks like the difference between local configuration (with kubeconfig configured) and ci not configured, leads to different expected results. I would prefer a simple solution to this problem as follows: p, err := newLoadBalancer(exportertest.NewNopCreateSettings(), cfg, nil)
assert.Nil(t, p)
assert.Equal(t, errNoSvc, err) modified to p, err := newLoadBalancer(exportertest.NewNopCreateSettings(), cfg, nil)
assert.Nil(t, p)
assert.True(t, errors.Is(err, clientcmd.ErrEmptyConfig) || errors.Is(err, errNoSvc)) What do you think? @jpkrohling |
I believe there's a |
Since clientcmd.errConfigurationInvalid was not exported and errNoSvc is not a struct, my attempts at assert.ErrorAs were unsuccessful. assert.True(t, clientcmd.IsConfigurationInvalid(err) || errors.Is(err, errNoSvc)) |
My bad, looks like the previous modification got lost. I'll update it later after I test it locally. By the way, I'm curious to know if I can trigger the CI pipeline to execute by myself, for example, is there a label way to do it? |
Signed-off-by: Yuan Fang <[email protected]>
Tested locally and passed, hopefully this time it will work. make -j2 golint GROUP=other
make -j2 goporto
make gotest-with-cover GROUP=other
make gotest GROUP=other |
It only requires approval for the first contribution. Once this is merged, your next contributions won't require an approval to be executed. |
Got it, thanks. |
@jpkrohling Please take a look. Is it possible to merge now? |
Description:
Add k8s service resolver for exporter/loadbalancingexporter
The exporter/loadbalancingexporter component currently supports both static and dns resolvers, and does not currently support the ability to load balance pods when the collector application is running in a kubernetes environment. (Backends address discovery is achieved by monitoring kubernetes endpoints resources). This pr provides that capability.
Link to tracking Issue:
suitable for scenarios where services are located in a k8s cluster #18412
Testing:
Documentation: