Skip to content

Commit 739d020

Browse files
committed
Improved streaming of pluggable-discoveries events (WIP)
Now the DiscoveryManager is able to start the discoveries and add/remove them in a thread-safe way. Also the watchers may connect and disconnect seamlessly at any time, the incoming events from the discovery are broadcasted correctly to each active watcher. This refactoring dramatically simplifies the DiscoveryManager design.
1 parent d266900 commit 739d020

File tree

5 files changed

+151
-312
lines changed

5 files changed

+151
-312
lines changed

arduino/discovery/discoverymanager/discoverymanager.go

Lines changed: 107 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ import (
2828
// DiscoveryManager is required to handle multiple pluggable-discovery that
2929
// may be shared across platforms
3030
type DiscoveryManager struct {
31-
discoveriesMutex sync.Mutex
32-
discoveries map[string]*discovery.PluggableDiscovery
31+
discoveriesMutex sync.Mutex
32+
discoveries map[string]*discovery.PluggableDiscovery
33+
discoveriesRunning bool
34+
feed chan *discovery.Event
35+
watchersMutex sync.Mutex
36+
watchers map[*PortWatcher]bool
3337
}
3438

3539
var tr = i18n.Tr
@@ -38,15 +42,20 @@ var tr = i18n.Tr
3842
func New() *DiscoveryManager {
3943
return &DiscoveryManager{
4044
discoveries: map[string]*discovery.PluggableDiscovery{},
45+
watchers: map[*PortWatcher]bool{},
46+
feed: make(chan *discovery.Event, 50),
4147
}
4248
}
4349

4450
// Clear resets the DiscoveryManager to its initial state
4551
func (dm *DiscoveryManager) Clear() {
46-
dm.QuitAll()
4752
dm.discoveriesMutex.Lock()
48-
defer dm.discoveriesMutex.Unlock()
53+
for _, d := range dm.discoveries {
54+
d.Quit()
55+
logrus.Infof("Closed and removed discovery %s", d.GetID())
56+
}
4957
dm.discoveries = map[string]*discovery.PluggableDiscovery{}
58+
dm.discoveriesMutex.Unlock()
5059
}
5160

5261
// IDs returns the list of discoveries' ids in this DiscoveryManager
@@ -60,233 +69,139 @@ func (dm *DiscoveryManager) IDs() []string {
6069
return ids
6170
}
6271

63-
// Add adds a discovery to the list of managed discoveries
64-
func (dm *DiscoveryManager) Add(disc *discovery.PluggableDiscovery) error {
65-
id := disc.GetID()
72+
// Start starts all the discoveries in this DiscoveryManager.
73+
// If the discoveries are already running, this function does nothing.
74+
func (dm *DiscoveryManager) Start() {
6675
dm.discoveriesMutex.Lock()
6776
defer dm.discoveriesMutex.Unlock()
68-
if _, has := dm.discoveries[id]; has {
69-
return errors.Errorf(tr("pluggable discovery already added: %s"), id)
77+
if dm.discoveriesRunning {
78+
return
7079
}
71-
dm.discoveries[id] = disc
72-
return nil
73-
}
7480

75-
// remove quits and deletes the discovery with specified id
76-
// from the discoveries managed by this DiscoveryManager
77-
func (dm *DiscoveryManager) remove(id string) {
78-
dm.discoveriesMutex.Lock()
79-
d := dm.discoveries[id]
80-
delete(dm.discoveries, id)
81-
dm.discoveriesMutex.Unlock()
82-
d.Quit()
83-
logrus.Infof("Closed and removed discovery %s", id)
84-
}
81+
go dm.feeder()
8582

86-
// parallelize runs function f concurrently for each discovery.
87-
// Returns a list of errors returned by each call of f.
88-
func (dm *DiscoveryManager) parallelize(f func(d *discovery.PluggableDiscovery) error) []error {
8983
var wg sync.WaitGroup
90-
errChan := make(chan error)
91-
dm.discoveriesMutex.Lock()
92-
discoveries := []*discovery.PluggableDiscovery{}
9384
for _, d := range dm.discoveries {
94-
discoveries = append(discoveries, d)
95-
}
96-
dm.discoveriesMutex.Unlock()
97-
for _, d := range discoveries {
9885
wg.Add(1)
9986
go func(d *discovery.PluggableDiscovery) {
100-
defer wg.Done()
101-
if err := f(d); err != nil {
102-
errChan <- err
103-
}
87+
dm.startDiscovery(d)
88+
wg.Done()
10489
}(d)
10590
}
91+
wg.Wait()
92+
dm.discoveriesRunning = true
93+
}
10694

107-
// Wait in a goroutine to collect eventual errors running a discovery.
108-
// When all goroutines that are calling discoveries are done close the errors chan.
109-
go func() {
110-
wg.Wait()
111-
close(errChan)
112-
}()
95+
// Add adds a discovery to the list of managed discoveries
96+
func (dm *DiscoveryManager) Add(d *discovery.PluggableDiscovery) error {
97+
dm.discoveriesMutex.Lock()
98+
defer dm.discoveriesMutex.Unlock()
11399

114-
errs := []error{}
115-
for err := range errChan {
116-
errs = append(errs, err)
100+
id := d.GetID()
101+
if _, has := dm.discoveries[id]; has {
102+
return errors.Errorf(tr("pluggable discovery already added: %s"), id)
117103
}
118-
return errs
119-
}
104+
dm.discoveries[id] = d
120105

121-
// RunAll the discoveries for this DiscoveryManager,
122-
// returns an error for each discovery failing to run
123-
func (dm *DiscoveryManager) RunAll() []error {
124-
return dm.parallelize(func(d *discovery.PluggableDiscovery) error {
125-
if d.State() != discovery.Dead {
126-
// This discovery is already alive, nothing to do
127-
return nil
128-
}
129-
130-
if err := d.Run(); err != nil {
131-
dm.remove(d.GetID())
132-
return fmt.Errorf(tr("discovery %[1]s process not started: %[2]w"), d.GetID(), err)
133-
}
134-
return nil
135-
})
106+
if dm.discoveriesRunning {
107+
dm.startDiscovery(d)
108+
}
109+
return nil
136110
}
137111

138-
// StartAll the discoveries for this DiscoveryManager,
139-
// returns an error for each discovery failing to start
140-
func (dm *DiscoveryManager) StartAll() []error {
141-
return dm.parallelize(func(d *discovery.PluggableDiscovery) error {
142-
state := d.State()
143-
if state != discovery.Idling {
144-
// Already started
145-
return nil
146-
}
147-
if err := d.Start(); err != nil {
148-
dm.remove(d.GetID())
149-
return fmt.Errorf(tr("starting discovery %[1]s: %[2]w"), d.GetID(), err)
150-
}
151-
return nil
152-
})
112+
// PortWatcher is a watcher for all discovery events (port connection/disconnection)
113+
type PortWatcher struct {
114+
closeCB func()
115+
feed chan *discovery.Event
153116
}
154117

155-
// StartSyncAll the discoveries for this DiscoveryManager,
156-
// returns an error for each discovery failing to start syncing
157-
func (dm *DiscoveryManager) StartSyncAll() (<-chan *discovery.Event, []error) {
158-
eventSink := make(chan *discovery.Event, 5)
159-
var wg sync.WaitGroup
160-
errs := dm.parallelize(func(d *discovery.PluggableDiscovery) error {
161-
state := d.State()
162-
if state != discovery.Idling || state == discovery.Syncing {
163-
// Already syncing
164-
return nil
165-
}
166-
167-
eventCh, err := d.StartSync(5)
168-
if err != nil {
169-
dm.remove(d.GetID())
170-
return fmt.Errorf(tr("start syncing discovery %[1]s: %[2]w"), d.GetID(), err)
171-
}
118+
// Feed returns the feed of events coming from the discoveries
119+
func (pw *PortWatcher) Feed() <-chan *discovery.Event {
120+
return pw.feed
121+
}
172122

173-
wg.Add(1)
174-
go func() {
175-
for ev := range eventCh {
176-
eventSink <- ev
177-
}
178-
wg.Done()
179-
}()
180-
return nil
181-
})
182-
go func() {
183-
wg.Wait()
184-
eventSink <- &discovery.Event{Type: "quit"}
185-
close(eventSink)
186-
}()
187-
return eventSink, errs
123+
// Close closes the PortWatcher
124+
func (pw *PortWatcher) Close() {
125+
pw.closeCB()
188126
}
189127

190-
// StopAll the discoveries for this DiscoveryManager,
191-
// returns an error for each discovery failing to stop
192-
func (dm *DiscoveryManager) StopAll() []error {
193-
return dm.parallelize(func(d *discovery.PluggableDiscovery) error {
194-
state := d.State()
195-
if state != discovery.Syncing && state != discovery.Running {
196-
// Not running nor syncing, nothing to stop
197-
return nil
198-
}
128+
// Watch starts a watcher for all discovery events (port connection/disconnection).
129+
// The watcher must be closed when it is no longer needed with the Close method.
130+
func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
131+
dm.Start()
199132

200-
if err := d.Stop(); err != nil {
201-
dm.remove(d.GetID())
202-
return fmt.Errorf(tr("stopping discovery %[1]s: %[2]w"), d.GetID(), err)
203-
}
204-
return nil
205-
})
133+
watcher := &PortWatcher{
134+
feed: make(chan *discovery.Event),
135+
}
136+
watcher.closeCB = func() {
137+
dm.watchersMutex.Lock()
138+
delete(dm.watchers, watcher)
139+
dm.watchersMutex.Unlock()
140+
close(watcher.feed)
141+
}
142+
dm.watchersMutex.Lock()
143+
dm.watchers[watcher] = true
144+
dm.watchersMutex.Unlock()
145+
return watcher, nil
206146
}
207147

208-
// QuitAll quits all the discoveries managed by this DiscoveryManager.
209-
// Returns an error for each discovery that fails quitting
210-
func (dm *DiscoveryManager) QuitAll() []error {
211-
errs := dm.parallelize(func(d *discovery.PluggableDiscovery) error {
212-
if d.State() == discovery.Dead {
213-
// Stop! Stop! It's already dead!
214-
return nil
148+
func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (discErr error) {
149+
defer func() {
150+
if discErr != nil {
151+
logrus.Errorf("Discovery %s failed to run: %s", d.GetID(), discErr)
215152
}
153+
}()
216154

217-
d.Quit()
218-
return nil
219-
})
220-
return errs
221-
}
222-
223-
// List returns a list of available ports detected from all discoveries
224-
// and a list of errors for those discoveries that returned one.
225-
func (dm *DiscoveryManager) List() ([]*discovery.Port, []error) {
226-
var wg sync.WaitGroup
227-
// Use this struct to avoid the need of two separate
228-
// channels for ports and errors.
229-
type listMsg struct {
230-
Err error
231-
Port *discovery.Port
155+
if err := d.Run(); err != nil {
156+
return fmt.Errorf(tr("discovery %[1]s process not started: %[2]w"), d.GetID(), err)
232157
}
233-
msgChan := make(chan listMsg)
234-
dm.discoveriesMutex.Lock()
235-
discoveries := []*discovery.PluggableDiscovery{}
236-
for _, d := range dm.discoveries {
237-
discoveries = append(discoveries, d)
238-
}
239-
dm.discoveriesMutex.Unlock()
240-
for _, d := range discoveries {
241-
wg.Add(1)
242-
go func(d *discovery.PluggableDiscovery) {
243-
defer wg.Done()
244-
if d.State() != discovery.Running {
245-
// Discovery is not running, it won't return anything
246-
return
247-
}
248-
ports, err := d.List()
249-
if err != nil {
250-
msgChan <- listMsg{Err: fmt.Errorf(tr("listing ports from discovery %[1]s: %[2]w"), d.GetID(), err)}
251-
}
252-
for _, p := range ports {
253-
msgChan <- listMsg{Port: p}
254-
}
255-
}(d)
158+
eventCh, err := d.StartSync(5)
159+
if err != nil {
160+
return fmt.Errorf(tr("start syncing discovery %[1]s: %[2]w"), d.GetID(), err)
256161
}
257162

163+
// XXX do better cleanup if the discovery fails to start
164+
258165
go func() {
259-
// Close the channel only after all goroutines are done
260-
wg.Wait()
261-
close(msgChan)
166+
for ev := range eventCh {
167+
dm.feed <- ev
168+
}
262169
}()
170+
return nil
171+
}
263172

264-
ports := []*discovery.Port{}
265-
errs := []error{}
266-
for msg := range msgChan {
267-
if msg.Err != nil {
268-
errs = append(errs, msg.Err)
269-
} else {
270-
ports = append(ports, msg.Port)
173+
func (dm *DiscoveryManager) feeder() {
174+
// Feed all watchers with data coming from the discoveries
175+
for ev := range dm.feed {
176+
dm.watchersMutex.Lock()
177+
for watcher := range dm.watchers {
178+
select {
179+
case watcher.feed <- ev:
180+
// OK
181+
default:
182+
// If the watcher is not able to process event fast enough
183+
// remove the watcher from the list of watchers
184+
go watcher.Close()
185+
}
271186
}
187+
dm.cacheEvent(ev)
188+
dm.watchersMutex.Unlock()
272189
}
273-
return ports, errs
274190
}
275191

276-
// ListCachedPorts return the current list of ports detected from all discoveries
277-
func (dm *DiscoveryManager) ListCachedPorts() []*discovery.Port {
278-
res := []*discovery.Port{}
192+
func (dm *DiscoveryManager) cacheEvent(ev *discovery.Event) {
193+
// XXX: TODO
194+
}
195+
196+
// List return the current list of ports detected from all discoveries
197+
func (dm *DiscoveryManager) List() []*discovery.Port {
198+
dm.Start()
199+
200+
// XXX: Cache ports and return them
279201
dm.discoveriesMutex.Lock()
280-
discoveries := []*discovery.PluggableDiscovery{}
202+
defer dm.discoveriesMutex.Unlock()
203+
res := []*discovery.Port{}
281204
for _, d := range dm.discoveries {
282-
discoveries = append(discoveries, d)
283-
}
284-
dm.discoveriesMutex.Unlock()
285-
for _, d := range discoveries {
286-
if d.State() != discovery.Syncing {
287-
// Discovery is not syncing
288-
continue
289-
}
290205
res = append(res, d.ListCachedPorts()...)
291206
}
292207
return res

0 commit comments

Comments
 (0)