Skip to content

Commit d61f954

Browse files
authored
Fixed some weird behaviour in gRPC Monitor command (#1706)
* Made monitor.jsonDecodeLoop more thread resilient * Use idiomatic way to check for channel-closed event * Ensure that monitor processes are killed and collected correctly * Simplified (and fixed...) monitor stream handling There is no need to spawn two goroutines, one is enough. Also the extra context is not needed anymore. The monitor port can be closed when the goroutine with the grpc recv loop ends. * Fix cli crash if no configuration is provided in the Monitor gRPC call * Extended term_example gRPC API test coverage Now it cycles on the same port two times. * Reduce timeout for monitor close/quit commands to 250ms
1 parent 6c719d7 commit d61f954

File tree

4 files changed

+85
-89
lines changed

4 files changed

+85
-89
lines changed

arduino/monitor/monitor.go

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -103,38 +103,39 @@ func (mon *PluggableMonitor) String() string {
103103
return mon.id
104104
}
105105

106-
func (mon *PluggableMonitor) jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage) {
106+
func jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage, log *logrus.Entry, lastError *error) {
107107
decoder := json.NewDecoder(in)
108108

109109
for {
110110
var msg monitorMessage
111111
if err := decoder.Decode(&msg); err != nil {
112-
mon.incomingMessagesError = err
112+
*lastError = err
113113
close(outChan)
114-
mon.log.Errorf("stopped decode loop: %s", err)
114+
log.Errorf("stopped decode loop: %s", err)
115115
return
116116
}
117-
mon.log.
118-
WithField("event_type", msg.EventType).
117+
log.WithField("event_type", msg.EventType).
119118
WithField("message", msg.Message).
120119
WithField("error", msg.Error).
121120
Infof("received message")
122121
if msg.EventType == "port_closed" {
123-
mon.log.Infof("monitor port has been closed externally")
122+
log.Infof("monitor port has been closed externally")
124123
} else {
125124
outChan <- &msg
126125
}
127126
}
128127
}
129128

