Skip to content

[graph-work-side-branch]: temp side branch for graph work #9692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 43 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
1659eed
macaroons: remove context.TODO() in tests
ellemouton Apr 7, 2025
d52f729
kvdb/etcd: remove context.TODO() from test helpers
ellemouton Apr 7, 2025
62db6e2
lnd: pass context to `newServer` and `server.Start`
ellemouton Apr 7, 2025
9f6740e
discovery: thread context through to gossiper
ellemouton Apr 7, 2025
1c9c9d8
discovery: pass context through to reliable sender
ellemouton Apr 7, 2025
5193a9f
discovery: thread contexts to syncer
ellemouton Apr 7, 2025
3101f2a
discovery: thread contexts through sync manager
ellemouton Apr 7, 2025
1bc66db
discovery: pass context to ProcessRemoteAnnouncement
ellemouton Apr 7, 2025
3f9c554
discovery: pass context through to bootstrapper SampleNodeAddrs
ellemouton Apr 8, 2025
67a81a2
discovery: remove unnecessary context.Background() calls
ellemouton Apr 9, 2025
350de07
discovery: listen on ctx in any select
ellemouton Apr 9, 2025
6157399
graph/db: test clean-up
ellemouton Apr 5, 2025
b624a6a
graph/db: remove kvdb param from test helper
ellemouton Apr 5, 2025
e46c9f8
graph/db: remove kvdb.Backend from test helpers
ellemouton Apr 5, 2025
033ad38
graph/db: use only exported KVStore ForEachNode method in tests
ellemouton Apr 9, 2025
643f696
autopilot: start threading contexts through
ellemouton Apr 9, 2025
dea2265
autopilot: continue threading context
ellemouton Apr 9, 2025
5c1d21a
autopilot: update AttachmentHeuristics with context
ellemouton Apr 9, 2025
75bf82b
graph/db: remove unused Wipe method
ellemouton Mar 29, 2025
d381f03
graph/db: introduce ForEachSourceNodeChannel
ellemouton Mar 26, 2025
d079f86
graph/db: unexport various methods that expose `kvdb.RTx`
ellemouton Mar 27, 2025
14ce1ff
graph/db: use only exported KVStore methods in tests
ellemouton Apr 5, 2025
302a134
multi: remove kvdb.RTx from ForEachNodeChannel
ellemouton Mar 26, 2025
8b87367
discovery: revert passing ctx through to Start methods
ellemouton Apr 11, 2025
d01b392
autopilot: revert passing ctx to Start methods
ellemouton Apr 11, 2025
dedc51f
graph/db: let test alias be UTF-8 compatible
ellemouton Apr 5, 2025
424f63b
graph/db: update the `compareNodes` helper
ellemouton Apr 5, 2025
4ab0699
graph: test cleanup
ellemouton Mar 30, 2025
149abb9
channeldb: remove graph calls from tests
ellemouton Apr 5, 2025
f1db30a
batch: dont expose kvdb.RwTx in batch.SchedulerOptions
ellemouton Mar 30, 2025
83eb7e3
graph/db: introduce the V1Store interface
ellemouton Apr 5, 2025
9f79322
graph/db: use V1Store interface in ChannelGraph
ellemouton Apr 5, 2025
36d8bb2
graph/db: init KVStore outside of ChannelGraph
ellemouton Mar 30, 2025
8bf10b1
graph/db: make all ExtraOpaqueData valid TLV streams
ellemouton Apr 5, 2025
e6e6d72
lnwire: validate that gossip messages contain valid TLV
ellemouton May 7, 2025
8dec9cb
Merge pull request #9787 from ellemouton/graphSQL1-extra-opaque
ellemouton May 8, 2025
9d9bf71
graph/db: expand TestNodeInsertionAndDeletion
ellemouton May 8, 2025
6cd2770
graph/db: check for wrapped errors
ellemouton May 8, 2025
5aeab7c
graph/db: set empty Features and ExtraOpaqueData in tests
ellemouton May 8, 2025
5ebad9d
graph/db: use mainnet genisis hash in tests
ellemouton May 8, 2025
acad19e
graph/db: add test coverage for AddEdgeProof
ellemouton May 8, 2025
595077b
graph/db: let MakeTestGraph require no error internally
ellemouton May 9, 2025
e8ac280
Merge pull request #9796 from ellemouton/graphSQL4-test-coverage
guggero May 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions autopilot/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autopilot

