import pytest
|
|
from ... import _core
|
from ...testing import wait_all_tasks_blocked
|
from .._parking_lot import ParkingLot
|
from .tutil import check_sequence_matches
|
|
|
async def test_parking_lot_basic():
|
record = []
|
|
async def waiter(i, lot):
|
record.append("sleep {}".format(i))
|
await lot.park()
|
record.append("wake {}".format(i))
|
|
async with _core.open_nursery() as nursery:
|
lot = ParkingLot()
|
assert not lot
|
assert len(lot) == 0
|
assert lot.statistics().tasks_waiting == 0
|
for i in range(3):
|
nursery.start_soon(waiter, i, lot)
|
await wait_all_tasks_blocked()
|
assert len(record) == 3
|
assert bool(lot)
|
assert len(lot) == 3
|
assert lot.statistics().tasks_waiting == 3
|
lot.unpark_all()
|
assert lot.statistics().tasks_waiting == 0
|
await wait_all_tasks_blocked()
|
assert len(record) == 6
|
|
check_sequence_matches(
|
record, [{"sleep 0", "sleep 1", "sleep 2"}, {"wake 0", "wake 1", "wake 2"}]
|
)
|
|
async with _core.open_nursery() as nursery:
|
record = []
|
for i in range(3):
|
nursery.start_soon(waiter, i, lot)
|
await wait_all_tasks_blocked()
|
assert len(record) == 3
|
for i in range(3):
|
lot.unpark()
|
await wait_all_tasks_blocked()
|
# 1-by-1 wakeups are strict FIFO
|
assert record == [
|
"sleep 0",
|
"sleep 1",
|
"sleep 2",
|
"wake 0",
|
"wake 1",
|
"wake 2",
|
]
|
|
# It's legal (but a no-op) to try and unpark while there's nothing parked
|
lot.unpark()
|
lot.unpark(count=1)
|
lot.unpark(count=100)
|
|
# Check unpark with count
|
async with _core.open_nursery() as nursery:
|
record = []
|
for i in range(3):
|
nursery.start_soon(waiter, i, lot)
|
await wait_all_tasks_blocked()
|
lot.unpark(count=2)
|
await wait_all_tasks_blocked()
|
check_sequence_matches(
|
record, ["sleep 0", "sleep 1", "sleep 2", {"wake 0", "wake 1"}]
|
)
|
lot.unpark_all()
|
|
|
async def cancellable_waiter(name, lot, scopes, record):
|
with _core.CancelScope() as scope:
|
scopes[name] = scope
|
record.append("sleep {}".format(name))
|
try:
|
await lot.park()
|
except _core.Cancelled:
|
record.append("cancelled {}".format(name))
|
else:
|
record.append("wake {}".format(name))
|
|
|
async def test_parking_lot_cancel():
|
record = []
|
scopes = {}
|
|
async with _core.open_nursery() as nursery:
|
lot = ParkingLot()
|
nursery.start_soon(cancellable_waiter, 1, lot, scopes, record)
|
await wait_all_tasks_blocked()
|
nursery.start_soon(cancellable_waiter, 2, lot, scopes, record)
|
await wait_all_tasks_blocked()
|
nursery.start_soon(cancellable_waiter, 3, lot, scopes, record)
|
await wait_all_tasks_blocked()
|
assert len(record) == 3
|
|
scopes[2].cancel()
|
await wait_all_tasks_blocked()
|
assert len(record) == 4
|
lot.unpark_all()
|
await wait_all_tasks_blocked()
|
assert len(record) == 6
|
|
check_sequence_matches(
|
record, ["sleep 1", "sleep 2", "sleep 3", "cancelled 2", {"wake 1", "wake 3"}]
|
)
|
|
|
async def test_parking_lot_repark():
|
record = []
|
scopes = {}
|
lot1 = ParkingLot()
|
lot2 = ParkingLot()
|
|
with pytest.raises(TypeError):
|
lot1.repark([])
|
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(cancellable_waiter, 1, lot1, scopes, record)
|
await wait_all_tasks_blocked()
|
nursery.start_soon(cancellable_waiter, 2, lot1, scopes, record)
|
await wait_all_tasks_blocked()
|
nursery.start_soon(cancellable_waiter, 3, lot1, scopes, record)
|
await wait_all_tasks_blocked()
|
assert len(record) == 3
|
|
assert len(lot1) == 3
|
lot1.repark(lot2)
|
assert len(lot1) == 2
|
assert len(lot2) == 1
|
lot2.unpark_all()
|
await wait_all_tasks_blocked()
|
assert len(record) == 4
|
assert record == ["sleep 1", "sleep 2", "sleep 3", "wake 1"]
|
|
lot1.repark_all(lot2)
|
assert len(lot1) == 0
|
assert len(lot2) == 2
|
|
scopes[2].cancel()
|
await wait_all_tasks_blocked()
|
assert len(lot2) == 1
|
assert record == [
|
"sleep 1",
|
"sleep 2",
|
"sleep 3",
|
"wake 1",
|
"cancelled 2",
|
]
|
|
lot2.unpark_all()
|
await wait_all_tasks_blocked()
|
assert record == [
|
"sleep 1",
|
"sleep 2",
|
"sleep 3",
|
"wake 1",
|
"cancelled 2",
|
"wake 3",
|
]
|
|
|
async def test_parking_lot_repark_with_count():
|
record = []
|
scopes = {}
|
lot1 = ParkingLot()
|
lot2 = ParkingLot()
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(cancellable_waiter, 1, lot1, scopes, record)
|
await wait_all_tasks_blocked()
|
nursery.start_soon(cancellable_waiter, 2, lot1, scopes, record)
|
await wait_all_tasks_blocked()
|
nursery.start_soon(cancellable_waiter, 3, lot1, scopes, record)
|
await wait_all_tasks_blocked()
|
assert len(record) == 3
|
|
assert len(lot1) == 3
|
assert len(lot2) == 0
|
lot1.repark(lot2, count=2)
|
assert len(lot1) == 1
|
assert len(lot2) == 2
|
while lot2:
|
lot2.unpark()
|
await wait_all_tasks_blocked()
|
assert record == [
|
"sleep 1",
|
"sleep 2",
|
"sleep 3",
|
"wake 1",
|
"wake 2",
|
]
|
lot1.unpark_all()
|