# *********************************************************** # ******* WARNING: AUTOGENERATED! ALL EDITS WILL BE LOST ****** # ************************************************************* from ._run import GLOBAL_RUN_CONTEXT, _NO_SEND from ._ki import LOCALS_KEY_KI_PROTECTION_ENABLED from ._instrumentation import Instrument # fmt: off def current_statistics(): """Returns an object containing run-loop-level debugging information. Currently the following fields are defined: * ``tasks_living`` (int): The number of tasks that have been spawned and not yet exited. * ``tasks_runnable`` (int): The number of tasks that are currently queued on the run queue (as opposed to blocked waiting for something to happen). * ``seconds_to_next_deadline`` (float): The time until the next pending cancel scope deadline. May be negative if the deadline has expired but we haven't yet processed cancellations. May be :data:`~math.inf` if there are no pending deadlines. * ``run_sync_soon_queue_size`` (int): The number of unprocessed callbacks queued via :meth:`trio.lowlevel.TrioToken.run_sync_soon`. * ``io_statistics`` (object): Some statistics from Trio's I/O backend. This always has an attribute ``backend`` which is a string naming which operating-system-specific I/O backend is in use; the other attributes vary between backends. """ locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: return GLOBAL_RUN_CONTEXT.runner.current_statistics() except AttributeError: raise RuntimeError("must be called from async context") def current_time(): """Returns the current time according to Trio's internal clock. Returns: float: The current time. Raises: RuntimeError: if not inside a call to :func:`trio.run`. """ locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: return GLOBAL_RUN_CONTEXT.runner.current_time() except AttributeError: raise RuntimeError("must be called from async context") def current_clock(): """Returns the current :class:`~trio.abc.Clock`.""" locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: return GLOBAL_RUN_CONTEXT.runner.current_clock() except AttributeError: raise RuntimeError("must be called from async context") def current_root_task(): """Returns the current root :class:`Task`. This is the task that is the ultimate parent of all other tasks. """ locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: return GLOBAL_RUN_CONTEXT.runner.current_root_task() except AttributeError: raise RuntimeError("must be called from async context") def reschedule(task, next_send=_NO_SEND): """Reschedule the given task with the given :class:`outcome.Outcome`. See :func:`wait_task_rescheduled` for the gory details. There must be exactly one call to :func:`reschedule` for every call to :func:`wait_task_rescheduled`. (And when counting, keep in mind that returning :data:`Abort.SUCCEEDED` from an abort callback is equivalent to calling :func:`reschedule` once.) Args: task (trio.lowlevel.Task): the task to be rescheduled. Must be blocked in a call to :func:`wait_task_rescheduled`. next_send (outcome.Outcome): the value (or error) to return (or raise) from :func:`wait_task_rescheduled`. """ locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: return GLOBAL_RUN_CONTEXT.runner.reschedule(task, next_send) except AttributeError: raise RuntimeError("must be called from async context") def spawn_system_task(async_fn, *args, name=None, context=None): """Spawn a "system" task. System tasks have a few differences from regular tasks: * They don't need an explicit nursery; instead they go into the internal "system nursery". * If a system task raises an exception, then it's converted into a :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. If you write a system task, you should be careful to make sure it doesn't crash. * System tasks are automatically cancelled when the main task exits. * By default, system tasks have :exc:`KeyboardInterrupt` protection *enabled*. If you want your task to be interruptible by control-C, then you need to use :func:`disable_ki_protection` explicitly (and come up with some plan for what to do with a :exc:`KeyboardInterrupt`, given that system tasks aren't allowed to raise exceptions). * System tasks do not inherit context variables from their creator. Towards the end of a call to :meth:`trio.run`, after the main task and all system tasks have exited, the system nursery becomes closed. At this point, new calls to :func:`spawn_system_task` will raise ``RuntimeError("Nursery is closed to new arrivals")`` instead of creating a system task. It's possible to encounter this state either in a ``finally`` block in an async generator, or in a callback passed to :meth:`TrioToken.run_sync_soon` at the right moment. Args: async_fn: An async callable. args: Positional arguments for ``async_fn``. If you want to pass keyword arguments, use :func:`functools.partial`. name: The name for this task. Only used for debugging/introspection (e.g. ``repr(task_obj)``). If this isn't a string, :func:`spawn_system_task` will try to make it one. A common use case is if you're wrapping a function before spawning a new task, you might pass the original function as the ``name=`` to make debugging easier. context: An optional ``contextvars.Context`` object with context variables to use for this task. You would normally get a copy of the current context with ``context = contextvars.copy_context()`` and then you would pass that ``context`` object here. Returns: Task: the newly spawned task """ locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: return GLOBAL_RUN_CONTEXT.runner.spawn_system_task(async_fn, *args, name=name, context=context) except AttributeError: raise RuntimeError("must be called from async context") def current_trio_token(): """Retrieve the :class:`TrioToken` for the current call to :func:`trio.run`. """ locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: return GLOBAL_RUN_CONTEXT.runner.current_trio_token() except AttributeError: raise RuntimeError("must be called from async context") async def wait_all_tasks_blocked(cushion=0.0): """Block until there are no runnable tasks. This is useful in testing code when you want to give other tasks a chance to "settle down". The calling task is blocked, and doesn't wake up until all other tasks are also blocked for at least ``cushion`` seconds. (Setting a non-zero ``cushion`` is intended to handle cases like two tasks talking to each other over a local socket, where we want to ignore the potential brief moment between a send and receive when all tasks are blocked.) Note that ``cushion`` is measured in *real* time, not the Trio clock time. If there are multiple tasks blocked in :func:`wait_all_tasks_blocked`, then the one with the shortest ``cushion`` is the one woken (and this task becoming unblocked resets the timers for the remaining tasks). If there are multiple tasks that have exactly the same ``cushion``, then all are woken. You should also consider :class:`trio.testing.Sequencer`, which provides a more explicit way to control execution ordering within a test, and will often produce more readable tests. Example: Here's an example of one way to test that Trio's locks are fair: we take the lock in the parent, start a child, wait for the child to be blocked waiting for the lock (!), and then check that we can't release and immediately re-acquire the lock:: async def lock_taker(lock): await lock.acquire() lock.release() async def test_lock_fairness(): lock = trio.Lock() await lock.acquire() async with trio.open_nursery() as nursery: nursery.start_soon(lock_taker, lock) # child hasn't run yet, we have the lock assert lock.locked() assert lock._owner is trio.lowlevel.current_task() await trio.testing.wait_all_tasks_blocked() # now the child has run and is blocked on lock.acquire(), we # still have the lock assert lock.locked() assert lock._owner is trio.lowlevel.current_task() lock.release() try: # The child has a prior claim, so we can't have it lock.acquire_nowait() except trio.WouldBlock: assert lock._owner is not trio.lowlevel.current_task() print("PASS") else: print("FAIL") """ locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: return await GLOBAL_RUN_CONTEXT.runner.wait_all_tasks_blocked(cushion) except AttributeError: raise RuntimeError("must be called from async context") # fmt: on