import itertools from contextlib import contextmanager import enum import socket import sys from typing import TYPE_CHECKING import attr from outcome import Value from .. import _core from ._run import _public from ._io_common import wake_all from ._windows_cffi import ( ffi, kernel32, ntdll, ws2_32, INVALID_HANDLE_VALUE, raise_winerror, _handle, ErrorCodes, FileFlags, AFDPollFlags, WSAIoctls, CompletionModes, IoControlCodes, ) assert not TYPE_CHECKING or sys.platform == "win32" # There's a lot to be said about the overall design of a Windows event # loop. See # # https://github.com/python-trio/trio/issues/52 # # for discussion. This now just has some lower-level notes: # # How IOCP fits together: # # The general model is that you call some function like ReadFile or WriteFile # to tell the kernel that you want it to perform some operation, and the # kernel goes off and does that in the background, then at some point later it # sends you a notification that the operation is complete. There are some more # exotic APIs that don't quite fit this pattern, but most APIs do. # # Each background operation is tracked using an OVERLAPPED struct, that # uniquely identifies that particular operation. # # An "IOCP" (or "I/O completion port") is an object that lets the kernel send # us these notifications -- basically it's just a kernel->userspace queue. # # Each IOCP notification is represented by an OVERLAPPED_ENTRY struct, which # contains 3 fields: # - The "completion key". This is an opaque integer that we pick, and use # however is convenient. # - pointer to the OVERLAPPED struct for the completed operation. # - dwNumberOfBytesTransferred (an integer). # # And in addition, for regular I/O, the OVERLAPPED structure gets filled in # with: # - result code (named "Internal") # - number of bytes transferred (named "InternalHigh"); usually redundant # with dwNumberOfBytesTransferred. 
# # There are also some other entries in OVERLAPPED which only matter on input: # - Offset and OffsetHigh which are inputs to {Read,Write}File and # otherwise always zero # - hEvent which is for if you aren't using IOCP; we always set it to zero. # # That describes the usual pattern for operations and the usual meaning of # these struct fields, but really these are just some arbitrary chunks of # bytes that get passed back and forth, so some operations like to overload # them to mean something else. # # You can also directly queue an OVERLAPPED_ENTRY object to an IOCP by calling # PostQueuedCompletionStatus. When you use this you get to set all the # OVERLAPPED_ENTRY fields to arbitrary values. # # You can request to cancel any operation if you know which handle it was # issued on + the OVERLAPPED struct that identifies it (via CancelIoEx). This # request might fail because the operation has already completed, or it might # be queued to happen in the background, so you only find out whether it # succeeded or failed later, when we get back the notification for the # operation being complete. # # There are three types of operations that we support: # # == Regular I/O operations on handles (e.g. files or named pipes) == # # Implemented by: register_with_iocp, wait_overlapped # # To use these, you have to register the handle with your IOCP first. Once # it's registered, any operations on that handle will automatically send # completion events to that IOCP, with a completion key that you specify *when # the handle is registered* (so you can't use different completion keys for # different operations). # # We give these two dedicated completion keys: CKeys.WAIT_OVERLAPPED for # regular operations, and CKeys.LATE_CANCEL that's used to make # wait_overlapped cancellable even if the user forgot to call # register_with_iocp. The problem here is that after we request the cancel, # wait_overlapped keeps blocking until it sees the completion notification... 
# but if the user forgot to register_with_iocp, then the completion will never # come, so the cancellation will never resolve. To avoid this, whenever we try # to cancel an I/O operation and the cancellation fails, we use # PostQueuedCompletionStatus to send a CKeys.LATE_CANCEL notification. If this # arrives before the real completion, we assume the user forgot to call # register_with_iocp on their handle, and raise an error accordingly. # # == Socket state notifications == # # Implemented by: wait_readable, wait_writable # # The public APIs that windows provides for this are all really awkward and # don't integrate with IOCP. So we drop down to a lower level, and talk # directly to the socket device driver in the kernel, which is called "AFD". # Unfortunately, this is a totally undocumented internal API. Fortunately # libuv also does this, so we can be pretty confident that MS won't break it # on us, and there is a *little* bit of information out there if you go # digging. # # Basically: we open a magic file that refers to the AFD driver, register the # magic file with our IOCP, and then we can issue regular overlapped I/O # operations on that handle. Specifically, the operation we use is called # IOCTL_AFD_POLL, which lets us pass in a buffer describing which events we're # interested in on a given socket (readable, writable, etc.). Later, when the # operation completes, the kernel rewrites the buffer we passed in to record # which events happened, and uses IOCP as normal to notify us that this # operation has completed. # # Unfortunately, the Windows kernel seems to have bugs if you try to issue # multiple simultaneous IOCTL_AFD_POLL operations on the same socket (see # notes-to-self/afd-lab.py). So if a user calls wait_readable and # wait_writable at the same time, we have to combine those into a single # IOCTL_AFD_POLL. This means we can't just use the wait_overlapped machinery. 
# Instead we have some dedicated code to handle these operations, and a
# dedicated completion key CKeys.AFD_POLL.
#
# Sources of information:
# - https://github.com/python-trio/trio/issues/52
# - Wepoll: https://github.com/piscisaureus/wepoll/
# - libuv: https://github.com/libuv/libuv/
# - ReactOS: https://github.com/reactos/reactos/
# - Ancient leaked copies of the Windows NT and Winsock source code:
#   https://github.com/pustladi/Windows-2000/blob/661d000d50637ed6fab2329d30e31775046588a9/private/net/sockets/winsock2/wsp/msafd/select.c#L59-L655
#   https://github.com/metoo10987/WinNT4/blob/f5c14e6b42c8f45c20fe88d14c61f9d6e0386b8e/private/ntos/afd/poll.c#L68-L707
# - The WSAEventSelect docs (this exposes a finer-grained set of events than
#   select(), so if you squint you can treat it as a source of information on
#   the fine-grained AFD poll types)
#
#
# == Everything else ==
#
# There are also some weirder APIs for interacting with IOCP. For example, the
# "Job" API lets you specify an IOCP handle and "completion key", and then in
# the future whenever certain events happen it uses IOCP to send a
# notification. These notifications don't correspond to any particular
# operation; they're just spontaneous messages you get. The
# "dwNumberOfBytesTransferred" field gets repurposed to carry an identifier
# for the message type (e.g. JOB_OBJECT_MSG_EXIT_PROCESS), and the
# "lpOverlapped" field gets repurposed to carry some arbitrary data that
# depends on the message type (e.g. the pid of the process that exited).
#
# To handle these, we have monitor_completion_key, where we hand out an
# unassigned completion key, let users set it up however they want, and then
# get any events that arrive on that key.
#
# (Note: monitor_completion_key is not documented or fully baked; expect it to
# change in the future.)
# Our completion keys
class CKeys(enum.IntEnum):
    # Completions of the coalesced IOCTL_AFD_POLL operations.
    AFD_POLL = 0
    # Completions on handles registered through register_with_iocp().
    WAIT_OVERLAPPED = 1
    # Posted by wait_overlapped()'s abort hook when CancelIoEx was too late.
    LATE_CANCEL = 2
    # Posted by force_wakeup(); process_events() ignores it.
    FORCE_WAKEUP = 3
    # First key handed out by monitor_completion_key().
    USER_DEFINED = 4  # and above


