PATH:
opt
/
bitninja-python-dojo
/
embedded
/
lib
/
python3.9
/
test
/
test_asyncio
import asyncio
import unittest
import time


def tearDownModule():
    """Reset the asyncio event loop policy after this module's tests ran."""
    # Passing None restores the default policy, so policy state touched by
    # asyncio.run() in these tests does not leak into other test modules.
    asyncio.set_event_loop_policy(None)


class SlowTask:
    """ Task will run for this defined time, ignoring cancel requests """

    # Seconds that run() keeps going; the tests derive their wait_for()
    # timeouts from this value.
    TASK_TIMEOUT = 0.2

    def __init__(self):
        # Becomes True only after run() has gone through its full duration.
        self.exited = False

    async def run(self):
        """Sleep until TASK_TIMEOUT seconds have passed, swallowing cancels.

        The deadline is fixed once, up front, so a cancellation that
        interrupts the sleep only causes the remaining time to be slept
        again; it never shortens the total run time.
        """
        exitat = time.monotonic() + self.TASK_TIMEOUT

        while True:
            tosleep = exitat - time.monotonic()
            if tosleep <= 0:
                break

            try:
                await asyncio.sleep(tosleep)
            except asyncio.CancelledError:
                # Deliberately ignored: this task simulates a coroutine
                # that refuses to stop when it is cancelled.
                pass

        self.exited = True


class AsyncioWaitForTest(unittest.TestCase):
    """Check that wait_for() does not finish before the wrapped coroutine."""

    async def atest_asyncio_wait_for_cancelled(self):
        """Cancel a task running wait_for(); the inner task must have exited."""
        t = SlowTask()

        # The timeout is twice the task duration, so the timeout itself is
        # not expected to fire here; only the explicit cancel() below is.
        waitfortask = asyncio.create_task(
            asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
        # Yield once so the new task gets a chance to start running.
        await asyncio.sleep(0)
        waitfortask.cancel()
        # asyncio.wait() returns once the task is done without re-raising
        # the task's outcome (cancelled or not) into this coroutine.
        await asyncio.wait({waitfortask})

        self.assertTrue(t.exited)

    def test_asyncio_wait_for_cancelled(self):
        asyncio.run(self.atest_asyncio_wait_for_cancelled())

    async def atest_asyncio_wait_for_timeout(self):
        """Let wait_for() time out; the inner task must have exited."""
        t = SlowTask()

        try:
            # The timeout is half the task duration, so it expires while
            # the task is still running.
            await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
        except asyncio.TimeoutError:
            # A TimeoutError, if raised, is not what is being checked;
            # only the state of the inner task matters.
            pass

        self.assertTrue(t.exited)

    def test_asyncio_wait_for_timeout(self):
        asyncio.run(self.atest_asyncio_wait_for_timeout())


if __name__ == '__main__':
    unittest.main()
[-] test_sslproto.py
[edit]
[-] test_futures2.py
[edit]
[-] test_streams.py
[edit]
[-] test_threads.py
[edit]
[-] test_windows_utils.py
[edit]
[-] test_buffered_proto.py
[edit]
[-] test_sock_lowlevel.py
[edit]
[-] test_tasks.py
[edit]
[-] test_unix_events.py
[edit]
[-] test_server.py
[edit]
[-] test_pep492.py
[edit]
[+]
..
[-] test_futures.py
[edit]
[-] test_context.py
[edit]
[-] test_sendfile.py
[edit]
[-] test_transports.py
[edit]
[-] __main__.py
[edit]
[-] echo2.py
[edit]
[-] test_locks.py
[edit]
[-] test_queues.py
[edit]
[+]
__pycache__
[-] test_base_events.py
[edit]
[-] utils.py
[edit]
[-] echo3.py
[edit]
[-] test_events.py
[edit]
[-] test_proactor_events.py
[edit]
[-] test_selector_events.py
[edit]
[-] functional.py
[edit]
[-] __init__.py
[edit]
[-] test_windows_events.py
[edit]
[-] test_asyncio_waitfor.py
[edit]
[-] test_runners.py
[edit]
[-] echo.py
[edit]
[-] test_protocols.py
[edit]
[-] test_subprocess.py
[edit]