import pytest
|
|
import socket as stdlib_socket
|
import select
|
import random
|
import errno
|
from contextlib import suppress
|
|
from ... import _core
|
from ...testing import wait_all_tasks_blocked, Sequencer, assert_checkpoints
|
import trio
|
|
# Cross-platform tests for IO handling
|
|
|
def fill_socket(sock):
|
try:
|
while True:
|
sock.send(b"x" * 65536)
|
except BlockingIOError:
|
pass
|
|
|
def drain_socket(sock):
|
try:
|
while True:
|
sock.recv(65536)
|
except BlockingIOError:
|
pass
|
|
|
@pytest.fixture
|
def socketpair():
|
pair = stdlib_socket.socketpair()
|
for sock in pair:
|
sock.setblocking(False)
|
yield pair
|
for sock in pair:
|
sock.close()
|
|
|
def using_fileno(fn):
|
def fileno_wrapper(fileobj):
|
return fn(fileobj.fileno())
|
|
name = "<{} on fileno>".format(fn.__name__)
|
fileno_wrapper.__name__ = fileno_wrapper.__qualname__ = name
|
return fileno_wrapper
|
|
|
wait_readable_options = [trio.lowlevel.wait_readable]
|
wait_writable_options = [trio.lowlevel.wait_writable]
|
notify_closing_options = [trio.lowlevel.notify_closing]
|
|
for options_list in [
|
wait_readable_options,
|
wait_writable_options,
|
notify_closing_options,
|
]:
|
options_list += [using_fileno(f) for f in options_list]
|
|
# Decorators that feed in different settings for wait_readable / wait_writable
|
# / notify_closing.
|
# Note that if you use all three decorators on the same test, it will run all
|
# N**3 *combinations*
|
read_socket_test = pytest.mark.parametrize(
|
"wait_readable", wait_readable_options, ids=lambda fn: fn.__name__
|
)
|
write_socket_test = pytest.mark.parametrize(
|
"wait_writable", wait_writable_options, ids=lambda fn: fn.__name__
|
)
|
notify_closing_test = pytest.mark.parametrize(
|
"notify_closing", notify_closing_options, ids=lambda fn: fn.__name__
|
)
|
|
|
# XX These tests are all a bit dicey because they can't distinguish between
|
# wait_on_{read,writ}able blocking the way it should, versus blocking
|
# momentarily and then immediately resuming.
|
@read_socket_test
|
@write_socket_test
|
async def test_wait_basic(socketpair, wait_readable, wait_writable):
|
a, b = socketpair
|
|
# They start out writable()
|
with assert_checkpoints():
|
await wait_writable(a)
|
|
# But readable() blocks until data arrives
|
record = []
|
|
async def block_on_read():
|
try:
|
with assert_checkpoints():
|
await wait_readable(a)
|
except _core.Cancelled:
|
record.append("cancelled")
|
else:
|
record.append("readable")
|
assert a.recv(10) == b"x"
|
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(block_on_read)
|
await wait_all_tasks_blocked()
|
assert record == []
|
b.send(b"x")
|
|
fill_socket(a)
|
|
# Now writable will block, but readable won't
|
with assert_checkpoints():
|
await wait_readable(b)
|
record = []
|
|
async def block_on_write():
|
try:
|
with assert_checkpoints():
|
await wait_writable(a)
|
except _core.Cancelled:
|
record.append("cancelled")
|
else:
|
record.append("writable")
|
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(block_on_write)
|
await wait_all_tasks_blocked()
|
assert record == []
|
drain_socket(b)
|
|
# check cancellation
|
record = []
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(block_on_read)
|
await wait_all_tasks_blocked()
|
nursery.cancel_scope.cancel()
|
assert record == ["cancelled"]
|
|
fill_socket(a)
|
record = []
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(block_on_write)
|
await wait_all_tasks_blocked()
|
nursery.cancel_scope.cancel()
|
assert record == ["cancelled"]
|
|
|
@read_socket_test
|
async def test_double_read(socketpair, wait_readable):
|
a, b = socketpair
|
|
# You can't have two tasks trying to read from a socket at the same time
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(wait_readable, a)
|
await wait_all_tasks_blocked()
|
with pytest.raises(_core.BusyResourceError):
|
await wait_readable(a)
|
nursery.cancel_scope.cancel()
|
|
|
@write_socket_test
|
async def test_double_write(socketpair, wait_writable):
|
a, b = socketpair
|
|
# You can't have two tasks trying to write to a socket at the same time
|
fill_socket(a)
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(wait_writable, a)
|
await wait_all_tasks_blocked()
|
with pytest.raises(_core.BusyResourceError):
|
await wait_writable(a)
|
nursery.cancel_scope.cancel()
|
|
|
@read_socket_test
|
@write_socket_test
|
@notify_closing_test
|
async def test_interrupted_by_close(
|
socketpair, wait_readable, wait_writable, notify_closing
|
):
|
a, b = socketpair
|
|
async def reader():
|
with pytest.raises(_core.ClosedResourceError):
|
await wait_readable(a)
|
|
async def writer():
|
with pytest.raises(_core.ClosedResourceError):
|
await wait_writable(a)
|
|
fill_socket(a)
|
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(reader)
|
nursery.start_soon(writer)
|
await wait_all_tasks_blocked()
|
notify_closing(a)
|
|
|
@read_socket_test
|
@write_socket_test
|
async def test_socket_simultaneous_read_write(socketpair, wait_readable, wait_writable):
|
record = []
|
|
async def r_task(sock):
|
await wait_readable(sock)
|
record.append("r_task")
|
|
async def w_task(sock):
|
await wait_writable(sock)
|
record.append("w_task")
|
|
a, b = socketpair
|
fill_socket(a)
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(r_task, a)
|
nursery.start_soon(w_task, a)
|
await wait_all_tasks_blocked()
|
assert record == []
|
b.send(b"x")
|
await wait_all_tasks_blocked()
|
assert record == ["r_task"]
|
drain_socket(b)
|
await wait_all_tasks_blocked()
|
assert record == ["r_task", "w_task"]
|
|
|
@read_socket_test
|
@write_socket_test
|
async def test_socket_actual_streaming(socketpair, wait_readable, wait_writable):
|
a, b = socketpair
|
|
# Use a small send buffer on one of the sockets to increase the chance of
|
# getting partial writes
|
a.setsockopt(stdlib_socket.SOL_SOCKET, stdlib_socket.SO_SNDBUF, 10000)
|
|
N = 1000000 # 1 megabyte
|
MAX_CHUNK = 65536
|
|
results = {}
|
|
async def sender(sock, seed, key):
|
r = random.Random(seed)
|
sent = 0
|
while sent < N:
|
print("sent", sent)
|
chunk = bytearray(r.randrange(MAX_CHUNK))
|
while chunk:
|
with assert_checkpoints():
|
await wait_writable(sock)
|
this_chunk_size = sock.send(chunk)
|
sent += this_chunk_size
|
del chunk[:this_chunk_size]
|
sock.shutdown(stdlib_socket.SHUT_WR)
|
results[key] = sent
|
|
async def receiver(sock, key):
|
received = 0
|
while True:
|
print("received", received)
|
with assert_checkpoints():
|
await wait_readable(sock)
|
this_chunk_size = len(sock.recv(MAX_CHUNK))
|
if not this_chunk_size:
|
break
|
received += this_chunk_size
|
results[key] = received
|
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(sender, a, 0, "send_a")
|
nursery.start_soon(sender, b, 1, "send_b")
|
nursery.start_soon(receiver, a, "recv_a")
|
nursery.start_soon(receiver, b, "recv_b")
|
|
assert results["send_a"] == results["recv_b"]
|
assert results["send_b"] == results["recv_a"]
|
|
|
async def test_notify_closing_on_invalid_object():
|
# It should either be a no-op (generally on Unix, where we don't know
|
# which fds are valid), or an OSError (on Windows, where we currently only
|
# support sockets, so we have to do some validation to figure out whether
|
# it's a socket or a regular handle).
|
got_oserror = False
|
got_no_error = False
|
try:
|
trio.lowlevel.notify_closing(-1)
|
except OSError:
|
got_oserror = True
|
else:
|
got_no_error = True
|
assert got_oserror or got_no_error
|
|
|
async def test_wait_on_invalid_object():
|
# We definitely want to raise an error everywhere if you pass in an
|
# invalid fd to wait_*
|
for wait in [trio.lowlevel.wait_readable, trio.lowlevel.wait_writable]:
|
with stdlib_socket.socket() as s:
|
fileno = s.fileno()
|
# We just closed the socket and don't do anything else in between, so
|
# we can be confident that the fileno hasn't be reassigned.
|
with pytest.raises(OSError):
|
await wait(fileno)
|
|
|
async def test_io_manager_statistics():
|
def check(*, expected_readers, expected_writers):
|
statistics = _core.current_statistics()
|
print(statistics)
|
iostats = statistics.io_statistics
|
if iostats.backend in ["epoll", "windows"]:
|
assert iostats.tasks_waiting_read == expected_readers
|
assert iostats.tasks_waiting_write == expected_writers
|
else:
|
assert iostats.backend == "kqueue"
|
assert iostats.tasks_waiting == expected_readers + expected_writers
|
|
a1, b1 = stdlib_socket.socketpair()
|
a2, b2 = stdlib_socket.socketpair()
|
a3, b3 = stdlib_socket.socketpair()
|
for sock in [a1, b1, a2, b2, a3, b3]:
|
sock.setblocking(False)
|
with a1, b1, a2, b2, a3, b3:
|
# let the call_soon_task settle down
|
await wait_all_tasks_blocked()
|
|
# 1 for call_soon_task
|
check(expected_readers=1, expected_writers=0)
|
|
# We want:
|
# - one socket with a writer blocked
|
# - two sockets with a reader blocked
|
# - a socket with both blocked
|
fill_socket(a1)
|
fill_socket(a3)
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(_core.wait_writable, a1)
|
nursery.start_soon(_core.wait_readable, a2)
|
nursery.start_soon(_core.wait_readable, b2)
|
nursery.start_soon(_core.wait_writable, a3)
|
nursery.start_soon(_core.wait_readable, a3)
|
|
await wait_all_tasks_blocked()
|
|
# +1 for call_soon_task
|
check(expected_readers=3 + 1, expected_writers=2)
|
|
nursery.cancel_scope.cancel()
|
|
# 1 for call_soon_task
|
check(expected_readers=1, expected_writers=0)
|
|
|
async def test_can_survive_unnotified_close():
|
# An "unnotified" close is when the user closes an fd/socket/handle
|
# directly, without calling notify_closing first. This should never happen
|
# -- users should call notify_closing before closing things. But, just in
|
# case they don't, we would still like to avoid exploding.
|
#
|
# Acceptable behaviors:
|
# - wait_* never return, but can be cancelled cleanly
|
# - wait_* exit cleanly
|
# - wait_* raise an OSError
|
#
|
# Not acceptable:
|
# - getting stuck in an uncancellable state
|
# - TrioInternalError blowing up the whole run
|
#
|
# This test exercises some tricky "unnotified close" scenarios, to make
|
# sure we get the "acceptable" behaviors.
|
|
async def allow_OSError(async_func, *args):
|
with suppress(OSError):
|
await async_func(*args)
|
|
with stdlib_socket.socket() as s:
|
async with trio.open_nursery() as nursery:
|
nursery.start_soon(allow_OSError, trio.lowlevel.wait_readable, s)
|
await wait_all_tasks_blocked()
|
s.close()
|
await wait_all_tasks_blocked()
|
nursery.cancel_scope.cancel()
|
|
# We hit different paths on Windows depending on whether we close the last
|
# handle to the object (which produces a LOCAL_CLOSE notification and
|
# wakes up wait_readable), or only close one of the handles (which leaves
|
# wait_readable pending until cancelled).
|
with stdlib_socket.socket() as s, s.dup() as s2: # noqa: F841
|
async with trio.open_nursery() as nursery:
|
nursery.start_soon(allow_OSError, trio.lowlevel.wait_readable, s)
|
await wait_all_tasks_blocked()
|
s.close()
|
await wait_all_tasks_blocked()
|
nursery.cancel_scope.cancel()
|
|
# A more elaborate case, with two tasks waiting. On windows and epoll,
|
# the two tasks get muxed together onto a single underlying wait
|
# operation. So when they're cancelled, there's a brief moment where one
|
# of the tasks is cancelled but the other isn't, so we try to re-issue the
|
# underlying wait operation. But here, the handle we were going to use to
|
# do that has been pulled out from under our feet... so test that we can
|
# survive this.
|
a, b = stdlib_socket.socketpair()
|
with a, b, a.dup() as a2: # noqa: F841
|
a.setblocking(False)
|
b.setblocking(False)
|
fill_socket(a)
|
async with trio.open_nursery() as nursery:
|
nursery.start_soon(allow_OSError, trio.lowlevel.wait_readable, a)
|
nursery.start_soon(allow_OSError, trio.lowlevel.wait_writable, a)
|
await wait_all_tasks_blocked()
|
a.close()
|
nursery.cancel_scope.cancel()
|
|
# A similar case, but now the single-task-wakeup happens due to I/O
|
# arriving, not a cancellation, so the operation gets re-issued from
|
# handle_io context rather than abort context.
|
a, b = stdlib_socket.socketpair()
|
with a, b, a.dup() as a2: # noqa: F841
|
print("a={}, b={}, a2={}".format(a.fileno(), b.fileno(), a2.fileno()))
|
a.setblocking(False)
|
b.setblocking(False)
|
fill_socket(a)
|
e = trio.Event()
|
|
# We want to wait for the kernel to process the wakeup on 'a', if any.
|
# But depending on the platform, we might not get a wakeup on 'a'. So
|
# we put one task to sleep waiting on 'a', and we put a second task to
|
# sleep waiting on 'a2', with the idea that the 'a2' notification will
|
# definitely arrive, and when it does then we can assume that whatever
|
# notification was going to arrive for 'a' has also arrived.
|
async def wait_readable_a2_then_set():
|
await trio.lowlevel.wait_readable(a2)
|
e.set()
|
|
async with trio.open_nursery() as nursery:
|
nursery.start_soon(allow_OSError, trio.lowlevel.wait_readable, a)
|
nursery.start_soon(allow_OSError, trio.lowlevel.wait_writable, a)
|
nursery.start_soon(wait_readable_a2_then_set)
|
await wait_all_tasks_blocked()
|
a.close()
|
b.send(b"x")
|
# Make sure that the wakeup has been received and everything has
|
# settled before cancelling the wait_writable.
|
await e.wait()
|
await wait_all_tasks_blocked()
|
nursery.cancel_scope.cancel()
|