import os import tempfile from contextlib import contextmanager import pytest on_windows = os.name == "nt" # Mark all the tests in this file as being windows-only pytestmark = pytest.mark.skipif(not on_windows, reason="windows only") from .tutil import slow, gc_collect_harder, restore_unraisablehook from ... import _core, sleep, move_on_after from ...testing import wait_all_tasks_blocked if on_windows: from .._windows_cffi import ( ffi, kernel32, INVALID_HANDLE_VALUE, raise_winerror, FileFlags, ) # The undocumented API that this is testing should be changed to stop using # UnboundedQueue (or just removed until we have time to redo it), but until # then we filter out the warning. @pytest.mark.filterwarnings("ignore:.*UnboundedQueue:trio.TrioDeprecationWarning") async def test_completion_key_listen(): async def post(key): iocp = ffi.cast("HANDLE", _core.current_iocp()) for i in range(10): print("post", i) if i % 3 == 0: await _core.checkpoint() success = kernel32.PostQueuedCompletionStatus(iocp, i, key, ffi.NULL) assert success with _core.monitor_completion_key() as (key, queue): async with _core.open_nursery() as nursery: nursery.start_soon(post, key) i = 0 print("loop") async for batch in queue: # pragma: no branch print("got some", batch) for info in batch: assert info.lpOverlapped == 0 assert info.dwNumberOfBytesTransferred == i i += 1 if i == 10: break print("end loop") async def test_readinto_overlapped(): data = b"1" * 1024 + b"2" * 1024 + b"3" * 1024 + b"4" * 1024 buffer = bytearray(len(data)) with tempfile.TemporaryDirectory() as tdir: tfile = os.path.join(tdir, "numbers.txt") with open(tfile, "wb") as fp: fp.write(data) fp.flush() rawname = tfile.encode("utf-16le") + b"\0\0" rawname_buf = ffi.from_buffer(rawname) handle = kernel32.CreateFileW( ffi.cast("LPCWSTR", rawname_buf), FileFlags.GENERIC_READ, FileFlags.FILE_SHARE_READ, ffi.NULL, # no security attributes FileFlags.OPEN_EXISTING, FileFlags.FILE_FLAG_OVERLAPPED, ffi.NULL, # no template file ) if handle == INVALID_HANDLE_VALUE: # pragma: no cover raise_winerror() try: with memoryview(buffer) as buffer_view: async def read_region(start, end): await _core.readinto_overlapped( handle, buffer_view[start:end], start ) _core.register_with_iocp(handle) async with _core.open_nursery() as nursery: for start in range(0, 4096, 512): nursery.start_soon(read_region, start, start + 512) assert buffer == data with pytest.raises(BufferError): await _core.readinto_overlapped(handle, b"immutable") finally: kernel32.CloseHandle(handle) @contextmanager def pipe_with_overlapped_read(): from asyncio.windows_utils import pipe import msvcrt read_handle, write_handle = pipe(overlapped=(True, False)) try: write_fd = msvcrt.open_osfhandle(write_handle, 0) yield os.fdopen(write_fd, "wb", closefd=False), read_handle finally: kernel32.CloseHandle(ffi.cast("HANDLE", read_handle)) kernel32.CloseHandle(ffi.cast("HANDLE", write_handle)) @restore_unraisablehook() def test_forgot_to_register_with_iocp(): with pipe_with_overlapped_read() as (write_fp, read_handle): with write_fp: write_fp.write(b"test\n") left_run_yet = False async def main(): target = bytearray(1) try: async with _core.open_nursery() as nursery: nursery.start_soon( _core.readinto_overlapped, read_handle, target, name="xyz" ) await wait_all_tasks_blocked() nursery.cancel_scope.cancel() finally: # Run loop is exited without unwinding running tasks, so # we don't get here until the main() coroutine is GC'ed assert left_run_yet with pytest.raises(_core.TrioInternalError) as exc_info: _core.run(main) left_run_yet = True assert "Failed to cancel overlapped I/O in xyz " in str(exc_info.value) assert "forget to call register_with_iocp()?" in str(exc_info.value) # Make sure the Nursery.__del__ assertion about dangling children # gets put with the correct test del exc_info gc_collect_harder() @slow async def test_too_late_to_cancel(): import time with pipe_with_overlapped_read() as (write_fp, read_handle): _core.register_with_iocp(read_handle) target = bytearray(6) async with _core.open_nursery() as nursery: # Start an async read in the background nursery.start_soon(_core.readinto_overlapped, read_handle, target) await wait_all_tasks_blocked() # Synchronous write to the other end of the pipe with write_fp: write_fp.write(b"test1\ntest2\n") # Note: not trio.sleep! We're making sure the OS level # ReadFile completes, before Trio has a chance to execute # another checkpoint and notice it completed. time.sleep(1) nursery.cancel_scope.cancel() assert target[:6] == b"test1\n" # Do another I/O to make sure we've actually processed the # fallback completion that was posted when CancelIoEx failed. assert await _core.readinto_overlapped(read_handle, target) == 6 assert target[:6] == b"test2\n" def test_lsp_that_hooks_select_gives_good_error(monkeypatch): from .._windows_cffi import WSAIoctls, _handle from .. import _io_windows def patched_get_underlying(sock, *, which=WSAIoctls.SIO_BASE_HANDLE): if hasattr(sock, "fileno"): # pragma: no branch sock = sock.fileno() if which == WSAIoctls.SIO_BSP_HANDLE_SELECT: return _handle(sock + 1) else: return _handle(sock) monkeypatch.setattr(_io_windows, "_get_underlying_socket", patched_get_underlying) with pytest.raises( RuntimeError, match="SIO_BASE_HANDLE and SIO_BSP_HANDLE_SELECT differ" ): _core.run(sleep, 0) def test_lsp_that_completely_hides_base_socket_gives_good_error(monkeypatch): # This tests behavior with an LSP that fails SIO_BASE_HANDLE and returns # self for SIO_BSP_HANDLE_SELECT (like Komodia), but also returns # self for SIO_BSP_HANDLE_POLL. No known LSP does this, but we want to # make sure we get an error rather than an infinite loop. from .._windows_cffi import WSAIoctls, _handle from .. import _io_windows def patched_get_underlying(sock, *, which=WSAIoctls.SIO_BASE_HANDLE): if hasattr(sock, "fileno"): # pragma: no branch sock = sock.fileno() if which == WSAIoctls.SIO_BASE_HANDLE: raise OSError("nope") else: return _handle(sock) monkeypatch.setattr(_io_windows, "_get_underlying_socket", patched_get_underlying) with pytest.raises( RuntimeError, match="SIO_BASE_HANDLE failed and SIO_BSP_HANDLE_POLL didn't return a diff", ): _core.run(sleep, 0)