6
6
from common import running_on_ci
7
7
message = []
8
8
9
+
9
10
def test_ws_connection (socketio ):
10
11
print ('my sid is' , socketio .sid )
11
12
assert socketio .sid is not None
12
13
13
- def test_list (socketio ):
14
- global message
15
- socketio .on ('message' , message_handler )
14
+
15
+ def test_list (socketio , message ):
16
16
socketio .emit ('command' , 'list' )
17
17
time .sleep (.2 )
18
18
print (message )
19
19
assert any ("list" in i for i in message )
20
20
assert any ("Ports" in i for i in message )
21
21
assert any ("Network" in i for i in message )
22
22
23
+
23
24
# NOTE run the following tests with a board connected to the PC
24
25
@pytest .mark .skipif (
25
26
running_on_ci (),
26
27
reason = "VMs have no serial ports" ,
27
28
)
28
- def test_open_serial_default (socketio , serial_port , baudrate ):
29
- general_open_serial (socketio , serial_port , baudrate , "default" )
29
+ def test_open_serial_default (socketio , serial_port , baudrate , message ):
30
+ general_open_serial (socketio , serial_port , baudrate , message , "default" )
30
31
31
32
32
33
@pytest .mark .skipif (
33
34
running_on_ci (),
34
35
reason = "VMs have no serial ports" ,
35
36
)
36
- def test_open_serial_timed (socketio , serial_port , baudrate ):
37
- general_open_serial (socketio , serial_port , baudrate , "timed" )
37
+ def test_open_serial_timed (socketio , serial_port , baudrate , message ):
38
+ general_open_serial (socketio , serial_port , baudrate , message , "timed" )
38
39
39
40
40
41
@pytest .mark .skipif (
41
42
running_on_ci (),
42
43
reason = "VMs have no serial ports" ,
43
44
)
44
- def test_open_serial_timedraw (socketio , serial_port , baudrate ):
45
- general_open_serial (socketio , serial_port , baudrate , "timedraw" )
45
+ def test_open_serial_timedraw (socketio , serial_port , baudrate , message ):
46
+ general_open_serial (socketio , serial_port , baudrate , message , "timedraw" )
46
47
47
48
48
49
# NOTE run the following tests with a board connected to the PC and with the sketch found in test/testdata/SerialEcho.ino on it be sure to change serial_address in conftest.py
49
50
@pytest .mark .skipif (
50
51
running_on_ci (),
51
52
reason = "VMs have no serial ports" ,
52
53
)
53
- def test_send_serial_default (socketio , close_port , serial_port , baudrate ):
54
- general_send_serial (socketio , close_port , serial_port , baudrate , "default" )
54
+ def test_send_serial_default (socketio , close_port , serial_port , baudrate , message ):
55
+ general_send_serial (socketio , close_port , serial_port , baudrate , message , "default" )
55
56
56
57
57
58
@pytest .mark .skipif (
58
59
running_on_ci (),
59
60
reason = "VMs have no serial ports" ,
60
61
)
61
- def test_send_serial_timed (socketio , close_port , serial_port , baudrate ):
62
- general_send_serial (socketio , close_port , serial_port , baudrate , "timed" )
62
+ def test_send_serial_timed (socketio , close_port , serial_port , baudrate , message ):
63
+ general_send_serial (socketio , close_port , serial_port , baudrate , message , "timed" )
63
64
64
65
65
66
@pytest .mark .skipif (
66
67
running_on_ci (),
67
68
reason = "VMs have no serial ports" ,
68
69
)
69
- def test_send_serial_timedraw (socketio , close_port , serial_port , baudrate ):
70
- general_send_serial (socketio , close_port , serial_port , baudrate , "timedraw" )
70
+ def test_send_serial_timedraw (socketio , close_port , serial_port , baudrate , message ):
71
+ general_send_serial (socketio , close_port , serial_port , baudrate , message , "timedraw" )
71
72
72
73
73
- def general_open_serial (socketio , serial_port , baudrate , buffertype ):
74
- global message
75
- message = []
76
- # in message var we will find the "response"
77
- socketio .on ('message' , message_handler )
78
- socketio .emit ('command' , 'open ' + serial_port + ' ' + baudrate + ' ' + buffertype )
79
- # give time to the message var to be filled
80
- time .sleep (.5 )
81
- print (message )
82
- # the serial connection should be open now
83
- assert any ("\" IsOpen\" : true" in i for i in message )
74
+ @pytest .mark .skipif (
75
+ running_on_ci (),
76
+ reason = "VMs have no serial ports" ,
77
+ )
78
+ def test_send_emoji_serial_default (socketio , close_port , serial_port , baudrate , message ):
79
+ general_send_emoji_serial (socketio , close_port , serial_port , baudrate , message , "default" )
80
+
81
+
82
+ @pytest .mark .skipif (
83
+ running_on_ci (),
84
+ reason = "VMs have no serial ports" ,
85
+ )
86
+ def test_send_emoji_serial_timed (socketio , close_port , serial_port , baudrate , message ):
87
+ general_send_emoji_serial (socketio , close_port , serial_port , baudrate , message , "timed" )
88
+
89
+
90
+ @pytest .mark .skipif (
91
+ running_on_ci (),
92
+ reason = "VMs have no serial ports" ,
93
+ )
94
+ def test_send_emoji_serial_timedraw (socketio , close_port , serial_port , baudrate , message ):
95
+ general_send_emoji_serial (socketio , close_port , serial_port , baudrate , message , "timedraw" )
84
96
85
- # close the serial port
97
+
98
+ def general_open_serial (socketio , serial_port , baudrate , message , buffertype ):
99
+ open_serial_port (socketio , serial_port , baudrate , message , buffertype )
100
+ # test the closing of the serial port, we are gonna use close_port for the other tests
86
101
socketio .emit ('command' , 'close ' + serial_port )
87
102
time .sleep (.2 )
88
103
print (message )
89
104
#check if port has been closed
90
105
assert any ("\" IsOpen\" : false," in i for i in message )
91
106
92
107
93
-
94
- def general_send_serial (socketio , close_port , serial_port , baudrate , buffertype ):
95
- global message
96
- message = []
97
- #in message var we will find the "response"
98
- socketio .on ('message' , message_handler )
99
- #open a new serial connection with the specified buffertype, if buffertype is empty it will use the default one
100
- socketio .emit ('command' , 'open ' + serial_port + ' ' + baudrate + ' ' + buffertype )
101
- # give time to the message var to be filled
102
- time .sleep (.5 )
103
- print (message )
104
- # the serial connection should be open now
105
- assert any ("\" IsOpen\" : true" in i for i in message )
106
-
107
- #test with string
108
+ def general_send_serial (socketio , close_port , serial_port , baudrate , message , buffertype ):
109
+ open_serial_port (socketio , serial_port , baudrate , message , buffertype )
108
110
# send the string "ciao" using the serial connection
109
- socketio .emit ('command' , 'send ' + serial_port + ' /" ciao/" ' )
111
+ socketio .emit ('command' , 'send ' + serial_port + ' ciao' )
110
112
time .sleep (1 )
111
113
print (message )
112
114
# check if the send command has been registered
113
- assert any ("send " + serial_port + " / \" ciao/ \" " in i for i in message )
115
+ assert any ("send " + serial_port + " ciao" in i for i in message )
114
116
#check if message has been sent back by the connected board
115
117
if buffertype == "timedraw" :
116
118
output = decode_output (extract_serial_data (message ))
117
119
elif buffertype in ("default" , "timed" ):
118
120
output = extract_serial_data (message )
119
121
assert "ciao" in output
122
+ # the serial connection is closed by close_port() fixture: even if in case of test failure
120
123
121
- #test with emoji
122
- message = [] # reinitialize the message buffer to have a clean situation
124
+
125
+ def general_send_emoji_serial (socketio , close_port , serial_port , baudrate , message , buffertype ):
126
+ open_serial_port (socketio , serial_port , baudrate , message , buffertype )
123
127
# send a lot of emoji: they can be messed up
124
128
socketio .emit ('command' , 'send ' + serial_port + ' /"🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/"' )
125
- time .sleep (.5 )
129
+ time .sleep (1 )
126
130
print (message )
127
131
# check if the send command has been registered
128
132
assert any ("send " + serial_port + " /\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in i for i in message )
@@ -133,23 +137,23 @@ def general_send_serial(socketio, close_port, serial_port, baudrate, buffertype)
133
137
assert "/\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in output
134
138
# the serial connection is closed by close_port() fixture: even if in case of test failure
135
139
136
- @pytest .mark .skipif (
137
- running_on_ci (),
138
- reason = "VMs have no serial ports" ,
139
- )
140
- def test_sendraw_serial (socketio , close_port , serial_port , baudrate ):
141
- global message
142
- message = []
143
- #in message var we will find the "response"
144
- socketio .on ('message' , message_handler )
145
- #open a new serial connection with the specified buffertype, if buffertype is empty it will use the default one
146
- socketio .emit ('command' , 'open ' + serial_port + ' ' + baudrate + ' timedraw' )
140
+
141
+ def open_serial_port (socketio , serial_port , baudrate , message , buffertype ):
142
+ #open a new serial connection with the specified buffertype
143
+ socketio .emit ('command' , 'open ' + serial_port + ' ' + baudrate + ' ' + buffertype )
147
144
# give time to the message var to be filled
148
145
time .sleep (.5 )
149
146
print (message )
150
147
# the serial connection should be open now
151
148
assert any ("\" IsOpen\" : true" in i for i in message )
152
149
150
+
151
+ @pytest .mark .skipif (
152
+ running_on_ci (),
153
+ reason = "VMs have no serial ports" ,
154
+ )
155
+ def test_sendraw_serial (socketio , close_port , serial_port , baudrate , message ):
156
+ open_serial_port (socketio , serial_port , baudrate , message , "timedraw" )
153
157
#test with bytes
154
158
integers = [1 , 2 , 3 , 4 , 5 ]
155
159
bytes_array = bytearray (integers )
@@ -165,12 +169,6 @@ def test_sendraw_serial(socketio, close_port, serial_port, baudrate):
165
169
assert encoded_integers in output
166
170
167
171
168
- # callback called by socketio when a message is received
169
- def message_handler (msg ):
170
- # print('Received message: ', msg)
171
- global message
172
- message .append (msg )
173
-
174
172
# helper function used to extract serial data from its JSON representation
175
173
# NOTE make sure to pass a clean message (maybe reinitialize the message global var before populating it)
176
174
def extract_serial_data (msg ):
0 commit comments