Skip to content

Commit

Permalink
Configure message buffer (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
mehaase committed Nov 16, 2018
1 parent d2ce97b commit 4520d2f
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 40 deletions.
38 changes: 38 additions & 0 deletions docs/backpressure.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
Message Queues
==============

.. currentmodule:: trio_websocket

.. TODO This file will grow into a "backpressure" document once #65 is complete.
For now it is just deals with userspace buffers, since this is a related
topic.
When a connection is open, it runs a background task that reads network data and
automatically handles certain types of events for you. For example, if the
background task receives a ping event, then it will automatically send back a
pong event. When the background task receives a message, it places that message
into an internal queue. When you call ``get_message()``, it returns the first
item from this queue.

If this internal message queue does not have any size limits, then a remote
endpoint could rapidly send large messages and use up all of the memory on the
local machine! In almost all situations, the message queue needs to have size
limits, both in terms of the number of items and the size per message. These
limits create an upper bound for the amount of memory that can be used by a
single WebSocket connection. For example, if the queue size is 10 and the
maximum message size is 1 megabyte, then the connection will use at most 10
megabytes of memory.

When the message queue is full, the background task pauses and waits for the
user to remove a message, i.e. call ``get_message()``. When the background task
is paused, it stops processing background events like replying to ping events.
If a message is received that is larger than the maximum message size, then the
connection is automatically closed with code 1009 and the message is discarded.

The library APIs each take arguments to configure the mesage buffer:
``message_queue_size`` and ``max_message_size``. By default the queue size is
one and the maximum message size is 1 MiB. If you set queue size to zero, then
the background task will block every time it receives a message until somebody
calls ``get_message()``. For an unbounded queue—which is strongly
discouraged—set the queue size to ``math.inf``. Likewise, the maximum message
size may also be disabled by setting it to ``math.inf``.
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Autobahn Test Suite <https://github.com/crossbario/autobahn-testsuite>`__.
getting_started
clients
servers
backpressure
timeouts
api
recipes
Expand Down
4 changes: 3 additions & 1 deletion docs/servers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ host/port to bind to. The handler function receives a
:class:`WebSocketRequest` object, and it calls the request's
:func:`~WebSocketRequest.accept` method to finish the handshake and obtain a
:class:`WebSocketConnection` object. When the handler function exits, the
connection is automatically closed.
connection is automatically closed. If the handler function raises an
exception, the server will silently close the connection and cancel the
tasks belonging to it.

.. autofunction:: serve_websocket

Expand Down
75 changes: 56 additions & 19 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@

HOST = '127.0.0.1'
RESOURCE = '/resource'
DEFAULT_TEST_MAX_DURATION = 1

# Timeout tests follow a general pattern: one side waits TIMEOUT seconds for an
# event. The other side delays for FORCE_TIMEOUT seconds to force the timeout
# to trigger. Each test also has maximum runtime (measure by Trio's clock) to
# prevent a faulty test from hanging the entire suite.
TIMEOUT = 1
FORCE_TIMEOUT = 2
MAX_TIMEOUT_TEST_DURATION = 3
TIMEOUT_TEST_MAX_DURATION = 3


@pytest.fixture
Expand Down Expand Up @@ -89,12 +90,6 @@ async def echo_request_handler(request):
Accept incoming request and then pass off to echo connection handler.
'''
conn = await request.accept()
await echo_conn_handler(conn)


async def echo_conn_handler(conn):
''' A connection handler that reads one message, sends back the same
message, then exits. '''
try:
msg = await conn.get_message()
await conn.send_message(msg)
Expand Down Expand Up @@ -391,7 +386,7 @@ async def handler(stream):
await client.send_message('Hello from client!')


@fail_after(MAX_TIMEOUT_TEST_DURATION)
@fail_after(TIMEOUT_TEST_MAX_DURATION)
async def test_client_open_timeout(nursery, autojump_clock):
'''
The client times out waiting for the server to complete the opening
Expand All @@ -411,7 +406,7 @@ async def handler(request):
pass


@fail_after(MAX_TIMEOUT_TEST_DURATION)
@fail_after(TIMEOUT_TEST_MAX_DURATION)
async def test_client_close_timeout(nursery, autojump_clock):
'''
This client times out waiting for the server to complete the closing
Expand All @@ -430,15 +425,16 @@ async def handler(request):
pytest.fail('Should not reach this line.')

server = await nursery.start(
partial(serve_websocket, handler, HOST, 0, ssl_context=None))
partial(serve_websocket, handler, HOST, 0, ssl_context=None,
message_queue_size=0))

with pytest.raises(trio.TooSlowError):
async with open_websocket(HOST, server.port, RESOURCE, use_ssl=False,
disconnect_timeout=TIMEOUT) as client_ws:
await client_ws.send_message('test')


@fail_after(MAX_TIMEOUT_TEST_DURATION)
@fail_after(TIMEOUT_TEST_MAX_DURATION)
async def test_server_open_timeout(autojump_clock):
'''
The server times out waiting for the client to complete the opening
Expand Down Expand Up @@ -470,7 +466,7 @@ async def handler(request):
nursery.cancel_scope.cancel()


@fail_after(MAX_TIMEOUT_TEST_DURATION)
@fail_after(TIMEOUT_TEST_MAX_DURATION)
async def test_server_close_timeout(autojump_clock):
'''
The server times out waiting for the client to complete the closing
Expand All @@ -488,7 +484,7 @@ async def handler(request):
ws = await request.accept()
# Send one message to block the client's reader task:
await ws.send_message('test')
import logging

async with trio.open_nursery() as outer:
server = await outer.start(partial(serve_websocket, handler, HOST, 0,
ssl_context=None, handler_nursery=outer,
Expand Down Expand Up @@ -523,7 +519,6 @@ async def handler(request):
with pytest.raises(ConnectionClosed):
await server_ws.get_message()
server = await nursery.start(serve_websocket, handler, HOST, 0, None)
port = server.port
stream = await trio.open_tcp_stream(HOST, server.port)
client_ws = await wrap_client_stream(nursery, stream, HOST, RESOURCE)
async with client_ws:
Expand Down Expand Up @@ -566,12 +561,14 @@ async def handler(request):
assert exc.reason.name == 'NORMAL_CLOSURE'


@pytest.mark.skip(reason='Hangs because channel size is hard coded to 0')
@fail_after(DEFAULT_TEST_MAX_DURATION)
async def test_read_messages_after_remote_close(nursery):
'''
When the remote endpoint closes, the local endpoint can still read all
of the messages sent prior to closing. Any attempt to read beyond that will
raise ConnectionClosed.
This test also exercises the configuration of the queue size.
'''
server_closed = trio.Event()

Expand All @@ -585,7 +582,10 @@ async def handler(request):
server = await nursery.start(
partial(serve_websocket, handler, HOST, 0, ssl_context=None))

async with open_websocket(HOST, server.port, '/', use_ssl=False) as client:
# The client needs a message queue of size 2 so that it can buffer both
# incoming messages without blocking the reader task.
async with open_websocket(HOST, server.port, '/', use_ssl=False,
message_queue_size=2) as client:
await server_closed.wait()
assert await client.get_message() == '1'
assert await client.get_message() == '2'
Expand Down Expand Up @@ -618,12 +618,49 @@ async def handler(request):
client_closed.set()


async def test_client_cm_exit_with_pending_messages(echo_server, autojump_clock):
async def test_cm_exit_with_pending_messages(echo_server, autojump_clock):
'''
Regression test for #74, where a context manager was not able to exit when
there were pending messages in the receive queue.
'''
with trio.fail_after(1):
async with open_websocket(HOST, echo_server.port, RESOURCE,
use_ssl=False) as ws:
await ws.send_message('hello')
# allow time for the server to respond
await trio.sleep(.1)
# bug: context manager exit is blocked on unconsumed message
#await ws.get_message()


@fail_after(DEFAULT_TEST_MAX_DURATION)
async def test_max_message_size(nursery):
'''
Set the client's max message size to 100 bytes. The client can send a
message larger than 100 bytes, but when it receives a message larger than
100 bytes, it closes the connection with code 1009.
'''
async def handler(request):
''' Similar to the echo_request_handler fixture except it runs in a
loop. '''
conn = await request.accept()
while True:
try:
msg = await conn.get_message()
await conn.send_message(msg)
except ConnectionClosed:
break

server = await nursery.start(
partial(serve_websocket, handler, HOST, 0, ssl_context=None))

async with open_websocket(HOST, server.port, RESOURCE, use_ssl=False,
max_message_size=100) as client:
# We can send and receive 100 bytes:
await client.send_message(b'A' * 100)
msg = await client.get_message()
assert len(msg) == 100
# We can send 101 bytes but cannot receive 101 bytes:
await client.send_message(b'B' * 101)
with pytest.raises(ConnectionClosed):
await client.get_message()
assert client.closed
assert client.closed.code == 1009
Loading

0 comments on commit 4520d2f

Please sign in to comment.