130129
func (mon *PluggableMonitor) waitMessage(timeout time.Duration, expectedEvt string) (*monitorMessage, error) {
130+
mon.log.WithField("expected", expectedEvt).Debugf("waiting for event")
131131
var msg *monitorMessage
132132
select {
133-
case msg = <-mon.incomingMessagesChan:
134-
if msg == nil {
133+
case m, ok := <-mon.incomingMessagesChan:
134+
if !ok {
135135
// channel has been closed
136136
return nil, mon.incomingMessagesError
137137
}
138+
msg = m
138139
case <-time.After(timeout):
139140
return nil, fmt.Errorf(tr("timeout waiting for message"))
140141
}
@@ -192,22 +193,21 @@ func (mon *PluggableMonitor) runProcess() error {
192193

193194
messageChan := make(chan *monitorMessage)
194195
mon.incomingMessagesChan = messageChan
195-
go mon.jsonDecodeLoop(stdout, messageChan)
196+
go jsonDecodeLoop(stdout, messageChan, mon.log, &mon.incomingMessagesError)
196197

197198
mon.log.Infof("Monitor process started successfully!")
198199
return nil
199200
}
200201

201-
func (mon *PluggableMonitor) killProcess() error {
202+
func (mon *PluggableMonitor) killProcess() {
202203
mon.log.Infof("Killing monitor process")
203204
if err := mon.process.Kill(); err != nil {
204-
return err
205+
mon.log.WithError(err).Error("Sent kill signal")
205206
}
206207
if err := mon.process.Wait(); err != nil {
207-
return err
208+
mon.log.WithError(err).Error("Waiting for process end")
208209
}
209-
mon.log.Infof("Monitor process killed successfully!")
210-
return nil
210+
mon.log.Infof("Monitor process killed")
211211
}
212212

213213
// Run starts the monitor executable process and sends the HELLO command to the monitor to agree on the
@@ -220,15 +220,10 @@ func (mon *PluggableMonitor) Run() (err error) {
220220

221221
defer func() {
222222
// If the monitor process is started successfully but the HELLO handshake
223-
// fails the monitor is an unusable state, we kill the process to avoid
223+
// fails the monitor is in an unusable state, we kill the process to avoid
224224
// further issues down the line.
225-
if err == nil {
226-
return
227-
}
228-
if killErr := mon.killProcess(); killErr != nil {
229-
// Log failure to kill the process, ideally that should never happen
230-
// but it's best to know it if it does
231-
mon.log.Errorf("Killing monitor after unsuccessful start: %s", killErr)
225+
if err != nil {
226+
mon.killProcess()
232227
}
233228
}()
234229

@@ -297,20 +292,19 @@ func (mon *PluggableMonitor) Close() error {
297292
if err := mon.sendCommand("CLOSE\n"); err != nil {
298293
return err
299294
}
300-
_, err := mon.waitMessage(time.Second*10, "close")
295+
_, err := mon.waitMessage(time.Millisecond*250, "close")
301296
return err
302297
}
303298

304299
// Quit terminates the monitor. No more commands can be accepted by the monitor.
305300
func (mon *PluggableMonitor) Quit() error {
301+
defer mon.killProcess() // ensure that killProcess is called in any case...
302+
306303
if err := mon.sendCommand("QUIT\n"); err != nil {
307304
return err
308305
}
309-
if _, err := mon.waitMessage(time.Second*10, "quit"); err != nil {
306+
if _, err := mon.waitMessage(time.Millisecond*250, "quit"); err != nil {
310307
return err
311308
}
312-
if err := mon.killProcess(); err != nil {
313-
mon.log.WithError(err).Info("error killing monitor process")
314-
}
315309
return nil
316310
}

commands/daemon/daemon.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -508,9 +508,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
508508
if err != nil {
509509
return err
510510
}
511-
ctx, cancel := context.WithCancel(stream.Context())
511+
512512
go func() {
513-
defer cancel()
513+
// close port on gRPC call EOF or errors
514+
defer portProxy.Close()
515+
514516
for {
515517
msg, err := stream.Recv()
516518
if errors.Is(err, io.EOF) {
@@ -541,23 +543,20 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
541543
}
542544
}
543545
}()
544-
go func() {
545-
defer cancel()
546-
buff := make([]byte, 4096)
547-
for {
548-
n, err := portProxy.Read(buff)
549-
if errors.Is(err, io.EOF) {
550-
return
551-
}
552-
if err != nil {
553-
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
554-
return
555-
}
556-
if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
557-
return
558-
}
546+
547+
buff := make([]byte, 4096)
548+
for {
549+
n, err := portProxy.Read(buff)
550+
if errors.Is(err, io.EOF) {
551+
break
559552
}
560-
}()
561-
<-ctx.Done()
553+
if err != nil {
554+
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
555+
break
556+
}
557+
if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
558+
break
559+
}
560+
}
562561
return nil
563562
}

commands/daemon/term_example/main.go

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -74,52 +74,54 @@ func main() {
7474
}
7575
fmt.Println("Detected port:", port.Label, port.ProtocolLabel)
7676

77+
connectToPort(cli, instance, port)
78+
time.Sleep(5 * time.Second)
79+
connectToPort(cli, instance, port)
80+
time.Sleep(5 * time.Second)
81+
}
82+
83+
func connectToPort(cli commands.ArduinoCoreServiceClient, instance *commands.Instance, port *commands.Port) {
7784
// Connect to the port monitor
78-
fmt.Println("Connecting to monitor")
85+
fmt.Println("Connecting to port", port)
86+
7987
ctx, cancel := context.WithCancel(context.Background())
80-
if respStream, err := cli.Monitor(ctx); err != nil {
81-
log.Fatal("Monitor:", err)
82-
} else {
83-
if err := respStream.Send(&commands.MonitorRequest{
84-
Instance: instance,
85-
Port: port,
86-
}); err != nil {
87-
log.Fatal("Monitor send-config:", err)
88-
}
89-
time.Sleep(1 * time.Second)
90-
91-
go func() {
92-
for {
93-
if resp, err := respStream.Recv(); err != nil {
94-
fmt.Println(" RECV:", err)
95-
break
96-
} else {
97-
fmt.Println(" RECV:", resp)
98-
}
99-
}
100-
}()
88+
monitorClient, err := cli.Monitor(ctx)
89+
if err != nil {
90+
log.Fatal("Error opening Monitor:", err)
91+
}
92+
if err := monitorClient.Send(&commands.MonitorRequest{
93+
Instance: instance,
94+
Port: port,
95+
}); err != nil {
96+
log.Fatal("Error sending Monitor config:", err)
97+
}
10198

102-
hello := &commands.MonitorRequest{
103-
TxData: []byte("HELLO!"),
104-
}
105-
fmt.Println("Send:", hello)
106-
if err := respStream.Send(hello); err != nil {
107-
log.Fatal("Monitor send HELLO:", err)
99+
go func() {
100+
for {
101+
resp, err := monitorClient.Recv()
102+
if err != nil {
103+
fmt.Println(" RECV-ERR:", err)
104+
break
105+
}
106+
fmt.Println(" RECV:", resp)
108107
}
108+
}()
109109

110-
fmt.Println("Send:", hello)
111-
if err := respStream.Send(hello); err != nil {
112-
log.Fatal("Monitor send HELLO:", err)
113-
}
110+
hello := &commands.MonitorRequest{
111+
TxData: []byte("HELLO!"),
112+
}
113+
fmt.Println("Send:", hello)
114+
if err := monitorClient.Send(hello); err != nil {
115+
log.Fatal("Monitor send HELLO:", err)
116+
}
114117

115-
time.Sleep(5 * time.Second)
118+
time.Sleep(15 * time.Second)
116119

117-
fmt.Println("Closing Monitor")
118-
if err := respStream.CloseSend(); err != nil {
119-
log.Fatal("Monitor close send:", err)
120-
}
121-
time.Sleep(5 * time.Second)
120+
fmt.Println("Closing Monitor")
121+
if err := monitorClient.CloseSend(); err != nil {
122+
log.Fatal("Monitor close send:", err)
122123
}
124+
<-monitorClient.Context().Done()
125+
123126
cancel()
124-
time.Sleep(5 * time.Second)
125127
}

commands/monitor/monitor.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,11 @@ func Monitor(ctx context.Context, req *rpc.MonitorRequest) (*PortProxy, *pluggab
8686
m.Quit()
8787
return nil, nil, &arduino.FailedMonitorError{Cause: err}
8888
}
89-
90-
for _, setting := range req.GetPortConfiguration().Settings {
91-
if err := m.Configure(setting.SettingId, setting.Value); err != nil {
92-
logrus.Errorf("Could not set configuration %s=%s: %s", setting.SettingId, setting.Value, err)
89+
if portConfig := req.GetPortConfiguration(); portConfig != nil {
90+
for _, setting := range portConfig.Settings {
91+
if err := m.Configure(setting.SettingId, setting.Value); err != nil {
92+
logrus.Errorf("Could not set configuration %s=%s: %s", setting.SettingId, setting.Value, err)
93+
}
9394
}
9495
}
9596

0 commit comments

Comments
 (0)