import select
|
import sys
|
from typing import TYPE_CHECKING
|
|
import outcome
|
from contextlib import contextmanager
|
import attr
|
import errno
|
|
from .. import _core
|
from ._run import _public
|
from ._wakeup_socketpair import WakeupSocketpair
|
|
# Only has an effect for static analysis: TYPE_CHECKING is False at runtime, so
# the assertion always passes when the module is actually executed. When a type
# checker evaluates it, it states that the platform is neither Linux nor
# Windows.
# NOTE(review): presumably this keeps type checkers from analysing this module
# on platforms without kqueue -- confirm against the project's typing setup.
assert not TYPE_CHECKING or (sys.platform != "linux" and sys.platform != "win32")
|
|
|
@attr.s(slots=True, eq=False, frozen=True)
class _KqueueStatistics:
    """Immutable snapshot returned by ``KqueueIOManager.statistics()``."""

    # Number of registered receivers whose type is exactly ``_core.Task``.
    tasks_waiting: int = attr.ib()
    # Number of registered receivers that are anything other than a task.
    monitors: int = attr.ib()
    # Name of the backend that produced these statistics.
    backend: str = attr.ib(default="kqueue")
|
|
|
@attr.s(slots=True, eq=False)
class KqueueIOManager:
    """I/O manager built around a ``select.kqueue`` object.

    A table maps each ``(ident, filter)`` pair to whatever is waiting for that
    kevent -- either a task or an ``UnboundedQueue`` -- and events fetched from
    the kqueue are handed to the matching receiver.
    """

    _kqueue = attr.ib(factory=select.kqueue)
    # {(ident, filter): Task or UnboundedQueue}
    _registered = attr.ib(factory=dict)
    _force_wakeup = attr.ib(factory=WakeupSocketpair)
    _force_wakeup_fd = attr.ib(default=None)

    def __attrs_post_init__(self):
        """Register the wakeup socket for readability and remember its fd."""
        wakeup_sock = self._force_wakeup.wakeup_sock
        self._kqueue.control(
            [select.kevent(wakeup_sock, select.KQ_FILTER_READ, select.KQ_EV_ADD)],
            0,
        )
        self._force_wakeup_fd = wakeup_sock.fileno()

    def statistics(self):
        """Return a ``_KqueueStatistics`` describing the current registrations.

        Receivers whose type is exactly ``_core.Task`` are counted as waiting
        tasks; every other receiver is counted as a monitor.
        """
        task_count = sum(
            1 for receiver in self._registered.values() if type(receiver) is _core.Task
        )
        return _KqueueStatistics(
            tasks_waiting=task_count,
            monitors=len(self._registered) - task_count,
        )

    def close(self):
        """Close the kqueue and then the wakeup socketpair."""
        self._kqueue.close()
        self._force_wakeup.close()

    def force_wakeup(self):
        """Poke the wakeup socketpair (delegates to ``WakeupSocketpair``)."""
        self._force_wakeup.wakeup_thread_and_signal_safe()

    def get_events(self, timeout):
        """Collect pending kevents, waiting up to ``timeout`` for the first batch.

        Returns a list with every event gathered. Only the first call to the
        kqueue uses ``timeout``; follow-up calls poll with a timeout of 0.
        """
        # The batch size has to be > 0 or kqueue gets cranky. We also want it
        # strictly larger than the number of events we expect, so that a
        # short batch tells us we have drained everything in a single call.
        batch_size = len(self._registered) + 1
        collected = []
        while True:
            batch = self._kqueue.control([], batch_size, timeout)
            collected.extend(batch)
            if len(batch) < batch_size:
                return collected
            # A full batch means there may be more waiting: go around again,
            # but without blocking this time.
            timeout = 0

    def process_events(self, events):
        """Dispatch each event to the receiver registered for it.

        Events on the wakeup fd just drain the wakeup socketpair. For any other
        event the receiver is looked up by ``(ident, filter)`` (a missing
        registration raises ``KeyError``); one-shot events also drop their
        registration. Tasks are rescheduled with the event as their value,
        other receivers get it via ``put_nowait``.
        """
        for event in events:
            if event.ident == self._force_wakeup_fd:
                self._force_wakeup.drain()
                continue

            key = (event.ident, event.filter)
            if event.flags & select.KQ_EV_ONESHOT:
                receiver = self._registered.pop(key)
            else:
                receiver = self._registered[key]

            if type(receiver) is _core.Task:
                _core.reschedule(receiver, outcome.Value(event))
            else:
                receiver.put_nowait(event)

    # kevent registration is complicated -- e.g. aio submission can
    # implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will
    # automatically register filters for child processes. So our lowlevel
    # API is *very* low-level: we expose the kqueue itself for adding
    # events or sticking into AIO submission structs, and split waiting
    # off into separate methods. It's your responsibility to make sure
    # that process_events never receives an event without a corresponding
    # registration! This may be challenging if you want to be careful
    # about e.g. KeyboardInterrupt. Possibly this API could be improved to
    # be more ergonomic...

    @_public
    def current_kqueue(self):
        """Return the underlying ``select.kqueue`` object."""
        return self._kqueue

    @contextmanager
    @_public
    def monitor_kevent(self, ident, filter):
        """Context manager yielding a queue registered for ``(ident, filter)``.

        Raises ``BusyResourceError`` if that pair already has a listener. The
        registration is removed again when the context exits.
        """
        key = (ident, filter)
        if key in self._registered:
            raise _core.BusyResourceError(
                "attempt to register multiple listeners for same ident/filter pair"
            )
        queue = _core.UnboundedQueue()
        self._registered[key] = queue
        try:
            yield queue
        finally:
            del self._registered[key]

    @_public
    async def wait_kevent(self, ident, filter, abort_func):
        """Register the current task for ``(ident, filter)`` and wait.

        Raises ``BusyResourceError`` if that pair already has a listener.
        ``abort_func`` is called with ``raise_cancel`` when the wait is being
        aborted; if it returns ``Abort.SUCCEEDED`` the registration is dropped.
        Returns whatever ``wait_task_rescheduled`` returns.
        """
        key = (ident, filter)
        if key in self._registered:
            raise _core.BusyResourceError(
                "attempt to register multiple listeners for same ident/filter pair"
            )
        self._registered[key] = _core.current_task()

        def abort(raise_cancel):
            result = abort_func(raise_cancel)
            if result is _core.Abort.SUCCEEDED:
                del self._registered[key]
            return result

        return await _core.wait_task_rescheduled(abort)

    async def _wait_common(self, fd, filter):
        """Add a one-shot kevent for ``fd``/``filter`` and wait for it.

        ``fd`` may be an int or any object with a ``fileno()`` method.
        """
        if not isinstance(fd, int):
            fd = fd.fileno()
        self._kqueue.control(
            [select.kevent(fd, filter, select.KQ_EV_ADD | select.KQ_EV_ONESHOT)],
            0,
        )

        def abort(_):
            removal = select.kevent(fd, filter, select.KQ_EV_DELETE)
            try:
                self._kqueue.control([removal], 0)
            except OSError as exc:
                # kqueue tracks individual fds (*not* the underlying file
                # object, see _io_epoll.py for a long discussion of why this
                # distinction matters), and automatically deregisters an event
                # if the fd is closed. So if kqueue.control says that it
                # doesn't know about this event, then probably it's because
                # the fd was closed behind our backs. (Too bad we can't ask it
                # to wake us up when this happens, versus discovering it after
                # the fact... oh well, you can't have everything.)
                #
                # FreeBSD reports this using EBADF. macOS uses ENOENT.
                if exc.errno not in (errno.EBADF, errno.ENOENT):  # pragma: no cover
                    # As far as we know, this branch can't happen.
                    raise
            return _core.Abort.SUCCEEDED

        await self.wait_kevent(fd, filter, abort)

    @_public
    async def wait_readable(self, fd):
        """Wait for a ``KQ_FILTER_READ`` event on ``fd``."""
        await self._wait_common(fd, select.KQ_FILTER_READ)

    @_public
    async def wait_writable(self, fd):
        """Wait for a ``KQ_FILTER_WRITE`` event on ``fd``."""
        await self._wait_common(fd, select.KQ_FILTER_WRITE)

    @_public
    def notify_closing(self, fd):
        """Wake any task waiting on ``fd`` with ``ClosedResourceError``.

        ``fd`` may be an int or any object with a ``fileno()`` method. Both the
        read and the write filter are checked. Raises ``NotImplementedError``
        if a non-task receiver (a ``monitor_kevent`` queue) is registered for
        the fd.
        """
        if not isinstance(fd, int):
            fd = fd.fileno()

        for filter in (select.KQ_FILTER_READ, select.KQ_FILTER_WRITE):
            key = (fd, filter)
            receiver = self._registered.get(key)
            if receiver is None:
                continue

            if type(receiver) is not _core.Task:
                # XX this is an interesting example of a case where being able
                # to close a queue would be useful...
                raise NotImplementedError(
                    "can't close an fd that monitor_kevent is using"
                )

            # Deregister from the kqueue first, then wake the task with the
            # error, and only then forget the registration.
            removal = select.kevent(fd, filter, select.KQ_EV_DELETE)
            self._kqueue.control([removal], 0)
            closed_error = _core.ClosedResourceError("another task closed this fd")
            _core.reschedule(receiver, outcome.Error(closed_error))
            del self._registered[key]
|