import logging
import types
from typing import Any, Callable, Dict, List, Sequence, Iterator, TypeVar

import attr

from .._abc import Instrument

# Used to log exceptions in instruments
INSTRUMENT_LOGGER = logging.getLogger("trio.abc.Instrument")


F = TypeVar("F", bound=Callable[..., Any])


# Decorator to mark methods public. This does nothing by itself, but
# trio/_tools/gen_exports.py looks for it.
def _public(fn: F) -> F:
    """Return ``fn`` unchanged; serves purely as a marker on public methods."""
    return fn


class Instruments(Dict[str, Dict[Instrument, None]]):
    """A collection of `trio.abc.Instrument` organized by hook.

    Instrumentation calls are rather expensive, and we don't want a
    rarely-used instrument (like before_run()) to slow down hot
    operations (like before_task_step()). Thus, we cache the set of
    instruments to be called for each hook, and skip the instrumentation
    call if there's nothing currently installed for that hook.

    The mapping goes from hook name to a dict whose keys are the instruments
    implementing that hook (the values are always ``None``, so each inner
    dict is effectively an insertion-ordered set). The special key ``"_all"``
    holds every active instrument regardless of which hooks it implements.
    """

    __slots__ = ()

    def __init__(self, incoming: Sequence[Instrument]) -> None:
        """Create the collection and activate each instrument in ``incoming``."""
        # "_all" cannot clash with a hook name: add_instrument() skips every
        # attribute whose name starts with an underscore.
        self["_all"] = {}
        for instrument in incoming:
            self.add_instrument(instrument)

    @_public
    def add_instrument(self, instrument: Instrument) -> None:
        """Start instrumenting the current run loop with the given instrument.

        Args:
          instrument (trio.abc.Instrument): The instrument to activate.

        If ``instrument`` is already active, does nothing.

        """
        if instrument in self["_all"]:
            return
        self["_all"][instrument] = None
        try:
            for name in dir(instrument):
                if name.startswith("_"):
                    continue
                try:
                    prototype = getattr(Instrument, name)
                except AttributeError:
                    # Not one of the hooks declared on the Instrument base.
                    continue
                impl = getattr(instrument, name)
                if isinstance(impl, types.MethodType) and impl.__func__ is prototype:
                    # Inherited unchanged from _abc.Instrument
                    continue
                self.setdefault(name, {})[instrument] = None
        except BaseException:
            # Don't leave a half-registered instrument behind: undo the
            # registration, then let the original error propagate.
            self.remove_instrument(instrument)
            raise

    @_public
    def remove_instrument(self, instrument: Instrument) -> None:
        """Stop instrumenting the current run loop with the given instrument.

        Args:
          instrument (trio.abc.Instrument): The instrument to de-activate.

        Raises:
          KeyError: if the instrument is not currently active. This could
              occur either because you never added it, or because you added it
              and then it raised an unhandled exception and was automatically
              deactivated.

        """
        # If instrument isn't present, the KeyError propagates out
        self["_all"].pop(instrument)
        # Iterate over a snapshot because hook entries that become empty are
        # deleted from the mapping inside the loop.
        for hookname, instruments in list(self.items()):
            if instrument in instruments:
                del instruments[instrument]
                if not instruments:
                    del self[hookname]

    def call(self, hookname: str, *args: Any) -> None:
        """Call hookname(*args) on each applicable instrument.

        You must first check whether there are any instruments installed for
        that hook, e.g.::

            if "before_task_step" in instruments:
                instruments.call("before_task_step", task)

        An instrument whose hook raises is deactivated and the exception is
        logged to ``INSTRUMENT_LOGGER`` instead of being propagated.

        Raises:
          KeyError: if no instrument is installed for ``hookname``.

        """
        # Iterate over a snapshot: a failing instrument is removed from the
        # per-hook dict while we are looping.
        for instrument in list(self[hookname]):
            try:
                getattr(instrument, hookname)(*args)
            except BaseException:
                # The hook may already have deactivated its own instrument
                # before raising. Removing it a second time would raise
                # KeyError from inside this handler, which would escape
                # call(), skip the log record below, and skip the remaining
                # instruments for this hook.
                if instrument in self["_all"]:
                    self.remove_instrument(instrument)
                INSTRUMENT_LOGGER.exception(
                    "Exception raised when calling %r on instrument %r. "
                    "Instrument has been disabled.",
                    hookname,
                    instrument,
                )