import time from math import inf from .. import _core from ._run import GLOBAL_RUN_CONTEXT from .._abc import Clock from .._util import Final ################################################################ # The glorious MockClock ################################################################ # Prior art: # https://twistedmatrix.com/documents/current/api/twisted.internet.task.Clock.html # https://github.com/ztellman/manifold/issues/57 class MockClock(Clock, metaclass=Final): """A user-controllable clock suitable for writing tests. Args: rate (float): the initial :attr:`rate`. autojump_threshold (float): the initial :attr:`autojump_threshold`. .. attribute:: rate How many seconds of clock time pass per second of real time. Default is 0.0, i.e. the clock only advances through manuals calls to :meth:`jump` or when the :attr:`autojump_threshold` is triggered. You can assign to this attribute to change it. .. attribute:: autojump_threshold The clock keeps an eye on the run loop, and if at any point it detects that all tasks have been blocked for this many real seconds (i.e., according to the actual clock, not this clock), then the clock automatically jumps ahead to the run loop's next scheduled timeout. Default is :data:`math.inf`, i.e., to never autojump. You can assign to this attribute to change it. Basically the idea is that if you have code or tests that use sleeps and timeouts, you can use this to make it run much faster, totally automatically. (At least, as long as those sleeps/timeouts are happening inside Trio; if your test involves talking to external service and waiting for it to timeout then obviously we can't help you there.) You should set this to the smallest value that lets you reliably avoid "false alarms" where some I/O is in flight (e.g. between two halves of a socketpair) but the threshold gets triggered and time gets advanced anyway. This will depend on the details of your tests and test environment. If you aren't doing any I/O (like in our sleeping example above) then just set it to zero, and the clock will jump whenever all tasks are blocked. .. note:: If you use ``autojump_threshold`` and `wait_all_tasks_blocked` at the same time, then you might wonder how they interact, since they both cause things to happen after the run loop goes idle for some time. The answer is: `wait_all_tasks_blocked` takes priority. If there's a task blocked in `wait_all_tasks_blocked`, then the autojump feature treats that as active task and does *not* jump the clock. """ def __init__(self, rate=0.0, autojump_threshold=inf): # when the real clock said 'real_base', the virtual time was # 'virtual_base', and since then it's advanced at 'rate' virtual # seconds per real second. self._real_base = 0.0 self._virtual_base = 0.0 self._rate = 0.0 self._autojump_threshold = 0.0 # kept as an attribute so that our tests can monkeypatch it self._real_clock = time.perf_counter # use the property update logic to set initial values self.rate = rate self.autojump_threshold = autojump_threshold def __repr__(self): return "".format( self.current_time(), self._rate, id(self) ) @property def rate(self): return self._rate @rate.setter def rate(self, new_rate): if new_rate < 0: raise ValueError("rate must be >= 0") else: real = self._real_clock() virtual = self._real_to_virtual(real) self._virtual_base = virtual self._real_base = real self._rate = float(new_rate) @property def autojump_threshold(self): return self._autojump_threshold @autojump_threshold.setter def autojump_threshold(self, new_autojump_threshold): self._autojump_threshold = float(new_autojump_threshold) self._try_resync_autojump_threshold() # runner.clock_autojump_threshold is an internal API that isn't easily # usable by custom third-party Clock objects. If you need access to this # functionality, let us know, and we'll figure out how to make a public # API. Discussion: # # https://github.com/python-trio/trio/issues/1587 def _try_resync_autojump_threshold(self): try: runner = GLOBAL_RUN_CONTEXT.runner if runner.is_guest: runner.force_guest_tick_asap() except AttributeError: pass else: runner.clock_autojump_threshold = self._autojump_threshold # Invoked by the run loop when runner.clock_autojump_threshold is # exceeded. def _autojump(self): statistics = _core.current_statistics() jump = statistics.seconds_to_next_deadline if 0 < jump < inf: self.jump(jump) def _real_to_virtual(self, real): real_offset = real - self._real_base virtual_offset = self._rate * real_offset return self._virtual_base + virtual_offset def start_clock(self): self._try_resync_autojump_threshold() def current_time(self): return self._real_to_virtual(self._real_clock()) def deadline_to_sleep_time(self, deadline): virtual_timeout = deadline - self.current_time() if virtual_timeout <= 0: return 0 elif self._rate > 0: return virtual_timeout / self._rate else: return 999999999 def jump(self, seconds): """Manually advance the clock by the given number of seconds. Args: seconds (float): the number of seconds to jump the clock forward. Raises: ValueError: if you try to pass a negative value for ``seconds``. """ if seconds < 0: raise ValueError("time can't go backwards") self._virtual_base += seconds