# These are the only functions that ever yield back to the task runner.

import types
import enum

import attr
import outcome

from . import _run


# Helper for the bottommost 'yield'. You can't use 'yield' inside an async
# function, but you can inside a generator, and if you decorate your generator
# with @types.coroutine, then it's even awaitable. However, it's still not a
# real async function: in particular, it isn't recognized by
# inspect.iscoroutinefunction, and it doesn't trigger the unawaited coroutine
# tracking machinery. Since our traps are public APIs, we make them real async
# functions, and then this helper takes care of the actual yield:
@types.coroutine
def _async_yield(obj):
    return (yield obj)


# This class object is used as a singleton.
# Not exported in the trio._core namespace, but imported directly by _run.
class CancelShieldedCheckpoint:
    pass


async def cancel_shielded_checkpoint():
    """Introduce a schedule point, but not a cancel point.

    This is *not* a :ref:`checkpoint <checkpoints>`, but it is half of a
    checkpoint, and when combined with :func:`checkpoint_if_cancelled` it can
    make a full checkpoint.

    Equivalent to (but potentially more efficient than)::

        with trio.CancelScope(shield=True):
            await trio.lowlevel.checkpoint()

    """
    return (await _async_yield(CancelShieldedCheckpoint)).unwrap()


# Return values for abort functions
class Abort(enum.Enum):
    """:class:`enum.Enum` used as the return value from abort functions.

    See :func:`wait_task_rescheduled` for details.

    .. data:: SUCCEEDED
              FAILED

    """

    SUCCEEDED = 1
    FAILED = 2


# Not exported in the trio._core namespace, but imported directly by _run.
@attr.s(frozen=True)
class WaitTaskRescheduled:
    abort_func = attr.ib()


async def wait_task_rescheduled(abort_func):
    """Put the current task to sleep, with cancellation support.

    This is the lowest-level API for blocking in Trio. Every time a
    :class:`~trio.lowlevel.Task` blocks, it does so by calling this function
    (usually indirectly via some higher-level API).

    This is a tricky interface with no guard rails. If you can use
    :class:`ParkingLot` or the built-in I/O wait functions instead, then you
    should.

    Generally the way it works is that before calling this function, you make
    arrangements for "someone" to call :func:`reschedule` on the current task
    at some later point.

    Then you call :func:`wait_task_rescheduled`, passing in ``abort_func``, an
    "abort callback".

    (Terminology: in Trio, "aborting" is the process of attempting to
    interrupt a blocked task to deliver a cancellation.)

    There are two possibilities for what happens next:

    1. "Someone" calls :func:`reschedule` on the current task, and
       :func:`wait_task_rescheduled` returns or raises whatever value or error
       was passed to :func:`reschedule`.

    2. The call's context transitions to a cancelled state (e.g. due to a
       timeout expiring). When this happens, the ``abort_func`` is called. Its
       interface looks like::

           def abort_func(raise_cancel):
               ...
               return trio.lowlevel.Abort.SUCCEEDED  # or FAILED

       It should attempt to clean up any state associated with this call, and
       in particular, arrange that :func:`reschedule` will *not* be called
       later. If (and only if!) it is successful, then it should return
       :data:`Abort.SUCCEEDED`, in which case the task will automatically be
       rescheduled with an appropriate :exc:`~trio.Cancelled` error.

       Otherwise, it should return :data:`Abort.FAILED`. This means that the
       task can't be cancelled at this time, and still has to make sure that
       "someone" eventually calls :func:`reschedule`.

       At that point there are again two possibilities. You can simply ignore
       the cancellation altogether: wait for the operation to complete and
       then reschedule and continue as normal. (For example, this is what
       :func:`trio.to_thread.run_sync` does if cancellation is disabled.)
       The other possibility is that the ``abort_func`` does succeed in
       cancelling the operation, but for some reason isn't able to report that
       right away.
       (Example: on Windows, it's possible to request that an async
       ("overlapped") I/O operation be cancelled, but this request is *also*
       asynchronous – you don't find out until later whether the operation
       was actually cancelled or not.) To report a delayed cancellation, you
       should reschedule the task yourself, and call the ``raise_cancel``
       callback passed to ``abort_func`` to raise a :exc:`~trio.Cancelled`
       (or possibly :exc:`KeyboardInterrupt`) exception into this task.
       Either of the approaches sketched below can work::

          # Option 1:
          # Catch the exception from raise_cancel and inject it into the task.
          # (This is what Trio does automatically for you if you return
          # Abort.SUCCEEDED.)
          trio.lowlevel.reschedule(task, outcome.capture(raise_cancel))

          # Option 2:
          # Wait to be woken by "someone", and then decide whether to raise
          # the error from inside the task.
          outer_raise_cancel = None
          def abort(inner_raise_cancel):
              nonlocal outer_raise_cancel
              outer_raise_cancel = inner_raise_cancel
              TRY_TO_CANCEL_OPERATION()
              return trio.lowlevel.Abort.FAILED
          await wait_task_rescheduled(abort)
          if OPERATION_WAS_SUCCESSFULLY_CANCELLED:
              # raises the error
              outer_raise_cancel()

    In any case it's guaranteed that we only call the ``abort_func`` at most
    once per call to :func:`wait_task_rescheduled`.

    Sometimes, it's useful to be able to share some mutable sleep-related data
    between the sleeping task, the abort function, and the waking task. You
    can use the sleeping task's :data:`~Task.custom_sleep_data` attribute to
    store this data, and Trio won't touch it, except to make sure that it gets
    cleared when the task is rescheduled.

    .. warning::

       If your ``abort_func`` raises an error, or returns any value other than
       :data:`Abort.SUCCEEDED` or :data:`Abort.FAILED`, then Trio will crash
       violently. Be careful! Similarly, it is entirely possible to deadlock a
       Trio program by failing to reschedule a blocked task, or cause havoc by
       calling :func:`reschedule` too many times.
       Remember what we said up above about how you should use a higher-level
       API if at all possible?

    """
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()


# Not exported in the trio._core namespace, but imported directly by _run.
@attr.s(frozen=True)
class PermanentlyDetachCoroutineObject:
    final_outcome = attr.ib()


async def permanently_detach_coroutine_object(final_outcome):
    """Permanently detach the current task from the Trio scheduler.

    Normally, a Trio task doesn't exit until its coroutine object exits. When
    you call this function, Trio acts like the coroutine object just exited
    and the task terminates with the given outcome. This is useful if you want
    to permanently switch the coroutine object over to a different coroutine
    runner.

    When the calling coroutine enters this function it's running under Trio,
    and when the function returns it's running under the foreign coroutine
    runner.

    You should make sure that the coroutine object has released any
    Trio-specific resources it has acquired (e.g. nurseries).

    Args:
      final_outcome (outcome.Outcome): Trio acts as if the current task exited
          with the given return value or exception.

    Returns or raises whatever value or exception the new coroutine runner
    uses to resume the coroutine.

    """
    if _run.current_task().child_nurseries:
        raise RuntimeError(
            "can't permanently detach a coroutine object with open nurseries"
        )
    return await _async_yield(PermanentlyDetachCoroutineObject(final_outcome))


async def temporarily_detach_coroutine_object(abort_func):
    """Temporarily detach the current coroutine object from the Trio
    scheduler.

    When the calling coroutine enters this function it's running under Trio,
    and when the function returns it's running under the foreign coroutine
    runner.

    The Trio :class:`Task` will continue to exist, but will be suspended until
    you use :func:`reattach_detached_coroutine_object` to resume it. In the
    meantime, you can use another coroutine runner to schedule the coroutine
    object.
    In fact, you have to – the function doesn't return until the coroutine
    is advanced from outside.

    Note that you'll need to save the current :class:`Task` object to later
    resume; you can retrieve it with :func:`current_task`. You can also use
    this :class:`Task` object to retrieve the coroutine object – see
    :data:`Task.coro`.

    Args:
      abort_func: Same as for :func:`wait_task_rescheduled`, except that it
          must return :data:`Abort.FAILED`. (If it returned
          :data:`Abort.SUCCEEDED`, then Trio would attempt to reschedule the
          detached task directly without going through
          :func:`reattach_detached_coroutine_object`, which would be bad.)
          Your ``abort_func`` should still arrange for whatever the coroutine
          object is doing to be cancelled, and then reattach to Trio and call
          the ``raise_cancel`` callback, if possible.

    Returns or raises whatever value or exception the new coroutine runner
    uses to resume the coroutine.

    """
    return await _async_yield(WaitTaskRescheduled(abort_func))


async def reattach_detached_coroutine_object(task, yield_value):
    """Reattach a coroutine object that was detached using
    :func:`temporarily_detach_coroutine_object`.

    When the calling coroutine enters this function it's running under the
    foreign coroutine runner, and when the function returns it's running under
    Trio.

    This must be called from inside the coroutine being resumed, and yields
    whatever value you pass in. (Presumably you'll pass a value that will
    cause the current coroutine runner to stop scheduling this task.) Then the
    coroutine is resumed by the Trio scheduler at the next opportunity.

    Args:
      task (Task): The Trio task object that the current coroutine was
          detached from.
      yield_value (object): The object to yield to the current coroutine
          runner.

    """
    # This is a kind of crude check – in particular, it can fail if the
    # passed-in task is where the coroutine *runner* is running. But this is
    # an experts-only interface, and there's no easy way to do a more accurate
    # check, so I guess that's OK.
    if not task.coro.cr_running:
        raise RuntimeError("given task does not match calling coroutine")
    _run.reschedule(task, outcome.Value("reattaching"))
    value = await _async_yield(yield_value)
    assert value == outcome.Value("reattaching")