import (
"bytes"
"context"
"fmt"
"math/rand"
"net"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnwire"
)

Expand Down Expand Up @@ -166,8 +168,9 @@ type Agent struct {
pendingOpens map[NodeID]LocalChannel
pendingMtx sync.Mutex

quit chan struct{}
wg sync.WaitGroup
quit chan struct{}
wg sync.WaitGroup
cancel fn.Option[context.CancelFunc]
}

// New creates a new instance of the Agent instantiated using the passed
Expand Down Expand Up @@ -202,17 +205,20 @@ func New(cfg Config, initialState []LocalChannel) (*Agent, error) {
func (a *Agent) Start() error {
var err error
a.started.Do(func() {
err = a.start()
ctx, cancel := context.WithCancel(context.Background())
a.cancel = fn.Some(cancel)

err = a.start(ctx)
})
return err
}

func (a *Agent) start() error {
func (a *Agent) start(ctx context.Context) error {
rand.Seed(time.Now().Unix())
log.Infof("Autopilot Agent starting")

a.wg.Add(1)
go a.controller()
go a.controller(ctx)

return nil
}
Expand All @@ -230,6 +236,7 @@ func (a *Agent) Stop() error {
func (a *Agent) stop() error {
log.Infof("Autopilot Agent stopping")

a.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
close(a.quit)
a.wg.Wait()

Expand Down Expand Up @@ -401,7 +408,7 @@ func mergeChanState(pendingChans map[NodeID]LocalChannel,
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
func (a *Agent) controller() {
func (a *Agent) controller(ctx context.Context) {
defer a.wg.Done()

// We'll start off by assigning our starting balance, and injecting
Expand Down Expand Up @@ -502,6 +509,9 @@ func (a *Agent) controller() {
// immediately.
case <-a.quit:
return

case <-ctx.Done():
return
}

a.pendingMtx.Lock()
Expand Down Expand Up @@ -539,7 +549,7 @@ func (a *Agent) controller() {
log.Infof("Triggering attachment directive dispatch, "+
"total_funds=%v", a.totalBalance)

err := a.openChans(availableFunds, numChans, totalChans)
err := a.openChans(ctx, availableFunds, numChans, totalChans)
if err != nil {
log.Errorf("Unable to open channels: %v", err)
}
Expand All @@ -548,8 +558,8 @@ func (a *Agent) controller() {

// openChans queries the agent's heuristic for a set of channel candidates, and
// attempts to open channels to them.
func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
totalChans []LocalChannel) error {
func (a *Agent) openChans(ctx context.Context, availableFunds btcutil.Amount,
numChans uint32, totalChans []LocalChannel) error {

// As channel size we'll use the maximum channel size available.
chanSize := a.cfg.Constraints.MaxChanSize()
Expand Down Expand Up @@ -598,7 +608,9 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
selfPubBytes := a.cfg.Self.SerializeCompressed()
nodes := make(map[NodeID]struct{})
addresses := make(map[NodeID][]net.Addr)
if err := a.cfg.Graph.ForEachNode(func(node Node) error {
if err := a.cfg.Graph.ForEachNode(ctx, func(_ context.Context,
node Node) error {

nID := NodeID(node.PubKey())

// If we come across ourselves, them we'll continue in
Expand Down Expand Up @@ -636,7 +648,7 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
// graph.
log.Debugf("Scoring %d nodes for chan_size=%v", len(nodes), chanSize)
scores, err := a.cfg.Heuristic.NodeScores(
a.cfg.Graph, totalChans, chanSize, nodes,
ctx, a.cfg.Graph, totalChans, chanSize, nodes,
)
if err != nil {
return fmt.Errorf("unable to calculate node scores : %w", err)
Expand Down
7 changes: 4 additions & 3 deletions autopilot/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -85,9 +86,9 @@ func (m *mockHeuristic) Name() string {
return "mock"
}

func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []LocalChannel,
chanSize btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*NodeScore, error) {
func (m *mockHeuristic) NodeScores(_ context.Context, g ChannelGraph,
chans []LocalChannel, chanSize btcutil.Amount,
nodes map[NodeID]struct{}) (map[NodeID]*NodeScore, error) {

if m.nodeScoresArgs != nil {
directive := directiveArg{
Expand Down
7 changes: 5 additions & 2 deletions autopilot/betweenness_centrality.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"fmt"
"sync"
)
Expand Down Expand Up @@ -168,8 +169,10 @@ func betweennessCentrality(g *SimpleGraph, s int, centrality []float64) {
}

// Refresh recalculates and stores centrality values.
func (bc *BetweennessCentrality) Refresh(graph ChannelGraph) error {
cache, err := NewSimpleGraph(graph)
func (bc *BetweennessCentrality) Refresh(ctx context.Context,
graph ChannelGraph) error {

cache, err := NewSimpleGraph(ctx, graph)
if err != nil {
return err
}
Expand Down
11 changes: 9 additions & 2 deletions autopilot/betweenness_centrality_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -30,6 +31,9 @@ func TestBetweennessCentralityMetricConstruction(t *testing.T) {

// Tests that empty graph results in empty centrality result.
func TestBetweennessCentralityEmptyGraph(t *testing.T) {
t.Parallel()
ctx := context.Background()

centralityMetric, err := NewBetweennessCentralityMetric(1)
require.NoError(
t, err,
Expand All @@ -42,7 +46,7 @@ func TestBetweennessCentralityEmptyGraph(t *testing.T) {
require.NoError(t, err, "unable to create graph")

success := t.Run(chanGraph.name, func(t1 *testing.T) {
err = centralityMetric.Refresh(graph)
err = centralityMetric.Refresh(ctx, graph)
require.NoError(t1, err)

centrality := centralityMetric.GetMetric(false)
Expand All @@ -59,6 +63,9 @@ func TestBetweennessCentralityEmptyGraph(t *testing.T) {

// Test betweenness centrality calculating using an example graph.
func TestBetweennessCentralityWithNonEmptyGraph(t *testing.T) {
t.Parallel()
ctx := context.Background()

workers := []int{1, 3, 9, 100}

tests := []struct {
Expand Down Expand Up @@ -100,7 +107,7 @@ func TestBetweennessCentralityWithNonEmptyGraph(t *testing.T) {
t1, graph, centralityTestGraph,
)

err = metric.Refresh(graph)
err = metric.Refresh(ctx, graph)
require.NoError(t1, err)

for _, expected := range tests {
Expand Down
9 changes: 5 additions & 4 deletions autopilot/combinedattach.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"fmt"

"github.com/btcsuite/btcd/btcutil"
Expand Down Expand Up @@ -70,9 +71,9 @@ func (c *WeightedCombAttachment) Name() string {
// is the maximum possible improvement in connectivity.
//
// NOTE: This is a part of the AttachmentHeuristic interface.
func (c *WeightedCombAttachment) NodeScores(g ChannelGraph, chans []LocalChannel,
chanSize btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*NodeScore, error) {
func (c *WeightedCombAttachment) NodeScores(ctx context.Context, g ChannelGraph,
chans []LocalChannel, chanSize btcutil.Amount,
nodes map[NodeID]struct{}) (map[NodeID]*NodeScore, error) {

// We now query each heuristic to determine the score they give to the
// nodes for the given channel size.
Expand All @@ -81,7 +82,7 @@ func (c *WeightedCombAttachment) NodeScores(g ChannelGraph, chans []LocalChannel
log.Tracef("Getting scores from sub heuristic %v", h.Name())

s, err := h.NodeScores(
g, chans, chanSize, nodes,
ctx, g, chans, chanSize, nodes,
)
if err != nil {
return nil, fmt.Errorf("unable to get sub score: %w",
Expand Down
7 changes: 4 additions & 3 deletions autopilot/externalscoreattach.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -80,9 +81,9 @@ func (s *ExternalScoreAttachment) SetNodeScores(targetHeuristic string,
// not known will get a score of 0.
//
// NOTE: This is a part of the AttachmentHeuristic interface.
func (s *ExternalScoreAttachment) NodeScores(g ChannelGraph, chans []LocalChannel,
chanSize btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*NodeScore, error) {
func (s *ExternalScoreAttachment) NodeScores(_ context.Context, g ChannelGraph,
chans []LocalChannel, chanSize btcutil.Amount,
nodes map[NodeID]struct{}) (map[NodeID]*NodeScore, error) {

existingPeers := make(map[NodeID]struct{})
for _, c := range chans {
Expand Down
4 changes: 3 additions & 1 deletion autopilot/externalscoreattach_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot_test

import (
"context"
"testing"

"github.com/btcsuite/btcd/btcec/v2"
Expand All @@ -22,6 +23,7 @@ func randKey() (*btcec.PublicKey, error) {
// ExternalScoreAttachment correctly reflects the scores we set last.
func TestSetNodeScores(t *testing.T) {
t.Parallel()
ctx := context.Background()

const name = "externalscore"

Expand Down Expand Up @@ -62,7 +64,7 @@ func TestSetNodeScores(t *testing.T) {
q[nID] = struct{}{}
}
resp, err := h.NodeScores(
nil, nil, btcutil.Amount(btcutil.SatoshiPerBitcoin), q,
ctx, nil, nil, btcutil.Amount(btcutil.SatoshiPerBitcoin), q,
)
if err != nil {
t.Fatal(err)
Expand Down
33 changes: 23 additions & 10 deletions autopilot/graph.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"encoding/hex"
"net"
"sort"
Expand Down Expand Up @@ -80,7 +81,9 @@ func (d *dbNode) Addrs() []net.Addr {
// describes the active channel.
//
// NOTE: Part of the autopilot.Node interface.
func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
func (d *dbNode) ForEachChannel(ctx context.Context,
cb func(context.Context, ChannelEdge) error) error {

return d.tx.ForEachChannel(func(ei *models.ChannelEdgeInfo, ep,
_ *models.ChannelEdgePolicy) error {

Expand Down Expand Up @@ -108,7 +111,7 @@ func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
},
}

return cb(edge)
return cb(ctx, edge)
})
}

Expand All @@ -117,7 +120,9 @@ func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
// error, then execution should be terminated.
//
// NOTE: Part of the autopilot.ChannelGraph interface.
func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error {
func (d *databaseChannelGraph) ForEachNode(ctx context.Context,
cb func(context.Context, Node) error) error {

return d.db.ForEachNode(func(nodeTx graphdb.NodeRTx) error {
// We'll skip over any node that doesn't have any advertised
// addresses. As we won't be able to reach them to actually
Expand All @@ -129,7 +134,8 @@ func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error {
node := &dbNode{
tx: nodeTx,
}
return cb(node)

return cb(ctx, node)
})
}

Expand Down Expand Up @@ -185,7 +191,9 @@ func (nc dbNodeCached) Addrs() []net.Addr {
// describes the active channel.
//
// NOTE: Part of the autopilot.Node interface.
func (nc dbNodeCached) ForEachChannel(cb func(ChannelEdge) error) error {
func (nc dbNodeCached) ForEachChannel(ctx context.Context,
cb func(context.Context, ChannelEdge) error) error {

for cid, channel := range nc.channels {
edge := ChannelEdge{
ChanID: lnwire.NewShortChanIDFromInt(cid),
Expand All @@ -195,7 +203,7 @@ func (nc dbNodeCached) ForEachChannel(cb func(ChannelEdge) error) error {
},
}

if err := cb(edge); err != nil {
if err := cb(ctx, edge); err != nil {
return err
}
}
Expand All @@ -208,7 +216,9 @@ func (nc dbNodeCached) ForEachChannel(cb func(ChannelEdge) error) error {
// error, then execution should be terminated.
//
// NOTE: Part of the autopilot.ChannelGraph interface.
func (dc *databaseChannelGraphCached) ForEachNode(cb func(Node) error) error {
func (dc *databaseChannelGraphCached) ForEachNode(ctx context.Context,
cb func(context.Context, Node) error) error {

return dc.db.ForEachNodeCached(func(n route.Vertex,
channels map[uint64]*graphdb.DirectedChannel) error {

Expand All @@ -217,7 +227,8 @@ func (dc *databaseChannelGraphCached) ForEachNode(cb func(Node) error) error {
node: n,
channels: channels,
}
return cb(node)

return cb(ctx, node)
}
return nil
})
Expand Down Expand Up @@ -262,9 +273,11 @@ func (m memNode) Addrs() []net.Addr {
// describes the active channel.
//
// NOTE: Part of the autopilot.Node interface.
func (m memNode) ForEachChannel(cb func(ChannelEdge) error) error {
func (m memNode) ForEachChannel(ctx context.Context,
cb func(context.Context, ChannelEdge) error) error {

for _, channel := range m.chans {
if err := cb(channel); err != nil {
if err := cb(ctx, channel); err != nil {
return err
}
}
Expand Down
Loading
Loading