kafkareceiver: refactor to make consumer generic #39360
Remove code that is repeated for every signal, and implement a generic consumer that calls out to a signal-specific message handler. We also update all the tests to use kfake rather than mocks so we're exercising all of the receiver. These tests are parallelised, so all tests run faster than before, and with greater coverage. Finally, we add `max_fetch_wait` (defaults to 250ms) to the receiver's configuration. This was added for speeding up the tests, but may also be useful for users of the receiver.
Note to reviewer(s): I would suggest not looking at the diff, but instead opening kafka_exporter.go and kafka_exporter_test.go and reading them as if they were new code.
Moving to draft for now. Please resolve the conflict and mark ready for review again.
func (c *kafkaConsumer) consumeLoop(handler sarama.ConsumerGroupHandler) {
	defer close(c.consumeLoopClosed)

	ctx := context.Background()
Is there a reason not to pass a context in here, rather than relying on the background context?
All the loop does is call the consumer group's Consume method in a loop. There's only one way the loop should terminate, which is when the consumer group is closed, and that happens when Shutdown is called. Passing in a context would introduce another way the loop might exit, adding complexity.
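To illustrate the reply above, here is a minimal sketch of a consume loop whose only exit path is the consumer group being closed during shutdown. It uses only the standard library: `fakeGroup`, `errGroupClosed`, `consumer`, and `runDemo` are hypothetical stand-ins invented for this example, not the receiver's actual types (with sarama, the closed-group error would be `sarama.ErrClosedConsumerGroup`).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errGroupClosed is a hypothetical stand-in for the error a consumer group
// returns after it has been closed.
var errGroupClosed = errors.New("consumer group closed")

// fakeGroup is a hypothetical stand-in for a consumer group client.
type fakeGroup struct {
	sessions chan struct{} // each value simulates one session that ends (e.g. a rebalance)
	closed   chan struct{} // closed by Close
	calls    int           // number of sessions that completed normally
}

// Consume blocks until a session ends, the context is done, or the group is closed.
func (g *fakeGroup) Consume(ctx context.Context) error {
	select {
	case <-g.sessions:
		g.calls++
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-g.closed:
		return errGroupClosed
	}
}

// Close marks the group as closed, which unblocks Consume.
func (g *fakeGroup) Close() { close(g.closed) }

type consumer struct {
	group             *fakeGroup
	consumeLoopClosed chan struct{}
}

// consumeLoop calls Consume repeatedly. The background context is never
// cancelled, so the loop can only exit once the group has been closed.
func (c *consumer) consumeLoop() {
	defer close(c.consumeLoopClosed)
	ctx := context.Background()
	for {
		if err := c.group.Consume(ctx); errors.Is(err, errGroupClosed) {
			return
		}
	}
}

// Shutdown closes the group and waits for the loop to finish.
func (c *consumer) Shutdown() {
	c.group.Close()
	<-c.consumeLoopClosed
}

// runDemo runs two simulated sessions, shuts down, and reports how many
// sessions the loop completed before exiting.
func runDemo() int {
	g := &fakeGroup{sessions: make(chan struct{}), closed: make(chan struct{})}
	c := &consumer{group: g, consumeLoopClosed: make(chan struct{})}
	go c.consumeLoop()
	g.sessions <- struct{}{} // first session ends; the loop calls Consume again
	g.sessions <- struct{}{} // second session ends
	c.Shutdown()
	return g.calls
}

func main() {
	fmt.Println("sessions completed before shutdown:", runDemo())
}
```

Because `Shutdown` is the single place that closes the group, there is one termination path to reason about and to test. Accepting a caller-supplied context would add a second path through the `ctx.Done()` branch.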
Co-authored-by: Sean Marciniak <[email protected]>
Description
Remove code that is repeated for every signal, and implement a generic consumer that calls out to a signal-specific message handler.
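As a rough illustration of the shape described above, the sketch below shows one generic consumer that delegates each message to a signal-specific handler. All names here (`message`, `messageHandler`, `logsHandler`, `metricsHandler`, `consumer`) are hypothetical and chosen for the example. They are not the identifiers used in this PR, and the payload handling is deliberately simplistic.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for a Kafka record.
type message struct {
	Topic string
	Value []byte
}

// messageHandler is the hypothetical signal-specific hook the generic
// consumer calls for every message.
type messageHandler interface {
	handleMessage(msg message) error
}

// logsHandler treats each line of the payload as one log record.
type logsHandler struct{ lines []string }

func (h *logsHandler) handleMessage(msg message) error {
	h.lines = append(h.lines, strings.Split(string(msg.Value), "\n")...)
	return nil
}

// metricsHandler treats each whitespace-separated field as one data point.
type metricsHandler struct{ points int }

func (h *metricsHandler) handleMessage(msg message) error {
	if len(msg.Value) == 0 {
		return errors.New("empty metrics payload")
	}
	h.points += len(strings.Fields(string(msg.Value)))
	return nil
}

// consumer holds the logic shared by every signal. Only the handler differs.
type consumer struct{ handler messageHandler }

// consume passes messages to the handler in order and stops at the first
// error, returning the number of messages handled successfully.
func (c *consumer) consume(msgs []message) (int, error) {
	for i, msg := range msgs {
		if err := c.handler.handleMessage(msg); err != nil {
			return i, fmt.Errorf("handling message from topic %q: %w", msg.Topic, err)
		}
	}
	return len(msgs), nil
}

func main() {
	logs := &logsHandler{}
	c := &consumer{handler: logs}
	n, err := c.consume([]message{{Topic: "logs", Value: []byte("a\nb")}})
	fmt.Println(n, err, logs.lines)
}
```

Supporting another signal in this arrangement means writing one more handler, while the consume logic stays in a single place.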
We also update all the tests to use kfake rather than mocks so we're exercising all of the receiver. These tests are parallelised, so all tests run faster than before, and with greater coverage.
Finally, we add `max_fetch_wait` (defaults to 250ms) to the receiver's configuration. This was added for speeding up the tests, but may also be useful for users of the receiver.
Link to tracking issue
N/A
Testing
See description - tests now use the real client, rather than mocks, using https://github.com/twmb/franz-go/tree/master/pkg/kfake.
Documentation
Updated README.