def _check(success):
    """Return *success* unchanged if it is truthy, else call raise_winerror()."""
    if success:
        return success
    raise_winerror()
    return success


def _get_underlying_socket(sock, *, which=WSAIoctls.SIO_BASE_HANDLE):
    """Run the WSAIoctl *which* on *sock* and return the HANDLE it reports.

    *sock* may be an object with a ``fileno()`` method or a raw socket value.
    If WSAIoctl reports failure, the WSAGetLastError() code is passed to
    raise_winerror().
    """
    raw_sock = sock.fileno() if hasattr(sock, "fileno") else sock
    handle_out = ffi.new("HANDLE *")
    bytes_returned = ffi.new("DWORD *")
    ioctl_failed = ws2_32.WSAIoctl(
        ffi.cast("SOCKET", raw_sock),
        which,
        ffi.NULL,
        0,
        handle_out,
        ffi.sizeof("HANDLE"),
        bytes_returned,
        ffi.NULL,
        ffi.NULL,
    )
    if ioctl_failed:
        raise_winerror(ws2_32.WSAGetLastError())
    return handle_out[0]


def _get_base_socket(sock):
    """Peel off any layered service providers and return the base socket.

    Raises OSError if *sock* is not a socket, and RuntimeError if no
    progress can be made towards a base handle.
    """
    # There is a development kit for LSPs called Komodia Redirector.
    # It does some unusual (some might say evil) things like intercepting
    # SIO_BASE_HANDLE (fails) and SIO_BSP_HANDLE_SELECT (returns the same
    # socket) in a misguided attempt to prevent bypassing it. It's been used
    # in malware including the infamous Lenovo Superfish incident from 2015,
    # but unfortunately is also used in some legitimate products such as
    # parental control tools and Astrill VPN. Komodia happens to not
    # block SIO_BSP_HANDLE_POLL, so we'll try SIO_BASE_HANDLE and fall back
    # to SIO_BSP_HANDLE_POLL if it doesn't work.
    # References:
    # - https://github.com/piscisaureus/wepoll/blob/0598a791bf9cbbf480793d778930fc635b044980/wepoll.c#L2223
    # - https://github.com/tokio-rs/mio/issues/1314

    while True:
        try:
            # The common case: no Komodia interception, so SIO_BASE_HANDLE
            # answers directly.
            return _get_underlying_socket(sock)
        except OSError as ioctl_error:
            if ioctl_error.winerror == ErrorCodes.ERROR_NOT_SOCKET:
                # SIO_BASE_HANDLE might fail even without LSP intervention,
                # if we get something that's not a socket.
                raise
            raw_sock = sock.fileno() if hasattr(sock, "fileno") else sock
            current_layer = _handle(raw_sock)
            deeper_layer = _get_underlying_socket(
                current_layer, which=WSAIoctls.SIO_BSP_HANDLE_POLL
            )
            if deeper_layer == current_layer:
                # If BSP_HANDLE_POLL returns the same socket we already had,
                # then there's no layering going on and we need to fail
                # to prevent an infinite loop.
                message = (
                    "Unexpected network configuration detected: "
                    "SIO_BASE_HANDLE failed and SIO_BSP_HANDLE_POLL didn't "
                    "return a different socket. Please file a bug at "
                    "https://github.com/python-trio/trio/issues/new, "
                    "and include the output of running: "
                    "netsh winsock show catalog"
                )
                raise RuntimeError(message)
            # Otherwise we've gotten at least one layer deeper, so
            # loop back around to keep digging.
            sock = deeper_layer


def _afd_helper_handle():
    """Open a new overlapped handle to the AFD driver and return it."""
    # The "AFD" driver is exposed at the NT path "\Device\Afd". We're using
    # the Win32 CreateFile, though, so we have to pass a Win32 path. \\.\ is
    # how Win32 refers to the NT \GLOBAL??\ directory, and GLOBALROOT is a
    # symlink inside that directory that points to the root of the NT path
    # system. So by sticking that in front of the NT path, we get a Win32
    # path. Alternatively, we could use NtCreateFile directly, since it takes
    # an NT path. But we already wrap CreateFileW so this was easier.
    # References:
    #   https://blogs.msdn.microsoft.com/jeremykuhne/2016/05/02/dos-to-nt-a-paths-journey/
    #   https://stackoverflow.com/a/21704022
    #
    # I'm actually not sure what the \Trio part at the end of the path does.
    # Wepoll uses \Device\Afd\Wepoll, so I just copied them. (I'm guessing it
    # might be visible in some debug tools, and is otherwise arbitrary?)
    encoded_path = r"\\.\GLOBALROOT\Device\Afd\Trio".encode("utf-16le") + b"\0\0"
    path_buf = ffi.from_buffer(encoded_path)
    share_mode = FileFlags.FILE_SHARE_READ | FileFlags.FILE_SHARE_WRITE

    afd_handle = kernel32.CreateFileW(
        ffi.cast("LPCWSTR", path_buf),
        FileFlags.SYNCHRONIZE,
        share_mode,
        ffi.NULL,  # no security attributes
        FileFlags.OPEN_EXISTING,
        FileFlags.FILE_FLAG_OVERLAPPED,
        ffi.NULL,  # no template file
    )
    if afd_handle == INVALID_HANDLE_VALUE:  # pragma: no cover
        raise_winerror()
    return afd_handle


# AFD_POLL has a finer-grained set of events than other APIs. We collapse them
# down into Unix-style "readable" and "writable".
#
# Note: AFD_POLL_LOCAL_CLOSE isn't a reliable substitute for notify_closing(),
# because even if the user closes the socket *handle*, the socket *object*
# could still remain open, e.g. if the socket was dup'ed (possibly into
# another process). Explicitly calling notify_closing() guarantees that
# everyone waiting on the *handle* wakes up, which is what you'd expect.
#
# However, we can't avoid getting LOCAL_CLOSE notifications -- the kernel
# delivers them whether we ask for them or not -- so better to include them
# here for documentation, and so that when we check (delivered & requested) we
# get a match.

READABLE_FLAGS = (
    AFDPollFlags.AFD_POLL_RECEIVE
    | AFDPollFlags.AFD_POLL_ACCEPT
    | AFDPollFlags.AFD_POLL_DISCONNECT  # other side sent an EOF
    | AFDPollFlags.AFD_POLL_ABORT
    | AFDPollFlags.AFD_POLL_LOCAL_CLOSE
)

WRITABLE_FLAGS = (
    AFDPollFlags.AFD_POLL_SEND
    | AFDPollFlags.AFD_POLL_CONNECT_FAIL
    | AFDPollFlags.AFD_POLL_ABORT
    | AFDPollFlags.AFD_POLL_LOCAL_CLOSE
)


# Annoyingly, while the API makes it *seem* like you can happily issue as many
# independent AFD_POLL operations as you want without them interfering with
# each other, in fact if you issue two AFD_POLL operations for the same socket
# at the same time with notification going to the same IOCP port, then Windows
# gets super confused.
# For example, if we issue one operation from
# wait_readable, and another independent operation from wait_writable, then
# Windows may complete the wait_writable operation when the socket becomes
# readable.
#
# To avoid this, we have to coalesce all the operations on a single socket
# into one, and when the set of waiters changes we have to throw away the old
# operation and start a new one.


# Per-socket bookkeeping, stored in WindowsIOManager._afd_waiters keyed by the
# base socket handle.
@attr.s(slots=True, eq=False)
class AFDWaiters:
    # Task blocked in wait_readable() on this socket, or None.
    read_task = attr.ib(default=None)
    # Task blocked in wait_writable() on this socket, or None.
    write_task = attr.ib(default=None)
    # The AFDPollOp currently issued on behalf of these waiters, or None.
    current_op = attr.ib(default=None)


# We also need to bundle up all the info for a single op into a standalone
# object, because we need to keep all these objects alive until the operation
# finishes, even if we're throwing it away.
@attr.s(slots=True, eq=False, frozen=True)
class AFDPollOp:
    # OVERLAPPED struct identifying the operation; also the key under which
    # the op is stored in WindowsIOManager._afd_ops.
    lpOverlapped = attr.ib()
    # AFD_POLL_INFO buffer passed to DeviceIoControl as both input and output.
    poll_info = attr.ib()
    # The AFDWaiters this op was issued for.
    waiters = attr.ib()
    # The AFDGroup whose handle the op was issued on.
    afd_group = attr.ib()


# The Windows kernel has a weird issue when using AFD handles. If you have N
# instances of wait_readable/wait_writable registered with a single AFD handle,
# then cancelling any one of them takes something like O(N**2) time. So if we
# used just a single AFD handle, then cancellation would quickly become very
# expensive, e.g. a program with N active sockets would take something like
# O(N**3) time to unwind after control-C. The solution is to spread our sockets
# out over multiple AFD handles, so that N doesn't grow too large for any
# individual handle.
MAX_AFD_GROUP_SIZE = 500  # at 1000, the cubic scaling is just starting to bite


# One AFD helper handle plus the running count of poll operations charged to
# it. Constructed positionally as AFDGroup(size, handle).
@attr.s(slots=True, eq=False)
class AFDGroup:
    # Running count maintained by WindowsIOManager; once it reaches
    # MAX_AFD_GROUP_SIZE the group is dropped from the set of vacant groups.
    size = attr.ib()
    # Handle returned by _afd_helper_handle().
    handle = attr.ib()


# Snapshot returned by WindowsIOManager.statistics().
@attr.s(slots=True, eq=False, frozen=True)
class _WindowsStatistics:
    # Number of AFDWaiters entries with a read_task set.
    tasks_waiting_read = attr.ib()
    # Number of AFDWaiters entries with a write_task set.
    tasks_waiting_write = attr.ib()
    # Number of tasks currently registered in wait_overlapped().
    tasks_waiting_overlapped = attr.ib()
    # Number of active monitor_completion_key() queues.
    completion_key_monitors = attr.ib()
    backend = attr.ib(default="windows")


# Maximum number of events to dequeue from the completion port on each pass
# through the run loop. Somewhat arbitrary.
# Should be large enough to collect
# a good set of tasks on each loop, but not so large to waste tons of memory.
# (Each WindowsIOManager holds a buffer whose size is ~32x this number.)
MAX_EVENTS = 1000


# Value delivered for each completion: wait_overlapped() returns one, and
# monitor_completion_key() queues receive them.
@attr.s(frozen=True)
class CompletionKeyEventInfo:
    lpOverlapped = attr.ib()
    dwNumberOfBytesTransferred = attr.ib()


class WindowsIOManager:
    """I/O manager built on a Windows I/O completion port (IOCP).

    Owns the completion port, the AFD helper handles used for socket
    readiness polling, and the bookkeeping that maps completion
    notifications back to waiting tasks.
    """

    def __init__(self):
        """Create the completion port and sanity-check the Winsock setup.

        Raises:
            OSError: if a Windows call fails.
            RuntimeError: if SIO_BASE_HANDLE and SIO_BSP_HANDLE_SELECT
                disagree for a freshly created socket.
        """
        # If this method raises an exception, then __del__ could run on a
        # half-initialized object. So we initialize everything that __del__
        # touches to safe values up front, before we do anything that can
        # fail.
        self._iocp = None
        self._all_afd_handles = []

        self._iocp = _check(
            kernel32.CreateIoCompletionPort(INVALID_HANDLE_VALUE, ffi.NULL, 0, 0)
        )
        self._events = ffi.new("OVERLAPPED_ENTRY[]", MAX_EVENTS)

        self._vacant_afd_groups = set()
        # {lpOverlapped: AFDPollOp}
        self._afd_ops = {}
        # {socket handle: AFDWaiters}
        self._afd_waiters = {}

        # {lpOverlapped: task}
        self._overlapped_waiters = {}
        self._posted_too_late_to_cancel = set()

        self._completion_key_queues = {}
        self._completion_key_counter = itertools.count(CKeys.USER_DEFINED)

        with socket.socket() as s:
            # We assume we're not working with any LSP that changes
            # how select() is supposed to work. Validate this by
            # ensuring that the result of SIO_BSP_HANDLE_SELECT (the
            # LSP-hookable mechanism for "what should I use for
            # select()?") matches that of SIO_BASE_HANDLE ("what is
            # the real non-hooked underlying socket here?").
            #
            # This doesn't work for Komodia-based LSPs; see the comments
            # in _get_base_socket() for details. But we have special
            # logic for those, so we just skip this check if
            # SIO_BASE_HANDLE fails.

            # LSPs can in theory override this, but we believe that it never
            # actually happens in the wild (except Komodia)
            select_handle = _get_underlying_socket(
                s, which=WSAIoctls.SIO_BSP_HANDLE_SELECT
            )
            try:
                # LSPs shouldn't override this...
                base_handle = _get_underlying_socket(s, which=WSAIoctls.SIO_BASE_HANDLE)
            except OSError:
                # But Komodia-based LSPs do anyway, in a way that causes
                # a failure with WSAEFAULT. We have special handling for
                # them in _get_base_socket(). Make sure it works.
                _get_base_socket(s)
            else:
                if base_handle != select_handle:
                    raise RuntimeError(
                        "Unexpected network configuration detected: "
                        "SIO_BASE_HANDLE and SIO_BSP_HANDLE_SELECT differ. "
                        "Please file a bug at "
                        "https://github.com/python-trio/trio/issues/new, "
                        "and include the output of running: "
                        "netsh winsock show catalog"
                    )

    def close(self):
        """Close the completion port (if open) and all AFD helper handles.

        Safe to call more than once: the port is only closed the first time,
        and each AFD handle is removed from the list before it is closed.
        """
        try:
            if self._iocp is not None:
                iocp = self._iocp
                self._iocp = None
                _check(kernel32.CloseHandle(iocp))
        finally:
            while self._all_afd_handles:
                afd_handle = self._all_afd_handles.pop()
                _check(kernel32.CloseHandle(afd_handle))

    def __del__(self):
        self.close()

    def statistics(self):
        """Return a _WindowsStatistics snapshot of the current waiters."""
        tasks_waiting_read = 0
        tasks_waiting_write = 0
        for waiter in self._afd_waiters.values():
            if waiter.read_task is not None:
                tasks_waiting_read += 1
            if waiter.write_task is not None:
                tasks_waiting_write += 1
        return _WindowsStatistics(
            tasks_waiting_read=tasks_waiting_read,
            tasks_waiting_write=tasks_waiting_write,
            tasks_waiting_overlapped=len(self._overlapped_waiters),
            completion_key_monitors=len(self._completion_key_queues),
        )

    def force_wakeup(self):
        """Post a CKeys.FORCE_WAKEUP packet to the completion port."""
        _check(
            kernel32.PostQueuedCompletionStatus(
                self._iocp, 0, CKeys.FORCE_WAKEUP, ffi.NULL
            )
        )

    def get_events(self, timeout):
        """Dequeue up to MAX_EVENTS completions into the internal buffer.

        *timeout* is multiplied by 1000 and rounded to get the millisecond
        timeout; a positive timeout that would round to zero is bumped to
        1 ms so it still blocks.

        Returns the number of entries received, or 0 on WAIT_TIMEOUT. The
        result is meant to be passed to process_events().
        """
        received = ffi.new("PULONG")
        milliseconds = round(1000 * timeout)
        if timeout > 0 and milliseconds == 0:
            milliseconds = 1
        try:
            _check(
                kernel32.GetQueuedCompletionStatusEx(
                    self._iocp, self._events, MAX_EVENTS, received, milliseconds, 0
                )
            )
        except OSError as exc:
            if exc.winerror != ErrorCodes.WAIT_TIMEOUT:  # pragma: no cover
                raise
            return 0
        return received[0]

    def process_events(self, received):
        """Dispatch the first *received* entries of the event buffer.

        Each entry is routed by its completion key: AFD poll completions wake
        the matching readable/writable waiters, WAIT_OVERLAPPED completions
        reschedule the task waiting on that lpOverlapped, LATE_CANCEL posts
        detect unregistered handles, FORCE_WAKEUP is ignored, and any other
        key is forwarded to its monitor_completion_key() queue.

        Raises:
            TrioInternalError: if a LATE_CANCEL post arrives while the task
                is still waiting for the real completion.
        """
        for i in range(received):
            entry = self._events[i]
            if entry.lpCompletionKey == CKeys.AFD_POLL:
                lpo = entry.lpOverlapped
                op = self._afd_ops.pop(lpo)
                waiters = op.waiters
                if waiters.current_op is not op:
                    # Stale op, nothing to do
                    pass
                else:
                    waiters.current_op = None
                    # The op finished on its own, so it no longer counts
                    # against its AFD group. _refresh_afd() only releases
                    # the slot of an op it cancels, and it can't see this
                    # one now that current_op is cleared, so release it
                    # here. Otherwise the group's size only ever grows and
                    # it eventually gets marked full for good.
                    op.afd_group.size -= 1
                    self._vacant_afd_groups.add(op.afd_group)
                    # I don't think this can happen, so if it does let's crash
                    # and get a debug trace.
                    if lpo.Internal != 0:  # pragma: no cover
                        code = ntdll.RtlNtStatusToDosError(lpo.Internal)
                        raise_winerror(code)
                    flags = op.poll_info.Handles[0].Events
                    if waiters.read_task and flags & READABLE_FLAGS:
                        _core.reschedule(waiters.read_task)
                        waiters.read_task = None
                    if waiters.write_task and flags & WRITABLE_FLAGS:
                        _core.reschedule(waiters.write_task)
                        waiters.write_task = None
                    self._refresh_afd(op.poll_info.Handles[0].Handle)
            elif entry.lpCompletionKey == CKeys.WAIT_OVERLAPPED:
                # Regular I/O event, dispatch on lpOverlapped
                waiter = self._overlapped_waiters.pop(entry.lpOverlapped)
                overlapped = entry.lpOverlapped
                transferred = entry.dwNumberOfBytesTransferred
                info = CompletionKeyEventInfo(
                    lpOverlapped=overlapped, dwNumberOfBytesTransferred=transferred
                )
                _core.reschedule(waiter, Value(info))
            elif entry.lpCompletionKey == CKeys.LATE_CANCEL:
                # Post made by a regular I/O event's abort_fn
                # after it failed to cancel the I/O. If we still
                # have a waiter with this lpOverlapped, we didn't
                # get the regular I/O completion and almost
                # certainly the user forgot to call
                # register_with_iocp.
                self._posted_too_late_to_cancel.remove(entry.lpOverlapped)
                try:
                    waiter = self._overlapped_waiters.pop(entry.lpOverlapped)
                except KeyError:
                    # Looks like the actual completion got here before this
                    # fallback post did -- we're in the "expected" case of
                    # too-late-to-cancel, where the user did nothing wrong.
                    # Nothing more to do.
                    pass
                else:
                    exc = _core.TrioInternalError(
                        "Failed to cancel overlapped I/O in {} and didn't "
                        "receive the completion either. Did you forget to "
                        "call register_with_iocp()?".format(waiter.name)
                    )
                    # Raising this out of handle_io ensures that
                    # the user will see our message even if some
                    # other task is in an uncancellable wait due
                    # to the same underlying forgot-to-register
                    # issue (if their CancelIoEx succeeds, we
                    # have no way of noticing that their completion
                    # won't arrive). Unfortunately it loses the
                    # task traceback. If you're debugging this
                    # error and can't tell where it's coming from,
                    # try changing this line to
                    # _core.reschedule(waiter, outcome.Error(exc))
                    raise exc
            elif entry.lpCompletionKey == CKeys.FORCE_WAKEUP:
                pass
            else:
                # dispatch on lpCompletionKey
                queue = self._completion_key_queues[entry.lpCompletionKey]
                overlapped = int(ffi.cast("uintptr_t", entry.lpOverlapped))
                transferred = entry.dwNumberOfBytesTransferred
                info = CompletionKeyEventInfo(
                    lpOverlapped=overlapped, dwNumberOfBytesTransferred=transferred
                )
                queue.put_nowait(info)

    def _register_with_iocp(self, handle, completion_key):
        """Associate *handle* with our completion port under *completion_key*."""
        handle = _handle(handle)
        _check(kernel32.CreateIoCompletionPort(handle, self._iocp, completion_key, 0))
        # Supposedly this makes things slightly faster, by disabling the
        # ability to do WaitForSingleObject(handle). We would never want to do
        # that anyway, so might as well get the extra speed (if any).
        # Ref: http://www.lenholgate.com/blog/2009/09/interesting-blog-posts-on-high-performance-servers.html
        _check(
            kernel32.SetFileCompletionNotificationModes(
                handle, CompletionModes.FILE_SKIP_SET_EVENT_ON_HANDLE
            )
        )

    ################################################################
    # AFD stuff
    ################################################################

    def _refresh_afd(self, base_handle):
        """Re-issue the AFD poll for *base_handle* to match its waiters.

        Cancels the current poll operation (if any), then either drops the
        socket's bookkeeping when nobody is waiting any more, or issues a
        single new IOCTL_AFD_POLL covering all current waiters. If issuing
        the poll fails, the waiters are removed and woken with the error.
        """
        waiters = self._afd_waiters[base_handle]
        if waiters.current_op is not None:
            afd_group = waiters.current_op.afd_group
            try:
                _check(
                    kernel32.CancelIoEx(
                        afd_group.handle, waiters.current_op.lpOverlapped
                    )
                )
            except OSError as exc:
                if exc.winerror != ErrorCodes.ERROR_NOT_FOUND:
                    # I don't think this is possible, so if it happens let's
                    # crash noisily.
                    raise  # pragma: no cover
            waiters.current_op = None
            afd_group.size -= 1
            self._vacant_afd_groups.add(afd_group)

        flags = 0
        if waiters.read_task is not None:
            flags |= READABLE_FLAGS
        if waiters.write_task is not None:
            flags |= WRITABLE_FLAGS

        if not flags:
            del self._afd_waiters[base_handle]
        else:
            try:
                afd_group = self._vacant_afd_groups.pop()
            except KeyError:
                afd_group = AFDGroup(0, _afd_helper_handle())
                # Record the handle before registering it, so that close()
                # still releases it if the registration below raises.
                self._all_afd_handles.append(afd_group.handle)
                self._register_with_iocp(afd_group.handle, CKeys.AFD_POLL)
            self._vacant_afd_groups.add(afd_group)

            lpOverlapped = ffi.new("LPOVERLAPPED")

            poll_info = ffi.new("AFD_POLL_INFO *")
            poll_info.Timeout = 2**63 - 1  # INT64_MAX
            poll_info.NumberOfHandles = 1
            poll_info.Exclusive = 0
            poll_info.Handles[0].Handle = base_handle
            poll_info.Handles[0].Status = 0
            poll_info.Handles[0].Events = flags

            try:
                _check(
                    kernel32.DeviceIoControl(
                        afd_group.handle,
                        IoControlCodes.IOCTL_AFD_POLL,
                        poll_info,
                        ffi.sizeof("AFD_POLL_INFO"),
                        poll_info,
                        ffi.sizeof("AFD_POLL_INFO"),
                        ffi.NULL,
                        lpOverlapped,
                    )
                )
            except OSError as exc:
                if exc.winerror != ErrorCodes.ERROR_IO_PENDING:
                    # This could happen if the socket handle got closed behind
                    # our back while a wait_* call was pending, and we tried
                    # to re-issue the call. Clear our state and wake up any
                    # pending calls.
                    del self._afd_waiters[base_handle]
                    # Do this last, because it could raise.
                    wake_all(waiters, exc)
                    return
            op = AFDPollOp(lpOverlapped, poll_info, waiters, afd_group)
            waiters.current_op = op
            self._afd_ops[lpOverlapped] = op
            afd_group.size += 1
            if afd_group.size >= MAX_AFD_GROUP_SIZE:
                self._vacant_afd_groups.remove(afd_group)

    async def _afd_poll(self, sock, mode):
        """Block the current task until *sock* is ready for *mode*.

        *mode* is the AFDWaiters attribute to occupy: "read_task" or
        "write_task".

        Raises:
            BusyResourceError: if another task already occupies that slot.
        """
        base_handle = _get_base_socket(sock)
        waiters = self._afd_waiters.get(base_handle)
        if waiters is None:
            waiters = AFDWaiters()
            self._afd_waiters[base_handle] = waiters
        if getattr(waiters, mode) is not None:
            raise _core.BusyResourceError
        setattr(waiters, mode, _core.current_task())
        # Could potentially raise if the handle is somehow invalid; that's OK,
        # we let it escape.
        self._refresh_afd(base_handle)

        def abort_fn(_):
            # Give up our slot and re-issue the poll for whoever is left.
            setattr(waiters, mode, None)
            self._refresh_afd(base_handle)
            return _core.Abort.SUCCEEDED

        await _core.wait_task_rescheduled(abort_fn)

    @_public
    async def wait_readable(self, sock):
        """Block until an AFD poll reports one of READABLE_FLAGS for *sock*."""
        await self._afd_poll(sock, "read_task")

    @_public
    async def wait_writable(self, sock):
        """Block until an AFD poll reports one of WRITABLE_FLAGS for *sock*."""
        await self._afd_poll(sock, "write_task")

    @_public
    def notify_closing(self, handle):
        """Wake any tasks waiting on *handle* with ClosedResourceError."""
        handle = _get_base_socket(handle)
        waiters = self._afd_waiters.get(handle)
        if waiters is not None:
            wake_all(waiters, _core.ClosedResourceError())
            self._refresh_afd(handle)

    ################################################################
    # Regular overlapped operations
    ################################################################

    @_public
    def register_with_iocp(self, handle):
        """Register *handle* with our IOCP under CKeys.WAIT_OVERLAPPED."""
        self._register_with_iocp(handle, CKeys.WAIT_OVERLAPPED)

    @_public
    async def wait_overlapped(self, handle, lpOverlapped):
        """Wait for the overlapped operation *lpOverlapped* to complete.

        *lpOverlapped* may be an LPOVERLAPPED cdata object or an integer
        address, which is cast to LPOVERLAPPED.

        Returns the CompletionKeyEventInfo for the completion.

        Raises:
            BusyResourceError: if another task is already waiting on the
                same lpOverlapped.
            ClosedResourceError: if the operation was aborted without us
                having requested a cancellation.
            OSError: (via raise_winerror) for any other non-zero status.
        """
        handle = _handle(handle)
        if isinstance(lpOverlapped, int):
            lpOverlapped = ffi.cast("LPOVERLAPPED", lpOverlapped)
        if lpOverlapped in self._overlapped_waiters:
            raise _core.BusyResourceError(
                "another task is already waiting on that lpOverlapped"
            )
        task = _core.current_task()
        self._overlapped_waiters[lpOverlapped] = task
        raise_cancel = None

        def abort(raise_cancel_):
            nonlocal raise_cancel
            raise_cancel = raise_cancel_
            try:
                _check(kernel32.CancelIoEx(handle, lpOverlapped))
            except OSError as exc:
                if exc.winerror == ErrorCodes.ERROR_NOT_FOUND:
                    # Too late to cancel. If this happens because the
                    # operation is already completed, we don't need to do
                    # anything; we'll get a notification of that completion
                    # soon. But another possibility is that the operation was
                    # performed on a handle that wasn't registered with our
                    # IOCP (ie, the user forgot to call register_with_iocp),
                    # in which case we're just never going to see the
                    # completion. To avoid an uncancellable infinite sleep in
                    # the latter case, we'll PostQueuedCompletionStatus here,
                    # and if our post arrives before the original completion
                    # does, we'll assume the handle wasn't registered.
                    _check(
                        kernel32.PostQueuedCompletionStatus(
                            self._iocp, 0, CKeys.LATE_CANCEL, lpOverlapped
                        )
                    )
                    # Keep the lpOverlapped referenced so its address
                    # doesn't get reused until our posted completion
                    # status has been processed. Otherwise, we can
                    # get confused about which completion goes with
                    # which I/O.
                    self._posted_too_late_to_cancel.add(lpOverlapped)
                else:  # pragma: no cover
                    raise _core.TrioInternalError(
                        "CancelIoEx failed with unexpected error"
                    ) from exc
            # Always FAILED: we keep waiting for the completion notification
            # and decide what to raise once it arrives.
            return _core.Abort.FAILED

        info = await _core.wait_task_rescheduled(abort)
        if lpOverlapped.Internal != 0:
            # the lpOverlapped reports the error as an NT status code,
            # which we must convert back to a Win32 error code before
            # it will produce the right sorts of exceptions
            code = ntdll.RtlNtStatusToDosError(lpOverlapped.Internal)
            if code == ErrorCodes.ERROR_OPERATION_ABORTED:
                if raise_cancel is not None:
                    raise_cancel()
                else:
                    # We didn't request this cancellation, so assume
                    # it happened due to the underlying handle being
                    # closed before the operation could complete.
                    raise _core.ClosedResourceError("another task closed this resource")
            else:
                raise_winerror(code)
        return info

    async def _perform_overlapped(self, handle, submit_fn):
        """Submit an overlapped operation and wait for it to finish.

        Calls ``submit_fn(lpOverlapped)`` with a fresh OVERLAPPED struct,
        tolerates ERROR_IO_PENDING, waits via wait_overlapped(), and returns
        the lpOverlapped.
        """
        # submit_fn(lpOverlapped) submits some I/O
        # it may raise an OSError with ERROR_IO_PENDING
        # the handle must already be registered using
        # register_with_iocp(handle)
        # This always does a schedule point, but it's possible that the
        # operation will not be cancellable, depending on how Windows is
        # feeling today. So we need to check for cancellation manually.
        await _core.checkpoint_if_cancelled()
        lpOverlapped = ffi.new("LPOVERLAPPED")
        try:
            submit_fn(lpOverlapped)
        except OSError as exc:
            if exc.winerror != ErrorCodes.ERROR_IO_PENDING:
                raise
        await self.wait_overlapped(handle, lpOverlapped)
        return lpOverlapped

    @_public
    async def write_overlapped(self, handle, data, file_offset=0):
        """Write *data* to *handle* at *file_offset* using overlapped I/O.

        Returns the OVERLAPPED struct's InternalHigh field (the number of
        bytes transferred).
        """
        with ffi.from_buffer(data) as cbuf:

            def submit_write(lpOverlapped):
                # yes, these are the real documented names
                offset_fields = lpOverlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME
                # Split the offset into its low and high 32-bit halves.
                offset_fields.Offset = file_offset & 0xFFFFFFFF
                offset_fields.OffsetHigh = file_offset >> 32
                _check(
                    kernel32.WriteFile(
                        _handle(handle),
                        ffi.cast("LPCVOID", cbuf),
                        len(cbuf),
                        ffi.NULL,
                        lpOverlapped,
                    )
                )

            lpOverlapped = await self._perform_overlapped(handle, submit_write)
            # this is "number of bytes transferred"
            return lpOverlapped.InternalHigh

    @_public
    async def readinto_overlapped(self, handle, buffer, file_offset=0):
        """Read from *handle* at *file_offset* into the writable *buffer*.

        Returns the OVERLAPPED struct's InternalHigh field (the number of
        bytes transferred).
        """
        with ffi.from_buffer(buffer, require_writable=True) as cbuf:

            def submit_read(lpOverlapped):
                offset_fields = lpOverlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME
                # Split the offset into its low and high 32-bit halves.
                offset_fields.Offset = file_offset & 0xFFFFFFFF
                offset_fields.OffsetHigh = file_offset >> 32
                _check(
                    kernel32.ReadFile(
                        _handle(handle),
                        ffi.cast("LPVOID", cbuf),
                        len(cbuf),
                        ffi.NULL,
                        lpOverlapped,
                    )
                )

            lpOverlapped = await self._perform_overlapped(handle, submit_read)
            return lpOverlapped.InternalHigh

    ################################################################
    # Raw IOCP operations
    ################################################################

    @_public
    def current_iocp(self):
        """Return the completion port handle as an integer."""
        return int(ffi.cast("uintptr_t", self._iocp))

    @contextmanager
    @_public
    def monitor_completion_key(self):
        """Context manager yielding ``(key, queue)`` for a fresh completion key.

        While the context is active, completions arriving on *key* are put on
        *queue* as CompletionKeyEventInfo objects; the key is unregistered on
        exit.
        """
        key = next(self._completion_key_counter)
        queue = _core.UnboundedQueue()
        self._completion_key_queues[key] = queue
        try:
            yield (key, queue)
        finally:
            del self._completion_key_queues[key]