Skip to content

Commit bdb7e03

Browse files
yoctopucedpgeorge
authored andcommitted
extmod/asyncio: Fix early exit of asyncio scheduler.
This commit fixes three open issues related to the asyncio scheduler exiting prematurely when the main task queue is empty, in cases where CPython would not exit (for example, because the main task is not done because it's on a different queue). In the first case, the scheduler exits because running a task via `run_until_complete` did not schedule any dependent tasks. In the other two cases, the scheduler exits because the tasks are queued in an event queue. Tests have been added which reproduce the original issues. These test cases document the unauthorized use of `Event.set()` from a soft IRQ, and are skipped in unsupported environments (webassembly and native emitter). Fixes issues micropython#16759, micropython#16569 and micropython#16318. Signed-off-by: Yoctopuce dev <[email protected]>
1 parent 79abdad commit bdb7e03

7 files changed

+255
-12
lines changed

extmod/asyncio/core.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,16 @@ def run_until_complete(main_task=None):
163163
# A task waiting on _task_queue; "ph_key" is time to schedule task at
164164
dt = max(0, ticks_diff(t.ph_key, ticks()))
165165
elif not _io_queue.map:
166-
# No tasks can be woken so finished running
166+
# No tasks can be woken
167167
cur_task = None
168-
return
168+
if not main_task or not main_task.state:
169+
# no main_task, or main_task is done so finished running
170+
return
171+
# At this point, there is theoretically nothing that could wake the
172+
# scheduler, but it is not allowed to exit either. We keep the code
173+
# running so that a hypothetical debugger (or other such meta-process)
174+
# can get a view of what is happening and possibly abort.
175+
dt = 3
169176
# print('(poll {})'.format(dt), len(_io_queue.map))
170177
_io_queue.wait_io_event(dt)
171178

@@ -187,31 +194,33 @@ def run_until_complete(main_task=None):
187194
except excs_all as er:
188195
# Check the task is not on any event queue
189196
assert t.data is None
190-
# This task is done, check if it's the main task and then loop should stop
191-
if t is main_task:
197+
# If it's the main task, it is considered as awaited by the caller
198+
awaited = t is main_task
199+
if awaited:
192200
cur_task = None
193-
if isinstance(er, StopIteration):
194-
return er.value
195-
raise er
201+
if not isinstance(er, StopIteration):
202+
t.state = False
203+
raise er
204+
if t.state is None:
205+
t.state = False
196206
if t.state:
197207
# Task was running but is now finished.
198-
waiting = False
199208
if t.state is True:
200209
# "None" indicates that the task is complete and not await'ed on (yet).
201-
t.state = None
210+
t.state = False if awaited else None
202211
elif callable(t.state):
203212
# The task has a callback registered to be called on completion.
204213
t.state(t, er)
205214
t.state = False
206-
waiting = True
215+
awaited = True
207216
else:
208217
# Schedule any other tasks waiting on the completion of this task.
209218
while t.state.peek():
210219
_task_queue.push(t.state.pop())
211-
waiting = True
220+
awaited = True
212221
# "False" indicates that the task is complete and has been await'ed on.
213222
t.state = False
214-
if not waiting and not isinstance(er, excs_stop):
223+
if not awaited and not isinstance(er, excs_stop):
215224
# An exception ended this detached task, so queue it for later
216225
# execution to handle the uncaught exception if no other task retrieves
217226
# the exception in the meantime (this is handled by Task.throw).
@@ -229,6 +238,9 @@ def run_until_complete(main_task=None):
229238
_exc_context["exception"] = exc
230239
_exc_context["future"] = t
231240
Loop.call_exception_handler(_exc_context)
241+
# If it's the main task then the loop should stop
242+
if t is main_task:
243+
return er.value
232244

233245

234246
# Create a new task from a coroutine and run it until it finishes

tests/extmod/asyncio_event_queue.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Ensure that an asyncio task can wait on an Event when the
2+
# _task_queue is empty
3+
# https://github.com/micropython/micropython/issues/16569
4+
5+
try:
6+
import asyncio
7+
except ImportError:
8+
print("SKIP")
9+
raise SystemExit
10+
11+
# This test requires checking that the asyncio scheduler
12+
# remains active "indefinitely" when the task queue is empty.
13+
#
14+
# To check this, we need another independent scheduler that
15+
# can wait for a certain amount of time. So we have to
16+
# create one using micropython.schedule() and time.ticks_ms()
17+
#
18+
# Technically, this code breaks the rules, as it is clearly
19+
# documented that Event.set() should _NOT_ be called from a
20+
# schedule (soft IRQ) because in some cases, a race condition
21+
# can occur, resulting in a crash. However:
22+
# - since the risk of a race condition in that specific
23+
# case has been analysed and excluded
24+
# - given that there is no other simple alternative to
25+
# write this test case,
26+
# an exception to the rule was deemed acceptable. See
27+
# https://github.com/micropython/micropython/pull/16772
28+
29+
import micropython, time
30+
31+
try:
32+
micropython.schedule
33+
except AttributeError:
34+
print("SKIP")
35+
raise SystemExit
36+
37+
38+
evt = asyncio.Event()
39+
40+
41+
def schedule_watchdog(end_ticks):
42+
if time.ticks_diff(end_ticks, time.ticks_ms()) <= 0:
43+
print("asyncio still pending, unlocking event")
44+
# Caution: about to call Event.set() from a schedule
45+
# (see the note in the comment above)
46+
evt.set()
47+
return
48+
micropython.schedule(schedule_watchdog, end_ticks)
49+
50+
51+
async def foo():
52+
print("foo waiting")
53+
schedule_watchdog(time.ticks_add(time.ticks_ms(), 100))
54+
await evt.wait()
55+
print("foo done")
56+
57+
58+
async def main():
59+
print("main started")
60+
await foo()
61+
print("main done")
62+
63+
64+
asyncio.run(main())
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
main started
2+
foo waiting
3+
asyncio still pending, unlocking event
4+
foo done
5+
main done
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# Ensure that an asyncio task can wait on an Event when the
2+
# _task_queue is empty, in the context of an async iterator
3+
# https://github.com/micropython/micropython/issues/16318
4+
5+
try:
6+
import asyncio
7+
except ImportError:
8+
print("SKIP")
9+
raise SystemExit
10+
11+
# This test requires checking that the asyncio scheduler
12+
# remains active "indefinitely" when the task queue is empty.
13+
#
14+
# To check this, we need another independent scheduler that
15+
# can wait for a certain amount of time. So we have to
16+
# create one using micropython.schedule() and time.ticks_ms()
17+
#
18+
# Technically, this code breaks the rules, as it is clearly
19+
# documented that Event.set() should _NOT_ be called from a
20+
# schedule (soft IRQ) because in some cases, a race condition
21+
# can occur, resulting in a crash. However:
22+
# - since the risk of a race condition in that specific
23+
# case has been analysed and excluded
24+
# - given that there is no other simple alternative to
25+
# write this test case,
26+
# an exception to the rule was deemed acceptable. See
27+
# https://github.com/micropython/micropython/pull/16772
28+
29+
import micropython, time
30+
31+
try:
32+
micropython.schedule
33+
except AttributeError:
34+
print("SKIP")
35+
raise SystemExit
36+
37+
ai = None
38+
39+
40+
def schedule_watchdog(end_ticks):
41+
if time.ticks_diff(end_ticks, time.ticks_ms()) <= 0:
42+
print("good: asyncio iterator is still pending, exiting")
43+
# Caution: ai.fetch_data() will invoke Event.set()
44+
# (see the note in the comment above)
45+
ai.fetch_data(None)
46+
return
47+
micropython.schedule(schedule_watchdog, end_ticks)
48+
49+
50+
async def test(ai):
51+
for x in range(3):
52+
await asyncio.sleep(0.1)
53+
ai.fetch_data(f"bar {x}")
54+
55+
56+
class AsyncIterable:
57+
def __init__(self):
58+
self.message = None
59+
self.evt = asyncio.Event()
60+
61+
def __aiter__(self):
62+
return self
63+
64+
async def __anext__(self):
65+
await self.evt.wait()
66+
self.evt.clear()
67+
if self.message is None:
68+
raise StopAsyncIteration
69+
return self.message
70+
71+
def fetch_data(self, message):
72+
self.message = message
73+
self.evt.set()
74+
75+
76+
async def main():
77+
global ai
78+
ai = AsyncIterable()
79+
asyncio.create_task(test(ai))
80+
schedule_watchdog(time.ticks_add(time.ticks_ms(), 500))
81+
async for message in ai:
82+
print(message)
83+
print("end main")
84+
85+
86+
asyncio.run(main())
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
bar 0
2+
bar 1
3+
bar 2
4+
good: asyncio iterator is still pending, exiting
5+
end main
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Test asyncio.wait_for, with dependent tasks
2+
# https://github.com/micropython/micropython/issues/16759
3+
4+
try:
5+
import asyncio
6+
except ImportError:
7+
print("SKIP")
8+
raise SystemExit
9+
10+
11+
# CPython 3.12 deprecated calling get_event_loop() when there is no current event
12+
# loop, so to make this test run on CPython requires setting the event loop.
13+
if hasattr(asyncio, "set_event_loop"):
14+
asyncio.set_event_loop(asyncio.new_event_loop())
15+
16+
17+
class Worker:
18+
def __init__(self):
19+
self._eventLoop = None
20+
self._tasks = []
21+
22+
def launchTask(self, asyncJob):
23+
if self._eventLoop is None:
24+
self._eventLoop = asyncio.get_event_loop()
25+
return self._eventLoop.create_task(asyncJob)
26+
27+
async def job(self, prerequisite, taskName):
28+
if prerequisite:
29+
await prerequisite
30+
await asyncio.sleep(0.1)
31+
print(taskName, "work completed")
32+
33+
def planTasks(self):
34+
self._tasks.append(self.launchTask(self.job(None, "task0")))
35+
self._tasks.append(self.launchTask(self.job(self._tasks[0], "task1")))
36+
self._tasks.append(self.launchTask(self.job(self._tasks[1], "task2")))
37+
38+
async def waitForTask(self, taskIdx):
39+
return await self._tasks[taskIdx]
40+
41+
def syncWaitForTask(self, taskIdx):
42+
return self._eventLoop.run_until_complete(self._tasks[taskIdx])
43+
44+
45+
async def async_test():
46+
print("--- async test")
47+
worker = Worker()
48+
worker.planTasks()
49+
await worker.waitForTask(0)
50+
print("-> task0 done")
51+
await worker.waitForTask(2)
52+
print("-> task2 done")
53+
54+
55+
def sync_test():
56+
print("--- sync test")
57+
worker = Worker()
58+
worker.planTasks()
59+
worker.syncWaitForTask(0)
60+
print("-> task0 done")
61+
worker.syncWaitForTask(2)
62+
print("-> task2 done")
63+
64+
65+
asyncio.get_event_loop().run_until_complete(async_test())
66+
sync_test()

tests/run-tests.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ def open(self, path, mode):
162162
"extmod/asyncio_new_event_loop.py",
163163
"extmod/asyncio_threadsafeflag.py",
164164
"extmod/asyncio_wait_for_fwd.py",
165+
"extmod/asyncio_event_queue.py",
166+
"extmod/asyncio_iterator_event.py",
167+
"extmod/asyncio_wait_for_linked_task.py",
165168
"extmod/binascii_a2b_base64.py",
166169
"extmod/deflate_compress_memory_error.py", # tries to allocate unlimited memory
167170
"extmod/re_stack_overflow.py",
@@ -843,6 +846,8 @@ def run_tests(pyb, tests, args, result_dir, num_threads=1):
843846
) # native doesn't have proper traceback info
844847
skip_tests.add("micropython/schedule.py") # native code doesn't check pending events
845848
skip_tests.add("stress/bytecode_limit.py") # bytecode specific test
849+
skip_tests.add("extmod/asyncio_event_queue.py") # native can't run schedule
850+
skip_tests.add("extmod/asyncio_iterator_event.py") # native can't run schedule
846851

847852
def run_one_test(test_file):
848853
test_file = test_file.replace("\\", "/")

0 commit comments

Comments
 (0)