mirror of
https://github.com/micropython/micropython.git
synced 2025-07-21 21:11:12 +02:00
This commit fixes three open issues related to the asyncio scheduler exiting prematurely when the main task queue is empty, in cases where CPython would not exit (for example, because the main task is not done because it's on a different queue). In the first case, the scheduler exits because running a task via `run_until_complete` did not schedule any dependent tasks. In the other two cases, the scheduler exits because the tasks are queued in an event queue. Tests have been added which reproduce the original issues. These test cases document the unauthorized use of `Event.set()` from a soft IRQ, and are skipped in unsupported environments (webassembly and native emitter). Fixes issues #16759, #16569 and #16318. Signed-off-by: Yoctopuce dev <dev@yoctopuce.com>
67 lines
1.8 KiB
Python
67 lines
1.8 KiB
Python
# Test asyncio.wait_for, with dependent tasks
|
|
# https://github.com/micropython/micropython/issues/16759
|
|
|
|
try:
|
|
import asyncio
|
|
except ImportError:
|
|
print("SKIP")
|
|
raise SystemExit
|
|
|
|
|
|
# CPython 3.12 deprecated calling get_event_loop() when there is no current event
|
|
# loop, so to make this test run on CPython requires setting the event loop.
|
|
if hasattr(asyncio, "set_event_loop"):
|
|
asyncio.set_event_loop(asyncio.new_event_loop())
|
|
|
|
|
|
class Worker:
|
|
def __init__(self):
|
|
self._eventLoop = None
|
|
self._tasks = []
|
|
|
|
def launchTask(self, asyncJob):
|
|
if self._eventLoop is None:
|
|
self._eventLoop = asyncio.get_event_loop()
|
|
return self._eventLoop.create_task(asyncJob)
|
|
|
|
async def job(self, prerequisite, taskName):
|
|
if prerequisite:
|
|
await prerequisite
|
|
await asyncio.sleep(0.1)
|
|
print(taskName, "work completed")
|
|
|
|
def planTasks(self):
|
|
self._tasks.append(self.launchTask(self.job(None, "task0")))
|
|
self._tasks.append(self.launchTask(self.job(self._tasks[0], "task1")))
|
|
self._tasks.append(self.launchTask(self.job(self._tasks[1], "task2")))
|
|
|
|
async def waitForTask(self, taskIdx):
|
|
return await self._tasks[taskIdx]
|
|
|
|
def syncWaitForTask(self, taskIdx):
|
|
return self._eventLoop.run_until_complete(self._tasks[taskIdx])
|
|
|
|
|
|
async def async_test():
|
|
print("--- async test")
|
|
worker = Worker()
|
|
worker.planTasks()
|
|
await worker.waitForTask(0)
|
|
print("-> task0 done")
|
|
await worker.waitForTask(2)
|
|
print("-> task2 done")
|
|
|
|
|
|
def sync_test():
|
|
print("--- sync test")
|
|
worker = Worker()
|
|
worker.planTasks()
|
|
worker.syncWaitForTask(0)
|
|
print("-> task0 done")
|
|
worker.syncWaitForTask(2)
|
|
print("-> task2 done")
|
|
|
|
|
|
asyncio.get_event_loop().run_until_complete(async_test())
|
|
sync_test()
|