Kafka: support templated topic name #38888
Comments
Pinging code owners:
See Adding Labels via Comments if you do not have permissions to add labels yourself.
Not sure if this single change would also resolve the similar receiver issue (see #32735), or if the same changes would be necessary there.
@aklemp yes, we should sort out the receiver at the same time. I updated the description.
Issue filed by code owner, removing
I like the idea, but I am worried about the potential impact. Say I have templated out the topic name based on a tenant context: what happens if the client doesn't have permission to create topics, or if I have exceeded the maximum number of topics? (I don't remember what the upper limit on topics is.) Then on the flip side, say the exporter is creating all of these topics and sending data fine: how will the consumer know which topics to consume from?
Clarified on Slack, but for posterity: these are already issues that we would need to deal with, since it's already possible to dynamically choose the topic.

Regarding permissions and limits, I wouldn't propose changing the defaults, just making it possible to include additional client metadata in the topic name. We should make it clear in logs and metrics when errors are related to lack of permissions.

Regarding the receiver side, there are a couple of options. There's https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/observer/kafkatopicsobserver, which you can use with receivercreator to dynamically create receivers for Kafka topics. I think this may be problematic at scale, as every receiver would have its own client and make its own network connections, so the network connections (and other resources) will be proportional to the number of topics.

The other option is to add support for topic regex on the receiver side, which is what I suggest since it avoids the issue described above. Sarama does not support this (see IBM/sarama#1379 and IBM/sarama#1382). On the other hand, it is supported by franz-go (https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#ConsumeRegex) and confluent-kafka-go (https://github.com/confluentinc/confluent-kafka-go?tab=readme-ov-file#examples).
My 5 cents on this: dynamically created topics create more problems than they solve, in my experience. In the regulated context I'm currently working in, it is actually forbidden to create topics on demand.
@aklemp I agree with your points, but that is not applicable in my use case. The scenario I am trying to enable is multi-tenancy (https://kafka.apache.org/documentation/#multitenancy-topic-naming), where the tenant ID is associated with a client, but cannot be spoofed by the client. The number of tenants, and thus topics, is controlled, so we can estimate load, and there is no additional security risk. We also do not rely on auto-topic creation; we orchestrate this separately, so we can (and do) tune partitions, retention, etc.

Anyway, after thinking about this a bit more, I'm coming around to having per-signal-type topic and encoding config. So my new proposal is:
(1) will fix #32735 and #35432
With (4), the Kafka topic observer is intended to solve that for you.
@MovieStoreGuy understood, but see #38888 (comment):
If you have hundreds of topics to consume, then you'll end up with hundreds of connections per broker. That can be avoided with regex topic support in the receiver itself.
I have a rough PoC here using OTTL for simultaneously routing and partitioning: https://github.com/open-telemetry/opentelemetry-collector-contrib/compare/main...axw:opentelemetry-collector-contrib:kafkaexporter-topicnamer?expand=1#diff-66b6c342218721c7c6fb45eb5b225182ad8ef742fbc4da7b1c48754693dace4f

Most of the branch is copy-pasta to enable context inference for OTTL expressions. To use the client metadata we will need to introduce a standard "request" context similar to the one supported by the routing processor. I'll look into this next week. This approach will enable arbitrary topic routing as well as arbitrary partitioning/message keying.
While reading open-telemetry/opentelemetry-collector#10825 and open-telemetry/opentelemetry-collector#12795 it occurred to me that the OTTL-based routing I mentioned above can be more generally useful. Instead of baking this into the Kafka exporter, I think we should either expand the transform processor or introduce a new processor concerned only with partitioning batches using OTTL, probably the latter. The exporter would just need to learn how to set the topic and/or message key from a metadata value. Then the same functionality could be used with other exporters too, e.g. Pulsar. The config I have in mind is:

```yaml
processors:
  partitioner/logs:
    keys:
      logs_topic: "Concat(request.metadata[\"X-Tenant-Id\"]).otlp_logs"
      logs_message_key: "log.trace_id"
  partitioner/metrics:
    keys:
      metrics_topic: "Concat(request.metadata[\"X-Tenant-Id\"]).otlp_metrics"
  partitioner/traces:
    keys:
      traces_topic: "Concat(request.metadata[\"X-Tenant-Id\"]).otlp_spans"
      traces_message_key: "span.trace_id"
exporter:
  kafka:
    logs:
      topic_from_metadata: logs_topic
      message_key_from_metadata: logs_message_key
    metrics:
      topic_from_metadata: metrics_topic
    traces:
      topic_from_metadata: traces_topic
      message_key_from_metadata: traces_message_key
# ...
```

(Note that the key names above are arbitrary. The values are OTTL expressions using context inference.)
Component(s)
exporter/kafka
Is your feature request related to a problem? Please describe.
The kafkaexporter component can be configured in various ways to determine the topic to which messages are published:

- `topic`, to explicitly specify the topic name
- `topic_from_attribute`, to extract the topic name from a resource attribute

This is complex, and there are problems with each of these options:

- `topic` cannot be overridden in a way that matches the default, i.e. you cannot configure signal-specific topic names: [exporter/kafka] Replace "topic" setting by "traces_topic", "logs_topic" and "metrics_topic" #35432
- `topic_from_attribute` may behave incorrectly: topic_from_attribute does not work as expected #37470

Describe the solution you'd like
Replace all of these with a templated topic name, e.g. `otlp_{signal_record_type}s`, where `signal_record_type` expands to "log", "span", or "metric" (the variable name is just for illustration). The `{...}` bit would be treated as an expression, e.g. using OTTL. So you could for example include:

- `topic: {request["x-tenant-id"]}.whatever`
- `topic: {resource.attributes["service.name"]}-logs`
See also #35432 (comment)
Assuming we use OTTL for the template expressions, then we may be able to rely on OTTL context inference to decide when to determine the topic: per request, resource, scope, record, etc. Otherwise, we could limit determining the topic to the resource level.
We should also support a templated topic for kafkareceiver to resolve #32735, but in that case the only expression that would make sense is the signal type. We'll need to consider how the templating syntax interacts with support for topic regular expressions, which the receiver should also ideally support.
Describe alternatives you've considered
No response
Additional context
No response