Skip to content

kafkareceiver: refactor to make consumer generic #39360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 22, 2025

Conversation

axw
Copy link
Contributor

@axw axw commented Apr 12, 2025

Description

Remove code that is repeated for every signal, and implement a generic consumer that calls out to a signal-specific message handler.

We also update all the tests to use kfake rather than mocks so we're exercising all of the receiver. These tests are parallelised, so all tests run faster than before, and with greater coverage.

Finally, we add max_fetch_wait (defaults to 250ms) to the receiver's configuration. This was added for speeding up the tests, but may also be useful for users of the receiver.

Link to tracking issue

N/A

Testing

See description - tests now use the real client, rather than mocks, using https://github.com/twmb/franz-go/tree/master/pkg/kfake.

Documentation

Updated README.

@axw axw force-pushed the kafkareceiver-generic branch 4 times, most recently from b5131cd to 9317964 Compare April 14, 2025 03:07
Remove code that is repeated for every signal,
and implement a generic consumer that calls out
to a signal-specific message handler.

We also update all the tests to use kfake rather
than mocks so we're exercising all of the receiver.
These tests are parallelised, so all tests run
faster than before, and with greater coverage.

Finally, we add `max_fetch_wait` (defaults to 250ms)
to the receiver's configuration. This was added for
speeding up the tests, but may also be useful for
users of the receiver.
@axw axw force-pushed the kafkareceiver-generic branch from 9317964 to 1b76806 Compare April 14, 2025 03:20
@axw axw marked this pull request as ready for review April 14, 2025 03:43
@axw axw requested a review from a team as a code owner April 14, 2025 03:43
@axw
Copy link
Contributor Author

axw commented Apr 14, 2025

Note to reviewer(s): I would suggest not looking at the diff, but open up kafka_exporter.go and kafka_exporter_test.go as if it's new code.

@atoulme atoulme marked this pull request as draft April 17, 2025 05:31
@atoulme
Copy link
Contributor

atoulme commented Apr 17, 2025

Moving to draft for now, please resolve the conflict and mark ready for review again.

@axw axw marked this pull request as ready for review April 17, 2025 07:18
func (c *kafkaConsumer) consumeLoop(handler sarama.ConsumerGroupHandler) {
defer close(c.consumeLoopClosed)

ctx := context.Background()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why not to use a passed context and rely on the background context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the loop does is call the consumer group's Consume method in a loop. There's only one way the loop should terminate, which is when the consumer group is closed, and that happens when Shutdown is called. Passing in a context would introduce another way the loop might exit, adding complexity.

@MovieStoreGuy MovieStoreGuy merged commit eac240c into open-telemetry:main Apr 22, 2025
172 checks passed
@github-actions github-actions bot added this to the next release milestone Apr 22, 2025
@axw axw deleted the kafkareceiver-generic branch April 22, 2025 23:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants