import select import sys import attr from collections import defaultdict from typing import Dict, TYPE_CHECKING from .. import _core from ._run import _public from ._io_common import wake_all from ._wakeup_socketpair import WakeupSocketpair assert not TYPE_CHECKING or sys.platform == "linux" @attr.s(slots=True, eq=False, frozen=True) class _EpollStatistics: tasks_waiting_read = attr.ib() tasks_waiting_write = attr.ib() backend = attr.ib(default="epoll") # Some facts about epoll # ---------------------- # # Internally, an epoll object is sort of like a WeakKeyDictionary where the # keys are tuples of (fd number, file object). When you call epoll_ctl, you # pass in an fd; that gets converted to an (fd number, file object) tuple by # looking up the fd in the process's fd table at the time of the call. When an # event happens on the file object, epoll_wait drops the file object part, and # just returns the fd number in its event. So from the outside it looks like # it's keeping a table of fds, but really it's a bit more complicated. This # has some subtle consequences. # # In general, file objects inside the kernel are reference counted. Each entry # in a process's fd table holds a strong reference to the corresponding file # object, and most operations that use file objects take a temporary strong # reference while they're working. So when you call close() on an fd, that # might or might not cause the file object to be deallocated -- it depends on # whether there are any other references to that file object. Some common ways # this can happen: # # - after calling dup(), you have two fds in the same process referring to the # same file object. Even if you close one fd (= remove that entry from the # fd table), the file object will be kept alive by the other fd. # - when calling fork(), the child inherits a copy of the parent's fd table, # so all the file objects get another reference. (But if the fork() is # followed by exec(), then all of the child's fds that have the CLOEXEC flag # set will be closed at that point.) # - most syscalls that work on fds take a strong reference to the underlying # file object while they're using it. So there's one thread blocked in # read(fd), and then another thread calls close() on the last fd referring # to that object, the underlying file won't actually be closed until # after read() returns. # # However, epoll does *not* take a reference to any of the file objects in its # interest set (that's what makes it similar to a WeakKeyDictionary). File # objects inside an epoll interest set will be deallocated if all *other* # references to them are closed. And when that happens, the epoll object will # automatically deregister that file object and stop reporting events on it. # So that's quite handy. # # But, what happens if we do this? # # fd1 = open(...) # epoll_ctl(EPOLL_CTL_ADD, fd1, ...) # fd2 = dup(fd1) # close(fd1) # # In this case, the dup() keeps the underlying file object alive, so it # remains registered in the epoll object's interest set, as the tuple (fd1, # file object). But, fd1 no longer refers to this file object! You might think # there was some magic to handle this, but unfortunately no; the consequences # are totally predictable from what I said above: # # If any events occur on the file object, then epoll will report them as # happening on fd1, even though that doesn't make sense. # # Perhaps we would like to deregister fd1 to stop getting nonsensical events. # But how? When we call epoll_ctl, we have to pass an fd number, which will # get expanded to an (fd number, file object) tuple. We can't pass fd1, # because when epoll_ctl tries to look it up, it won't find our file object. # And we can't pass fd2, because that will get expanded to (fd2, file object), # which is a different lookup key. In fact, it's *impossible* to de-register # this fd! # # We could even have fd1 get assigned to another file object, and then we can # have multiple keys registered simultaneously using the same fd number, like: # (fd1, file object 1), (fd1, file object 2). And if events happen on either # file object, then epoll will happily report that something happened to # "fd1". # # Now here's what makes this especially nasty: suppose the old file object # becomes, say, readable. That means that every time we call epoll_wait, it # will return immediately to tell us that "fd1" is readable. Normally, we # would handle this by de-registering fd1, waking up the corresponding call to # wait_readable, then the user will call read() or recv() or something, and # we're fine. But if this happens on a stale fd where we can't remove the # registration, then we might get stuck in a state where epoll_wait *always* # returns immediately, so our event loop becomes unable to sleep, and now our # program is burning 100% of the CPU doing nothing, with no way out. # # # What does this mean for Trio? # ----------------------------- # # Since we don't control the user's code, we have no way to guarantee that we # don't get stuck with stale fd's in our epoll interest set. For example, a # user could call wait_readable(fd) in one task, and then while that's # running, they might close(fd) from another task. In this situation, they're # *supposed* to call notify_closing(fd) to let us know what's happening, so we # can interrupt the wait_readable() call and avoid getting into this mess. And # that's the only thing that can possibly work correctly in all cases. But # sometimes user code has bugs. So if this does happen, we'd like to degrade # gracefully, and survive without corrupting Trio's internal state or # otherwise causing the whole program to explode messily. # # Our solution: we always use EPOLLONESHOT. This way, we might get *one* # spurious event on a stale fd, but then epoll will automatically silence it # until we explicitly say that we want more events... and if we have a stale # fd, then we actually can't re-enable it! So we can't get stuck in an # infinite busy-loop. If there's a stale fd hanging around, then it might # cause a spurious `BusyResourceError`, or cause one wait_* call to return # before it should have... but in general, the wait_* functions are allowed to # have some spurious wakeups; the user code will just attempt the operation, # get EWOULDBLOCK, and call wait_* again. And the program as a whole will # survive, any exceptions will propagate, etc. # # As a bonus, EPOLLONESHOT also saves us having to explicitly deregister fds # on the normal wakeup path, so it's a bit more efficient in general. # # However, EPOLLONESHOT has a few trade-offs to consider: # # First, you can't combine EPOLLONESHOT with EPOLLEXCLUSIVE. This is a bit sad # in one somewhat rare case: if you have a multi-process server where a group # of processes all share the same listening socket, then EPOLLEXCLUSIVE can be # used to avoid "thundering herd" problems when a new connection comes in. But # this isn't too bad. It's not clear if EPOLLEXCLUSIVE even works for us # anyway: # # https://stackoverflow.com/questions/41582560/how-does-epolls-epollexclusive-mode-interact-with-level-triggering # # And it's not clear that EPOLLEXCLUSIVE is a great approach either: # # https://blog.cloudflare.com/the-sad-state-of-linux-socket-balancing/ # # And if we do need to support this, we could always add support through some # more-specialized API in the future. So this isn't a blocker to using # EPOLLONESHOT. # # Second, EPOLLONESHOT does not actually *deregister* the fd after delivering # an event (EPOLL_CTL_DEL). Instead, it keeps the fd registered, but # effectively does an EPOLL_CTL_MOD to set the fd's interest flags to # all-zeros. So we could still end up with an fd hanging around in the # interest set for a long time, even if we're not using it. # # Fortunately, this isn't a problem, because it's only a weak reference – if # we have a stale fd that's been silenced by EPOLLONESHOT, then it wastes a # tiny bit of kernel memory remembering this fd that can never be revived, but # when the underlying file object is eventually closed, that memory will be # reclaimed. So that's OK. # # The other issue is that when someone calls wait_*, using EPOLLONESHOT means # that if we have ever waited for this fd before, we have to use EPOLL_CTL_MOD # to re-enable it; but if it's a new fd, we have to use EPOLL_CTL_ADD. How do # we know which one to use? There's no reasonable way to track which fds are # currently registered -- remember, we're assuming the user might have gone # and rearranged their fds without telling us! # # Fortunately, this also has a simple solution: if we wait on a socket or # other fd once, then we'll probably wait on it lots of times. And the epoll # object itself knows which fds it already has registered. So when an fd comes # in, we optimistically assume that it's been waited on before, and try doing # EPOLL_CTL_MOD. And if that fails with an ENOENT error, then we try again # with EPOLL_CTL_ADD. # # So that's why this code is the way it is. And now you know more than you # wanted to about how epoll works. @attr.s(slots=True, eq=False) class EpollWaiters: read_task = attr.ib(default=None) write_task = attr.ib(default=None) current_flags = attr.ib(default=0) @attr.s(slots=True, eq=False, hash=False) class EpollIOManager: _epoll = attr.ib(factory=select.epoll) # {fd: EpollWaiters} _registered = attr.ib( factory=lambda: defaultdict(EpollWaiters), type=Dict[int, EpollWaiters] ) _force_wakeup = attr.ib(factory=WakeupSocketpair) _force_wakeup_fd = attr.ib(default=None) def __attrs_post_init__(self): self._epoll.register(self._force_wakeup.wakeup_sock, select.EPOLLIN) self._force_wakeup_fd = self._force_wakeup.wakeup_sock.fileno() def statistics(self): tasks_waiting_read = 0 tasks_waiting_write = 0 for waiter in self._registered.values(): if waiter.read_task is not None: tasks_waiting_read += 1 if waiter.write_task is not None: tasks_waiting_write += 1 return _EpollStatistics( tasks_waiting_read=tasks_waiting_read, tasks_waiting_write=tasks_waiting_write, ) def close(self): self._epoll.close() self._force_wakeup.close() def force_wakeup(self): self._force_wakeup.wakeup_thread_and_signal_safe() # Return value must be False-y IFF the timeout expired, NOT if any I/O # happened or force_wakeup was called. Otherwise it can be anything; gets # passed straight through to process_events. def get_events(self, timeout): # max_events must be > 0 or epoll gets cranky # accessing self._registered from a thread looks dangerous, but it's # OK because it doesn't matter if our value is a little bit off. max_events = max(1, len(self._registered)) return self._epoll.poll(timeout, max_events) def process_events(self, events): for fd, flags in events: if fd == self._force_wakeup_fd: self._force_wakeup.drain() continue waiters = self._registered[fd] # EPOLLONESHOT always clears the flags when an event is delivered waiters.current_flags = 0 # Clever hack stolen from selectors.EpollSelector: an event # with EPOLLHUP or EPOLLERR flags wakes both readers and # writers. if flags & ~select.EPOLLIN and waiters.write_task is not None: _core.reschedule(waiters.write_task) waiters.write_task = None if flags & ~select.EPOLLOUT and waiters.read_task is not None: _core.reschedule(waiters.read_task) waiters.read_task = None self._update_registrations(fd) def _update_registrations(self, fd): waiters = self._registered[fd] wanted_flags = 0 if waiters.read_task is not None: wanted_flags |= select.EPOLLIN if waiters.write_task is not None: wanted_flags |= select.EPOLLOUT if wanted_flags != waiters.current_flags: try: try: # First try EPOLL_CTL_MOD self._epoll.modify(fd, wanted_flags | select.EPOLLONESHOT) except OSError: # If that fails, it might be a new fd; try EPOLL_CTL_ADD self._epoll.register(fd, wanted_flags | select.EPOLLONESHOT) waiters.current_flags = wanted_flags except OSError as exc: # If everything fails, probably it's a bad fd, e.g. because # the fd was closed behind our back. In this case we don't # want to try to unregister the fd, because that will probably # fail too. Just clear our state and wake everyone up. del self._registered[fd] # This could raise (in case we're calling this inside one of # the to-be-woken tasks), so we have to do it last. wake_all(waiters, exc) return if not wanted_flags: del self._registered[fd] async def _epoll_wait(self, fd, attr_name): if not isinstance(fd, int): fd = fd.fileno() waiters = self._registered[fd] if getattr(waiters, attr_name) is not None: raise _core.BusyResourceError( "another task is already reading / writing this fd" ) setattr(waiters, attr_name, _core.current_task()) self._update_registrations(fd) def abort(_): setattr(waiters, attr_name, None) self._update_registrations(fd) return _core.Abort.SUCCEEDED await _core.wait_task_rescheduled(abort) @_public async def wait_readable(self, fd): await self._epoll_wait(fd, "read_task") @_public async def wait_writable(self, fd): await self._epoll_wait(fd, "write_task") @_public def notify_closing(self, fd): if not isinstance(fd, int): fd = fd.fileno() wake_all( self._registered[fd], _core.ClosedResourceError("another task closed this fd"), ) del self._registered[fd] try: self._epoll.unregister(fd) except (OSError, ValueError): pass