Skip to content

Commit 4a57bf6

Browse files
Fixes based on review
1 parent 5afbbee commit 4a57bf6

File tree

4 files changed

+141
-54
lines changed

4 files changed

+141
-54
lines changed

exporter/stefexporter/exporter.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,18 @@ func (s *stefExporter) Start(ctx context.Context, host component.Host) error {
7373

7474
// Create a connection creator and manager to take care of the connections.
7575
connCreator := internal.NewStefConnCreator(s.set.Logger, s.grpcConn, s.compression)
76-
s.connMan = internal.NewConnManager(s.set.Logger, connCreator, connCount, flushPeriod, reconnectPeriod)
76+
set := internal.ConnManagerSettings{
77+
Logger: s.set.Logger,
78+
Creator: connCreator,
79+
TargetConnCount: connCount,
80+
FlushPeriod: flushPeriod,
81+
ReconnectPeriod: reconnectPeriod,
82+
}
83+
s.connMan, err = internal.NewConnManager(set)
84+
if err != nil {
85+
return err
86+
}
87+
7788
s.connMan.Start()
7889

7990
// Wrap async implementation of sendMetricsAsync into a sync-callable API.
@@ -90,7 +101,7 @@ func (s *stefExporter) Start(ctx context.Context, host component.Host) error {
90101
return
91102
}
92103
// Connection is established. Return it, this is all we needed for now.
93-
s.connMan.Release(conn)
104+
s.connMan.Release(ctx, conn)
94105
}()
95106

96107
s.started = true
@@ -174,7 +185,7 @@ func (s *stefExporter) sendMetricsAsync(
174185
stefConn.OnAck(expectedAckID, resultChan)
175186

176187
// We are done with the connection.
177-
s.connMan.Release(conn)
188+
s.connMan.Release(ctx, conn)
178189

179190
return internal.DataID(expectedAckID), nil
180191
}

exporter/stefexporter/internal/connmanager.go

