Skip to content

[stefexporter] Implement async exporting #39958

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

tigrannajaryan
Copy link
Member

Description

  • Introduced 2 new helpers: ConnManager and Sync2Async. These can be used by any other async exporters in the future. For now they are in stefexpoter internal package, but they have no coupling with stefexporter and can be moved to a shared package and used by any exporter.

  • The previous implementation flushed STEF connection data on every export call and waited for acknowledgement. This limited the throughput and hurt compression ratios. The new implementation eliminates both of these limitations. Flushing is now done more rarely and acknowledgement waiting is now asynchronous, which allows more data to be written to the same STEF/gRPC stream while waiting for the ack to be received.

Testing

Added unit tests and tested manually against the receiver in https://github.com/splunk/stef/tree/main/otelcol/internal/stefreceiver

Documentation

README.md updated.

@github-actions github-actions bot requested a review from dmitryax May 8, 2025 20:56
@tigrannajaryan tigrannajaryan force-pushed the tigran/stefexporter branch 2 times, most recently from cb6703e to 24a725a Compare May 8, 2025 21:22
- Introduced 2 new helpers: ConnManager and Sync2Async.
  These can be used by any other async exporters in the
  future. For now they are in stefexpoter internal package,
  but they have no coupling with stefexporter and can be
  moved to a shared package and used by any exporter.

- The previous implementation flushed STEF connection data
  on every export call and waited for acknowledgement.
  This limited the throughput and hurt compression ratios.
  The new implementation eliminates both of these
  limitations. Flushing is now done more rarely and
  acknowledgement waiting is now asynchronous, which allows
  more data to be written to the same STEF/gRPC stream
  while waiting for the ack to be received.
@tigrannajaryan tigrannajaryan force-pushed the tigran/stefexporter branch from 24a725a to 99f6b58 Compare May 8, 2025 21:33
- Introduced 2 new helpers: ConnManager and Sync2Async.
  These can be used by any other async exporters in the
  future. For now they are in stefexpoter internal package,
  but they have no coupling with stefexporter and can be
  moved to a shared package and used by any exporter.

- The previous implementation flushed STEF connection data
  on every export call and waited for acknowledgement.
  This limited the throughput and hurt compression ratios.
  The new implementation eliminates both of these
  limitations. Flushing is now done more rarely and
  acknowledgement waiting is now asynchronous, which allows
  more data to be written to the same STEF/gRPC stream
  while waiting for the ack to be received.
@tigrannajaryan tigrannajaryan marked this pull request as ready for review May 8, 2025 23:01
@tigrannajaryan tigrannajaryan requested a review from a team as a code owner May 8, 2025 23:01
@tigrannajaryan
Copy link
Member Author

I am going to request a Copilot review, curious what it can do.

@tigrannajaryan tigrannajaryan requested a review from Copilot May 9, 2025 02:41
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements asynchronous exporting by introducing two new helpers (ConnManager and Sync2Async) to improve throughput and compression ratios for metric exports.

  • Introduces asynchronous conversion via Sync2Async.
  • Replaces per-export connection flush/ack handling with a more efficient connection management strategy.
  • Updates testing and documentation to reflect new async behavior.

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
receiver/stefreceiver/go.mod Adds new dependency to support testing with clockwork.
exporter/stefexporter/internal/sync2async_test.go Provides tests for Sync2Async behavior; potential loop variable capture.
exporter/stefexporter/internal/sync2async.go Implements conversion logic from sync to async call.
exporter/stefexporter/internal/stefconn.go Implements STEF connection handling and acknowledgment.
exporter/stefexporter/internal/connmanager_test.go Adds tests for connection management including flush and reconnect.
exporter/stefexporter/exporter_test.go Adjusts tests to validate new async exporting behavior.
exporter/stefexporter/exporter.go Refactors connection handling and metric export using ConnManager and Sync2Async.
exporter/stefexporter/README.md Updates documentation with new settings and usage notes.
.chloggen/tigran_stefexporter.yaml Updates changelog to reflect async exporting implementation.

- Introduced 2 new helpers: ConnManager and Sync2Async.
  These can be used by any other async exporters in the
  future. For now they are in stefexpoter internal package,
  but they have no coupling with stefexporter and can be
  moved to a shared package and used by any exporter.

- The previous implementation flushed STEF connection data
  on every export call and waited for acknowledgement.
  This limited the throughput and hurt compression ratios.
  The new implementation eliminates both of these
  limitations. Flushing is now done more rarely and
  acknowledgement waiting is now asynchronous, which allows
  more data to be written to the same STEF/gRPC stream
  while waiting for the ack to be received.
@tigrannajaryan tigrannajaryan force-pushed the tigran/stefexporter branch from fb9f8a9 to 5afbbee Compare May 9, 2025 13:15
- Queuing, timeout and retry settings, particularly:
- The `timeout` setting controls how long the exporter waits for ACK of a data sent
over STEF/gRPC stream.
- The `num_consumers` setting defines how many unacknowledged batches can be in-flight.
Copy link
Member Author

@tigrannajaryan tigrannajaryan May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the setting does not quite match what it controls. I wanted to keep this existing setting and avoid creating a new one, but I am not sure now this was a very good idea. cc @dmitryax

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a regular exporter, it controls outbound concurrency as well, so I think the name is fine. The description clarifies it well enough

Comment on lines 111 to 115
logger *zap.Logger,
creator ConnCreator,
targetConnCount uint,
flushPeriod time.Duration,
reconnectPeriod time.Duration,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this may become a shared API in the future and will need to evolve it may make sense to use options or struct pattern to make parameter list extensible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to a settings struct.

targetConnCount uint,
flushPeriod time.Duration,
reconnectPeriod time.Duration,
) *ConnManager {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to validate inputs and return error (e.g. is flushPeriod==0 valid?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added validation.

// Flush the connection. This is typically to send any buffered data.
// Will be called periodically (see ConnManager flushPeriod) and
// before ConnManager.Stop returns.
Flush() error
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want a context parameter here? Flush supposedly does I/O and can block.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Context parameter.


// Flush any pending data over the connection.
func (s *StefConn) Flush() error {
return s.writer.Flush()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can block. We need a way to cancel this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this honour the Context cancellation.

@tigrannajaryan tigrannajaryan force-pushed the tigran/stefexporter branch from 4a57bf6 to 52841b1 Compare May 9, 2025 18:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants