import attr import pytest from ... import _core, _abc from .tutil import check_sequence_matches @attr.s(eq=False, hash=False) class TaskRecorder: record = attr.ib(factory=list) def before_run(self): self.record.append(("before_run",)) def task_scheduled(self, task): self.record.append(("schedule", task)) def before_task_step(self, task): assert task is _core.current_task() self.record.append(("before", task)) def after_task_step(self, task): assert task is _core.current_task() self.record.append(("after", task)) def after_run(self): self.record.append(("after_run",)) def filter_tasks(self, tasks): for item in self.record: if item[0] in ("schedule", "before", "after") and item[1] in tasks: yield item if item[0] in ("before_run", "after_run"): yield item def test_instruments(recwarn): r1 = TaskRecorder() r2 = TaskRecorder() r3 = TaskRecorder() task = None # We use a child task for this, because the main task does some extra # bookkeeping stuff that can leak into the instrument results, and we # don't want to deal with it. async def task_fn(): nonlocal task task = _core.current_task() for _ in range(4): await _core.checkpoint() # replace r2 with r3, to test that we can manipulate them as we go _core.remove_instrument(r2) with pytest.raises(KeyError): _core.remove_instrument(r2) # add is idempotent _core.add_instrument(r3) _core.add_instrument(r3) for _ in range(1): await _core.checkpoint() async def main(): async with _core.open_nursery() as nursery: nursery.start_soon(task_fn) _core.run(main, instruments=[r1, r2]) # It sleeps 5 times, so it runs 6 times. Note that checkpoint() # reschedules the task immediately upon yielding, before the # after_task_step event fires. expected = ( [("before_run",), ("schedule", task)] + [("before", task), ("schedule", task), ("after", task)] * 5 + [("before", task), ("after", task), ("after_run",)] ) assert r1.record == r2.record + r3.record assert list(r1.filter_tasks([task])) == expected def test_instruments_interleave(): tasks = {} async def two_step1(): tasks["t1"] = _core.current_task() await _core.checkpoint() async def two_step2(): tasks["t2"] = _core.current_task() await _core.checkpoint() async def main(): async with _core.open_nursery() as nursery: nursery.start_soon(two_step1) nursery.start_soon(two_step2) r = TaskRecorder() _core.run(main, instruments=[r]) expected = [ ("before_run",), ("schedule", tasks["t1"]), ("schedule", tasks["t2"]), { ("before", tasks["t1"]), ("schedule", tasks["t1"]), ("after", tasks["t1"]), ("before", tasks["t2"]), ("schedule", tasks["t2"]), ("after", tasks["t2"]), }, { ("before", tasks["t1"]), ("after", tasks["t1"]), ("before", tasks["t2"]), ("after", tasks["t2"]), }, ("after_run",), ] print(list(r.filter_tasks(tasks.values()))) check_sequence_matches(list(r.filter_tasks(tasks.values())), expected) def test_null_instrument(): # undefined instrument methods are skipped class NullInstrument: def something_unrelated(self): pass # pragma: no cover async def main(): await _core.checkpoint() _core.run(main, instruments=[NullInstrument()]) def test_instrument_before_after_run(): record = [] class BeforeAfterRun: def before_run(self): record.append("before_run") def after_run(self): record.append("after_run") async def main(): pass _core.run(main, instruments=[BeforeAfterRun()]) assert record == ["before_run", "after_run"] def test_instrument_task_spawn_exit(): record = [] class SpawnExitRecorder: def task_spawned(self, task): record.append(("spawned", task)) def task_exited(self, task): record.append(("exited", task)) async def main(): return _core.current_task() main_task = _core.run(main, instruments=[SpawnExitRecorder()]) assert ("spawned", main_task) in record assert ("exited", main_task) in record # This test also tests having a crash before the initial task is even spawned, # which is very difficult to handle. def test_instruments_crash(caplog): record = [] class BrokenInstrument: def task_scheduled(self, task): record.append("scheduled") raise ValueError("oops") def close(self): # Shouldn't be called -- tests that the instrument disabling logic # works right. record.append("closed") # pragma: no cover async def main(): record.append("main ran") return _core.current_task() r = TaskRecorder() main_task = _core.run(main, instruments=[r, BrokenInstrument()]) assert record == ["scheduled", "main ran"] # the TaskRecorder kept going throughout, even though the BrokenInstrument # was disabled assert ("after", main_task) in r.record assert ("after_run",) in r.record # And we got a log message exc_type, exc_value, exc_traceback = caplog.records[0].exc_info assert exc_type is ValueError assert str(exc_value) == "oops" assert "Instrument has been disabled" in caplog.records[0].message def test_instruments_monkeypatch(): class NullInstrument(_abc.Instrument): pass instrument = NullInstrument() async def main(): record = [] # Changing the set of hooks implemented by an instrument after # it's installed doesn't make them start being called right away instrument.before_task_step = record.append await _core.checkpoint() await _core.checkpoint() assert len(record) == 0 # But if we remove and re-add the instrument, the new hooks are # picked up _core.remove_instrument(instrument) _core.add_instrument(instrument) await _core.checkpoint() await _core.checkpoint() assert record.count(_core.current_task()) == 2 _core.remove_instrument(instrument) await _core.checkpoint() await _core.checkpoint() assert record.count(_core.current_task()) == 2 _core.run(main, instruments=[instrument]) def test_instrument_that_raises_on_getattr(): class EvilInstrument: def task_exited(self, task): assert False # pragma: no cover @property def after_run(self): raise ValueError("oops") async def main(): with pytest.raises(ValueError): _core.add_instrument(EvilInstrument()) # Make sure the instrument is fully removed from the per-method lists runner = _core.current_task()._runner assert "after_run" not in runner.instruments assert "task_exited" not in runner.instruments _core.run(main)