Skip to content

Commit 2e11f9d

Browse files
committed
discovery: pass context through to reliable sender
And remove a context.TODO() that was added in the previous commit.
1 parent 0be9d23 commit 2e11f9d

File tree

3 files changed

+48
-36
lines changed

3 files changed

+48
-36
lines changed

discovery/gossiper.go

+5-9
Original file line numberDiff line numberDiff line change
@@ -602,11 +602,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
602602
NotifyWhenOnline: cfg.NotifyWhenOnline,
603603
NotifyWhenOffline: cfg.NotifyWhenOffline,
604604
MessageStore: cfg.MessageStore,
605-
IsMsgStale: func(message lnwire.Message) bool {
606-
ctx := context.TODO()
607-
608-
return gossiper.isMsgStale(ctx, message)
609-
},
605+
IsMsgStale: gossiper.isMsgStale,
610606
})
611607

612608
return gossiper
@@ -678,7 +674,7 @@ func (d *AuthenticatedGossiper) start(ctx context.Context) error {
678674
// Start the reliable sender. In case we had any pending messages ready
679675
// to be sent when the gossiper was last shut down, we must continue on
680676
// our quest to deliver them to their respective peers.
681-
if err := d.reliableSender.Start(); err != nil {
677+
if err := d.reliableSender.Start(ctx); err != nil {
682678
return err
683679
}
684680

@@ -1889,7 +1885,7 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(ctx context.Context,
18891885
edgeInfo.Info, chanUpdate.ChannelFlags,
18901886
)
18911887
err := d.reliableSender.sendMessage(
1892-
chanUpdate, remotePubKey,
1888+
ctx, chanUpdate, remotePubKey,
18931889
)
18941890
if err != nil {
18951891
log.Errorf("Unable to reliably send %v for "+
@@ -3329,7 +3325,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
33293325
// Now we'll attempt to send the channel update message
33303326
// reliably to the remote peer in the background, so that we
33313327
// don't block if the peer happens to be offline at the moment.
3332-
err := d.reliableSender.sendMessage(upd, remotePubKey)
3328+
err := d.reliableSender.sendMessage(ctx, upd, remotePubKey)
33333329
if err != nil {
33343330
err := fmt.Errorf("unable to reliably send %v for "+
33353331
"channel=%v to peer=%x: %v", upd.MsgType(),
@@ -3464,7 +3460,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(ctx context.Context,
34643460
// Since the remote peer might not be online we'll call a
34653461
// method that will attempt to deliver the proof when it comes
34663462
// online.
3467-
err := d.reliableSender.sendMessage(ann, remotePubKey)
3463+
err := d.reliableSender.sendMessage(ctx, ann, remotePubKey)
34683464
if err != nil {
34693465
err := fmt.Errorf("unable to reliably send %v for "+
34703466
"channel=%v to peer=%x: %v", ann.MsgType(),

discovery/reliable_sender.go

+24-13
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package discovery
22

33
import (
4+
"context"
45
"sync"
56

7+
"github.com/lightningnetwork/lnd/fn/v2"
68
"github.com/lightningnetwork/lnd/lnpeer"
79
"github.com/lightningnetwork/lnd/lnwire"
810
)
@@ -28,7 +30,7 @@ type reliableSenderCfg struct {
2830

2931
// IsMsgStale determines whether a message retrieved from the backing
3032
// MessageStore is seen as stale by the current graph.
31-
IsMsgStale func(lnwire.Message) bool
33+
IsMsgStale func(context.Context, lnwire.Message) bool
3234
}
3335

3436
// peerManager contains the set of channels required for the peerHandler to
@@ -59,8 +61,9 @@ type reliableSender struct {
5961
activePeers map[[33]byte]peerManager
6062
activePeersMtx sync.Mutex
6163

62-
wg sync.WaitGroup
63-
quit chan struct{}
64+
wg sync.WaitGroup
65+
quit chan struct{}
66+
cancel fn.Option[context.CancelFunc]
6467
}
6568

6669
// newReliableSender returns a new reliableSender backed by the given config.
@@ -73,10 +76,13 @@ func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
7376
}
7477

7578
// Start spawns message handlers for any peers with pending messages.
76-
func (s *reliableSender) Start() error {
79+
func (s *reliableSender) Start(ctx context.Context) error {
7780
var err error
7881
s.start.Do(func() {
79-
err = s.resendPendingMsgs()
82+
ctx, cancel := context.WithCancel(ctx)
83+
s.cancel = fn.Some(cancel)
84+
85+
err = s.resendPendingMsgs(ctx)
8086
})
8187
return err
8288
}
@@ -87,6 +93,7 @@ func (s *reliableSender) Stop() {
8793
log.Debugf("reliableSender is stopping")
8894
defer log.Debugf("reliableSender stopped")
8995

96+
s.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
9097
close(s.quit)
9198
s.wg.Wait()
9299
})
@@ -96,7 +103,9 @@ func (s *reliableSender) Stop() {
96103
// event that the peer is currently offline, this will only write the message to
97104
// disk. Once the peer reconnects, this message, along with any others pending,
98105
// will be sent to the peer.
99-
func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error {
106+
func (s *reliableSender) sendMessage(ctx context.Context, msg lnwire.Message,
107+
peerPubKey [33]byte) error {
108+
100109
// We'll start by persisting the message to disk. This allows us to
101110
// resend the message upon restarts and peer reconnections.
102111
if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil {
@@ -106,7 +115,7 @@ func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) er
106115
// Then, we'll spawn a peerHandler for this peer to handle resending its
107116
// pending messages while taking into account its connection lifecycle.
108117
spawnHandler:
109-
msgHandler, ok := s.spawnPeerHandler(peerPubKey)
118+
msgHandler, ok := s.spawnPeerHandler(ctx, peerPubKey)
110119

111120
// If the handler wasn't previously active, we can exit now as we know
112121
// that the message will be sent once the peer online notification is
@@ -134,7 +143,7 @@ spawnHandler:
134143
// spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't
135144
// one already active. The boolean returned signals whether there was already
136145
// one active or not.
137-
func (s *reliableSender) spawnPeerHandler(
146+
func (s *reliableSender) spawnPeerHandler(ctx context.Context,
138147
peerPubKey [33]byte) (peerManager, bool) {
139148

140149
s.activePeersMtx.Lock()
@@ -152,7 +161,7 @@ func (s *reliableSender) spawnPeerHandler(
152161
// peerHandler.
153162
if !ok {
154163
s.wg.Add(1)
155-
go s.peerHandler(msgHandler, peerPubKey)
164+
go s.peerHandler(ctx, msgHandler, peerPubKey)
156165
}
157166

158167
return msgHandler, ok
@@ -164,7 +173,9 @@ func (s *reliableSender) spawnPeerHandler(
164173
// offline will be queued and sent once the peer reconnects.
165174
//
166175
// NOTE: This must be run as a goroutine.
167-
func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) {
176+
func (s *reliableSender) peerHandler(ctx context.Context, peerMgr peerManager,
177+
peerPubKey [33]byte) {
178+
168179
defer s.wg.Done()
169180

170181
// We'll start by requesting a notification for when the peer
@@ -252,7 +263,7 @@ out:
252263
// check whether it's stale. This guarantees that
253264
// AnnounceSignatures are sent at least once if we happen to
254265
// already have signatures for both parties.
255-
if s.cfg.IsMsgStale(msg) {
266+
if s.cfg.IsMsgStale(ctx, msg) {
256267
err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey)
257268
if err != nil {
258269
log.Errorf("Unable to remove stale %v message "+
@@ -321,7 +332,7 @@ out:
321332

322333
// resendPendingMsgs retrieves and sends all of the messages within the message
323334
// store that should be reliably sent to their respective peers.
324-
func (s *reliableSender) resendPendingMsgs() error {
335+
func (s *reliableSender) resendPendingMsgs(ctx context.Context) error {
325336
// Fetch all of the peers for which we have pending messages for and
326337
// spawn a peerMsgHandler for each. Once the peer is seen as online, all
327338
// of the pending messages will be sent.
@@ -331,7 +342,7 @@ func (s *reliableSender) resendPendingMsgs() error {
331342
}
332343

333344
for peer := range peers {
334-
s.spawnPeerHandler(peer)
345+
s.spawnPeerHandler(ctx, peer)
335346
}
336347

337348
return nil

discovery/reliable_sender_test.go

+19-14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package discovery
22

33
import (
4+
"context"
45
"fmt"
56
"sync/atomic"
67
"testing"
@@ -11,6 +12,7 @@ import (
1112
"github.com/lightningnetwork/lnd/lnpeer"
1213
"github.com/lightningnetwork/lnd/lntest/wait"
1314
"github.com/lightningnetwork/lnd/lnwire"
15+
"github.com/stretchr/testify/require"
1416
)
1517

1618
// newTestReliableSender creates a new reliable sender instance used for
@@ -32,7 +34,7 @@ func newTestReliableSender(t *testing.T) *reliableSender {
3234
return c
3335
},
3436
MessageStore: newMockMessageStore(),
35-
IsMsgStale: func(lnwire.Message) bool {
37+
IsMsgStale: func(context.Context, lnwire.Message) bool {
3638
return false
3739
},
3840
}
@@ -69,6 +71,7 @@ func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message,
6971
// a peer while taking into account its connection lifecycle works as expected.
7072
func TestReliableSenderFlow(t *testing.T) {
7173
t.Parallel()
74+
ctx := context.Background()
7275

7376
reliableSender := newTestReliableSender(t)
7477

@@ -98,9 +101,8 @@ func TestReliableSenderFlow(t *testing.T) {
98101
msg1 := randChannelUpdate()
99102
var peerPubKey [33]byte
100103
copy(peerPubKey[:], pubKey.SerializeCompressed())
101-
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
102-
t.Fatalf("unable to reliably send message: %v", err)
103-
}
104+
err := reliableSender.sendMessage(ctx, msg1, peerPubKey)
105+
require.NoError(t, err)
104106

105107
// Since there isn't a peerHandler for this peer currently active due to
106108
// this being the first message being sent reliably, we should expect to
@@ -114,7 +116,7 @@ func TestReliableSenderFlow(t *testing.T) {
114116

115117
// We'll then attempt to send another additional message reliably.
116118
msg2 := randAnnounceSignatures()
117-
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
119+
if err := reliableSender.sendMessage(ctx, msg2, peerPubKey); err != nil {
118120
t.Fatalf("unable to reliably send message: %v", err)
119121
}
120122

@@ -145,9 +147,8 @@ func TestReliableSenderFlow(t *testing.T) {
145147

146148
// Then, we'll send one more message reliably.
147149
msg3 := randChannelUpdate()
148-
if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil {
149-
t.Fatalf("unable to reliably send message: %v", err)
150-
}
150+
err = reliableSender.sendMessage(ctx, msg3, peerPubKey)
151+
require.NoError(t, err)
151152

152153
// Again, this should not request another peer online notification
153154
// request since we are currently waiting for the peer to be offline.
@@ -188,6 +189,7 @@ func TestReliableSenderFlow(t *testing.T) {
188189
// them as stale.
189190
func TestReliableSenderStaleMessages(t *testing.T) {
190191
t.Parallel()
192+
ctx := context.Background()
191193

192194
reliableSender := newTestReliableSender(t)
193195

@@ -206,7 +208,9 @@ func TestReliableSenderStaleMessages(t *testing.T) {
206208

207209
// We'll also override IsMsgStale to mark all messages as stale as we're
208210
// interested in testing the stale message behavior.
209-
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
211+
reliableSender.cfg.IsMsgStale = func(_ context.Context,
212+
_ lnwire.Message) bool {
213+
210214
return true
211215
}
212216

@@ -215,7 +219,7 @@ func TestReliableSenderStaleMessages(t *testing.T) {
215219
msg1 := randAnnounceSignatures()
216220
var peerPubKey [33]byte
217221
copy(peerPubKey[:], pubKey.SerializeCompressed())
218-
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
222+
if err := reliableSender.sendMessage(ctx, msg1, peerPubKey); err != nil {
219223
t.Fatalf("unable to reliably send message: %v", err)
220224
}
221225

@@ -265,14 +269,15 @@ func TestReliableSenderStaleMessages(t *testing.T) {
265269
}
266270

267271
// Override IsMsgStale to no longer mark messages as stale.
268-
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
272+
reliableSender.cfg.IsMsgStale = func(_ context.Context,
273+
_ lnwire.Message) bool {
274+
269275
return false
270276
}
271277

272278
// We'll request the message to be sent reliably.
273-
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
274-
t.Fatalf("unable to reliably send message: %v", err)
275-
}
279+
err = reliableSender.sendMessage(ctx, msg2, peerPubKey)
280+
require.NoError(t, err)
276281

277282
// We should see an online notification request indicating that a new
278283
// peerHandler has been spawned since it was previously torn down.

0 commit comments

Comments
 (0)