-
Notifications
You must be signed in to change notification settings - Fork 2.7k
[stefexporter] Implement async exporting #39958
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[stefexporter] Implement async exporting #39958
Conversation
cb6703e
to
24a725a
Compare
- Introduced 2 new helpers: ConnManager and Sync2Async. These can be used by any other async exporters in the future. For now they are in stefexpoter internal package, but they have no coupling with stefexporter and can be moved to a shared package and used by any exporter. - The previous implementation flushed STEF connection data on every export call and waited for acknowledgement. This limited the throughput and hurt compression ratios. The new implementation eliminates both of these limitations. Flushing is now done more rarely and acknowledgement waiting is now asynchronous, which allows more data to be written to the same STEF/gRPC stream while waiting for the ack to be received.
24a725a
to
99f6b58
Compare
- Introduced 2 new helpers: ConnManager and Sync2Async. These can be used by any other async exporters in the future. For now they are in stefexpoter internal package, but they have no coupling with stefexporter and can be moved to a shared package and used by any exporter. - The previous implementation flushed STEF connection data on every export call and waited for acknowledgement. This limited the throughput and hurt compression ratios. The new implementation eliminates both of these limitations. Flushing is now done more rarely and acknowledgement waiting is now asynchronous, which allows more data to be written to the same STEF/gRPC stream while waiting for the ack to be received.
I am going to request a Copilot review, curious what it can do. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements asynchronous exporting by introducing two new helpers (ConnManager and Sync2Async) to improve throughput and compression ratios for metric exports.
- Introduces asynchronous conversion via Sync2Async.
- Replaces per-export connection flush/ack handling with a more efficient connection management strategy.
- Updates testing and documentation to reflect new async behavior.
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
receiver/stefreceiver/go.mod | Adds new dependency to support testing with clockwork. |
exporter/stefexporter/internal/sync2async_test.go | Provides tests for Sync2Async behavior; potential loop variable capture. |
exporter/stefexporter/internal/sync2async.go | Implements conversion logic from sync to async call. |
exporter/stefexporter/internal/stefconn.go | Implements STEF connection handling and acknowledgment. |
exporter/stefexporter/internal/connmanager_test.go | Adds tests for connection management including flush and reconnect. |
exporter/stefexporter/exporter_test.go | Adjusts tests to validate new async exporting behavior. |
exporter/stefexporter/exporter.go | Refactors connection handling and metric export using ConnManager and Sync2Async. |
exporter/stefexporter/README.md | Updates documentation with new settings and usage notes. |
.chloggen/tigran_stefexporter.yaml | Updates changelog to reflect async exporting implementation. |
- Introduced 2 new helpers: ConnManager and Sync2Async. These can be used by any other async exporters in the future. For now they are in stefexpoter internal package, but they have no coupling with stefexporter and can be moved to a shared package and used by any exporter. - The previous implementation flushed STEF connection data on every export call and waited for acknowledgement. This limited the throughput and hurt compression ratios. The new implementation eliminates both of these limitations. Flushing is now done more rarely and acknowledgement waiting is now asynchronous, which allows more data to be written to the same STEF/gRPC stream while waiting for the ack to be received.
fb9f8a9
to
5afbbee
Compare
- Queuing, timeout and retry settings, particularly: | ||
- The `timeout` setting controls how long the exporter waits for ACK of a data sent | ||
over STEF/gRPC stream. | ||
- The `num_consumers` setting defines how many unacknowledged batches can be in-flight. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name of the setting does not quite match what it controls. I wanted to keep this existing setting and avoid creating a new one, but I am not sure now this was a very good idea. cc @dmitryax
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a regular exporter, it controls outbound concurrency as well, so I think the name is fine. The description clarifies it well enough
logger *zap.Logger, | ||
creator ConnCreator, | ||
targetConnCount uint, | ||
flushPeriod time.Duration, | ||
reconnectPeriod time.Duration, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this may become a shared API in the future and will need to evolve it may make sense to use options or struct pattern to make parameter list extensible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to a settings struct.
targetConnCount uint, | ||
flushPeriod time.Duration, | ||
reconnectPeriod time.Duration, | ||
) *ConnManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to validate inputs and return error (e.g. is flushPeriod==0 valid?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added validation.
// Flush the connection. This is typically to send any buffered data. | ||
// Will be called periodically (see ConnManager flushPeriod) and | ||
// before ConnManager.Stop returns. | ||
Flush() error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want a context parameter here? Flush supposedly does I/O and can block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added Context parameter.
|
||
// Flush any pending data over the connection. | ||
func (s *StefConn) Flush() error { | ||
return s.writer.Flush() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can block. We need a way to cancel this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made this honour the Context cancellation.
4a57bf6
to
52841b1
Compare
Description
Introduced 2 new helpers: ConnManager and Sync2Async. These can be used by any other async exporters in the future. For now they are in stefexpoter internal package, but they have no coupling with stefexporter and can be moved to a shared package and used by any exporter.
The previous implementation flushed STEF connection data on every export call and waited for acknowledgement. This limited the throughput and hurt compression ratios. The new implementation eliminates both of these limitations. Flushing is now done more rarely and acknowledgement waiting is now asynchronous, which allows more data to be written to the same STEF/gRPC stream while waiting for the ack to be received.
Testing
Added unit tests and tested manually against the receiver in https://github.com/splunk/stef/tree/main/otelcol/internal/stefreceiver
Documentation
README.md updated.