+76-34
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ type ConnManager struct {
2828
// Outside of tests it is a real clock.
2929
clock clockwork.Clock
3030

31-
// Number of connections desirable to maintain.
3231
targetConnCount uint
3332

3433
// Number of current connections, collectively in all pools or acquired.
3534
curConnCount atomic.Int64
3635

37-
// ConnCreator is used to create new connections.
3836
connCreator ConnCreator
3937

4038
// Connection pools. curConnCount connections are either in one
@@ -43,13 +41,7 @@ type ConnManager struct {
4341
idleConns chan *ManagedConn // Ready to be acquired.
4442
recreateConns chan *ManagedConn // Pending to be recreated.
4543

46-
// Approximate period to wait before flushing connections after
47-
// the connection is released by the user. Setting this value to >0 avoids
48-
// unnecessary flushes when the connection is acquired and released frequently.
49-
flushPeriod time.Duration
50-
51-
// Period to reconnect connections. Each connection is periodically
52-
// reconnected approximately every reconnectPeriod.
44+
flushPeriod time.Duration
5345
reconnectPeriod time.Duration
5446

5547
// Flags to indicate if the goroutines are stopped.
@@ -104,28 +96,59 @@ type Conn interface {
10496
// Flush the connection. This is typically to send any buffered data.
10597
// Will be called periodically (see ConnManager flushPeriod) and
10698
// before ConnManager.Stop returns.
107-
Flush() error
99+
Flush(context.Context) error
100+
}
101+
102+
type ConnManagerSettings struct {
103+
Logger *zap.Logger
104+
105+
// Creator helps create new connections.
106+
Creator ConnCreator
107+
108+
// TargetConnCount is the number of connections desirable to maintain.
109+
// Must be >0.
110+
TargetConnCount uint
111+
112+
// FlushPeriod is the approximate period to wait before flushing connections after
113+
// the user releases the connection. Setting this value to >0 avoids
114+
// unnecessary flushes when the connection is acquired and released frequently.
115+
// Setting this to 0 ensures flushing is done after every connection release.
116+
FlushPeriod time.Duration
117+
118+
// ReconnectPeriod is the interval to reconnect connections. Each connection is
119+
// periodically reconnected approximately every reconnectPeriod.
120+
ReconnectPeriod time.Duration
108121
}
109122

110-
func NewConnManager(
111-
logger *zap.Logger,
112-
creator ConnCreator,
113-
targetConnCount uint,
114-
flushPeriod time.Duration,
115-
reconnectPeriod time.Duration,
116-
) *ConnManager {
123+
func NewConnManager(set ConnManagerSettings) (*ConnManager, error) {
124+
if set.Logger == nil {
125+
set.Logger = zap.NewNop()
126+
}
127+
128+
if set.TargetConnCount == 0 {
129+
return nil, errors.New("TargetConnCount must be >0")
130+
}
131+
132+
if set.FlushPeriod <= 0 {
133+
return nil, errors.New("FlushPeriod must be >=0")
134+
}
135+
136+
if set.ReconnectPeriod <= 0 {
137+
return nil, errors.New("ReconnectPeriod must be >0")
138+
}
139+
117140
return &ConnManager{
118-
logger: logger,
141+
logger: set.Logger,
119142
clock: clockwork.NewRealClock(),
120-
connCreator: creator,
121-
targetConnCount: targetConnCount,
122-
idleConns: make(chan *ManagedConn, targetConnCount),
123-
recreateConns: make(chan *ManagedConn, targetConnCount),
124-
flushPeriod: flushPeriod,
125-
reconnectPeriod: reconnectPeriod,
143+
connCreator: set.Creator,
144+
targetConnCount: set.TargetConnCount,
145+
idleConns: make(chan *ManagedConn, set.TargetConnCount),
146+
recreateConns: make(chan *ManagedConn, set.TargetConnCount),
147+
flushPeriod: set.FlushPeriod,
148+
reconnectPeriod: set.ReconnectPeriod,
126149
stoppedCond: NewCancellableCond(),
127150
stopSignal: make(chan struct{}),
128-
}
151+
}, nil
129152
}
130153

131154
// Start starts the connection manager. It will immediately start
@@ -194,7 +217,7 @@ func (c *ConnManager) closeAll(ctx context.Context) error {
194217
if conn.conn != nil {
195218
// Flush if needs a flush and is not discarded.
196219
if !discarded && conn.needsFlush {
197-
if err := conn.conn.Flush(); err != nil {
220+
if err := conn.conn.Flush(ctx); err != nil {
198221
c.logger.Debug("Failed to flush connection", zap.Error(err))
199222
errs = append(errs, err)
200223
continue
@@ -240,14 +263,14 @@ func (c *ConnManager) Acquire(ctx context.Context) (*ManagedConn, error) {
240263
// If the connection was last flushed more than flushPeriod ago, it will
241264
// be flushed otherwise it will be marked as needing to be flushed at the
242265
// next opportunity.
243-
func (c *ConnManager) Release(conn *ManagedConn) {
266+
func (c *ConnManager) Release(ctx context.Context, conn *ManagedConn) {
244267
if !conn.isAcquired {
245268
panic("connection is not acquired")
246269
}
247270
conn.isAcquired = false
248-
if c.clock.Since(conn.lastFlush) >= c.flushPeriod {
271+
if c.clock.Since(conn.lastFlush) >= c.flushPeriod || c.flushPeriod == 0 {
249272
// Time to flush the connection.
250-
if err := conn.conn.Flush(); err != nil {
273+
if err := conn.conn.Flush(ctx); err != nil {
251274
c.logger.Error("Failed to flush connection. Closing connection.", zap.Error(err))
252275
// Something went wrong, we need to recreate the connection since it
253276
// may no longer be usable.
@@ -282,11 +305,23 @@ func (c *ConnManager) flusher() {
282305
c.stoppedCond.Cond.Broadcast()
283306
}()
284307

308+
if c.flushPeriod == 0 {
309+
// flusher is not needed, we will always flush immediately in Release().
310+
return
311+
}
312+
285313
ticker := c.clock.NewTicker(c.flushPeriod)
286314

315+
// Context that cancels on stopSignal.
316+
ctx, cancel := context.WithCancel(context.Background())
317+
go func() {
318+
defer cancel()
319+
<-c.stopSignal
320+
}()
321+
287322
for {
288323
select {
289-
case <-c.stopSignal:
324+
case <-ctx.Done():
290325
return
291326
case <-ticker.Chan():
292327
loop:
@@ -297,7 +332,7 @@ func (c *ConnManager) flusher() {
297332
case conn := <-c.idleConns:
298333
if conn.needsFlush {
299334
conn.needsFlush = false
300-
if err := conn.conn.Flush(); err != nil {
335+
if err := conn.conn.Flush(ctx); err != nil {
301336
c.logger.Error("Failed to flush connection. Closing connection.", zap.Error(err))
302337
// Something went wrong, we need to recreate the connection since it
303338
// may no longer be usable.
@@ -329,19 +364,26 @@ func (c *ConnManager) durationLimiter() {
329364
c.stoppedCond.Cond.Broadcast()
330365
}()
331366

367+
// Context that cancels on stopSignal.
368+
ctx, cancel := context.WithCancel(context.Background())
369+
go func() {
370+
defer cancel()
371+
<-c.stopSignal
372+
}()
373+
332374
// Each connection will be reconnected at approximately reconnectPeriod interval.
333375
// We reconnect one per tick.
334376
ticker := c.clock.NewTicker(c.reconnectPeriod / time.Duration(c.targetConnCount))
335377
defer ticker.Stop()
336378
for {
337379
select {
338-
case <-c.stopSignal:
380+
case <-ctx.Done():
339381
return
340382
case <-ticker.Chan():
341383
// Find an idle connection
342384
var conn *ManagedConn
343385
select {
344-
case <-c.stopSignal:
386+
case <-ctx.Done():
345387
return
346388
case conn = <-c.idleConns:
347389
}
@@ -351,7 +393,7 @@ func (c *ConnManager) durationLimiter() {
351393
if conn.needsFlush {
352394
conn.needsFlush = false
353395
// Flush it first.
354-
if err := conn.conn.Flush(); err != nil {
396+
if err := conn.conn.Flush(ctx); err != nil {
355397
c.logger.Error("Failed to flush connection. Closing connection.", zap.Error(err))
356398
}
357399
}

exporter/stefexporter/internal/connmanager_test.go

+18-15
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (m *mockConn) Close(context.Context) error {
4141
return nil
4242
}
4343

44-
func (m *mockConn) Flush() error {
44+
func (m *mockConn) Flush(ctx context.Context) error {
4545
m.mux.Lock()
4646
m.flushed = true
4747
m.mux.Unlock()
@@ -81,13 +81,16 @@ func doTest(t *testing.T, test testDef) {
8181
for connCount := uint(1); connCount <= 2; connCount++ {
8282
connCreator := &mockConnCreator{}
8383

84-
cm := NewConnManager(
85-
zap.NewNop(),
86-
connCreator,
87-
connCount,
88-
flushPeriod,
89-
reconnectPeriod,
90-
)
84+
set := ConnManagerSettings{
85+
Logger: zap.NewNop(),
86+
Creator: connCreator,
87+
TargetConnCount: connCount,
88+
FlushPeriod: flushPeriod,
89+
ReconnectPeriod: reconnectPeriod,
90+
}
91+
cm, err := NewConnManager(set)
92+
require.NoError(t, err)
93+
9194
if test.clock != nil {
9295
cm.clock = test.clock
9396
}
@@ -119,7 +122,7 @@ func TestConnManagerAcquireRelease(t *testing.T) {
119122
conn, err := cm.Acquire(context.Background())
120123
require.NoError(t, err)
121124
require.NotNil(t, conn)
122-
cm.Release(conn)
125+
cm.Release(context.Background(), conn)
123126
},
124127
},
125128
)
@@ -138,7 +141,7 @@ func TestConnManagerAcquireReleaseMany(t *testing.T) {
138141
conns = append(conns, conn)
139142
}
140143
for _, conn := range conns {
141-
cm.Release(conn)
144+
cm.Release(context.Background(), conn)
142145
}
143146
},
144147
afterStopFunc: func(_ *mockConnCreator, _ uint) {
@@ -174,7 +177,7 @@ func TestConnManagerAcquireDiscardAcquire(t *testing.T) {
174177
conns = append(conns, conn)
175178
}
176179
for _, conn := range conns {
177-
cm.Release(conn)
180+
cm.Release(context.Background(), conn)
178181
}
179182
},
180183
afterStopFunc: func(creator *mockConnCreator, connCount uint) {
@@ -198,7 +201,7 @@ func TestConnManagerAcquireReleaseConcurrent(t *testing.T) {
198201
if err != nil {
199202
return
200203
}
201-
cm.Release(conn)
204+
cm.Release(context.Background(), conn)
202205
}()
203206
}
204207
wg.Wait()
@@ -262,7 +265,7 @@ func TestConnManagerFlush(t *testing.T) {
262265

263266
// Advance the clock so that Release() trigger flush.
264267
cm.clock.(*clockwork.FakeClock).Advance(flushPeriod)
265-
cm.Release(conn)
268+
cm.Release(context.Background(), conn)
266269

267270
// Make sure the flush is done.
268271
assert.Eventually(
@@ -285,7 +288,7 @@ func TestConnManagerFlush(t *testing.T) {
285288
assert.False(t, conn.conn.(*mockConn).Flushed())
286289

287290
// Release the connection first.
288-
cm.Release(conn)
291+
cm.Release(context.Background(), conn)
289292

290293
// Advance the clock so that the connection is flush in the background.
291294
cm.clock.(*clockwork.FakeClock).Advance(flushPeriod)
@@ -311,7 +314,7 @@ func TestConnManagerReconnector(t *testing.T) {
311314
conn, err := cm.Acquire(context.Background())
312315
require.NoError(t, err)
313316
require.NotNil(t, conn)
314-
cm.Release(conn)
317+
cm.Release(context.Background(), conn)
315318

316319
assert.Eventually(
317320
t, func() bool {

0 commit comments

Comments
 (0)