Fix bug where last few bytes on socket go unread (#642)
**Issue:**

aws-c-s3 is occasionally seeing errors when the server sends an HTTP response with a `Connection: close` header (meaning it intends to close the connection after the response is sent). The server is sending the full response, then immediately hanging up. But the last few bytes of the response never make it to the HTTP client.

**Diagnosis:**

If a peer closed their socket immediately after the last few bytes were sent, our socket code wasn't always reading those last few bytes.

On Linux, if a socket has unread data AND the peer has closed their side, the event from epoll has 2 flags set: `EPOLLIN|EPOLLRDHUP`. This means "the socket has data to read AND the other side is closed (or half-closed) and won't be sending any more data".
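
To illustrate (a minimal standalone sketch, not the library's actual event-loop code; the function name and buffer size are assumptions), the correct order of operations when both flags arrive together:

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Sketch: handle an epoll event where EPOLLIN and EPOLLRDHUP may be set together. */
static void on_socket_event(int fd, uint32_t events) {
    if (events & EPOLLIN) {
        /* Even with EPOLLRDHUP set, the kernel buffer may still hold the
         * peer's final bytes: drain until EOF (0) or error (-1). */
        char buf[4096];
        ssize_t n;
        while ((n = read(fd, buf, sizeof(buf))) > 0) {
            /* ... hand n bytes to the application ... */
        }
    }
    if (events & (EPOLLRDHUP | EPOLLHUP)) {
        /* Only after draining is it safe to treat the socket as closed. */
        close(fd);
    }
}
```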

Our [socket handler code](https://github.com/awslabs/aws-c-io/blob/e762fd250589dfa98a9cce9c3ffca3414d99fdda/source/socket_channel_handler.c#L217-L225) *kinda* did the right thing by "attempting" to read from the socket before shutting down the channel. But if the downstream read window reached 0, that "attempt" wouldn't read all the data.

**Description of changes:**

The socket handler no longer shuts down the channel in response to an error event. Instead, the error event queues more reads to happen. Only when `read()` reports that the socket is finished (due to error or EOF) does the socket handler shut down the channel.
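
As a rough standalone illustration of that policy (hypothetical helper; plain POSIX `read()` standing in for the handler's `aws_socket_read()` loop, with the downstream read-window bookkeeping omitted):

```c
#include <errno.h>
#include <stdbool.h>
#include <unistd.h>

/* Returns true once the channel should shut down: read() itself reported
 * EOF or a real error, not merely because an error *event* arrived. */
static bool drain_and_check_shutdown(int fd) {
    char buf[4096];
    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            continue; /* deliver the bytes downstream, keep reading */
        }
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            return false; /* drained for now; wait for the next event */
        }
        return true; /* n == 0 (EOF) or a genuine error */
    }
}
```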
graebm authored Jun 4, 2024
1 parent e762fd2 commit 878b4fa
Showing 7 changed files with 436 additions and 144 deletions.
30 changes: 17 additions & 13 deletions source/posix/socket.c
@@ -1668,6 +1668,23 @@ static void s_on_socket_io_event(
* subscribed is set to false. */
aws_ref_count_acquire(&socket_impl->internal_refcount);

/* NOTE: READABLE|WRITABLE|HANG_UP events might arrive simultaneously
* (e.g. peer sends last few bytes and immediately hangs up).
* Notify user of READABLE|WRITABLE events first, so they try to read any remaining bytes. */

if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
if (socket->readable_fn) {
socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
}
}
/* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not
* have been cleaned up, so this next branch is safe. */
if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
s_process_socket_write_requests(socket, NULL);
}

if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) {
aws_raise_error(AWS_IO_SOCKET_CLOSED);
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd);
@@ -1688,19 +1705,6 @@ static void s_on_socket_io_event(
goto end_check;
}

if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
if (socket->readable_fn) {
socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
}
}
/* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not
* have been cleaned up, so this next branch is safe. */
if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
s_process_socket_write_requests(socket, NULL);
}

end_check:
aws_ref_count_release(&socket_impl->internal_refcount);
}
61 changes: 40 additions & 21 deletions source/socket_channel_handler.c
@@ -122,6 +122,10 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
*/
static void s_do_read(struct socket_handler *socket_handler) {

if (socket_handler->shutdown_in_progress) {
return;
}

size_t downstream_window = aws_channel_slot_downstream_read_window(socket_handler->slot);
size_t max_to_read =
downstream_window > socket_handler->max_rw_size ? socket_handler->max_rw_size : downstream_window;
@@ -139,17 +143,20 @@ static void s_do_read(struct socket_handler *socket_handler) {

size_t total_read = 0;
size_t read = 0;
while (total_read < max_to_read && !socket_handler->shutdown_in_progress) {
int last_error = 0;
while (total_read < max_to_read) {
size_t iter_max_read = max_to_read - total_read;

struct aws_io_message *message = aws_channel_acquire_message_from_pool(
socket_handler->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, iter_max_read);

if (!message) {
last_error = aws_last_error();
break;
}

if (aws_socket_read(socket_handler->socket, &message->message_data, &read)) {
last_error = aws_last_error();
aws_mem_release(message->allocator, message);
break;
}
@@ -162,6 +169,7 @@ static void s_do_read(struct socket_handler *socket_handler) {
(unsigned long long)read);

if (aws_channel_slot_send_message(socket_handler->slot, message, AWS_CHANNEL_DIR_READ)) {
last_error = aws_last_error();
aws_mem_release(message->allocator, message);
break;
}
@@ -170,30 +178,29 @@ static void s_do_read(struct socket_handler *socket_handler) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: total read on this tick %llu",
(void *)&socket_handler->slot->handler,
(void *)socket_handler->slot->handler,
(unsigned long long)total_read);

socket_handler->stats.bytes_read += total_read;

/* resubscribe as long as there's no error, just return if we're in a would block scenario. */
if (total_read < max_to_read) {
int last_error = aws_last_error();
AWS_ASSERT(last_error != 0);

if (last_error != AWS_IO_READ_WOULD_BLOCK && !socket_handler->shutdown_in_progress) {
if (last_error != AWS_IO_READ_WOULD_BLOCK) {
aws_channel_shutdown(socket_handler->slot->channel, last_error);
} else {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: out of data to read on socket. "
"Waiting on event-loop notification.",
(void *)socket_handler->slot->handler);
}

AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: out of data to read on socket. "
"Waiting on event-loop notification.",
(void *)socket_handler->slot->handler);
return;
}
/* in this case, everything was fine, but there's still pending reads. We need to schedule a task to do the read
* again. */
if (!socket_handler->shutdown_in_progress && total_read == socket_handler->max_rw_size &&
!socket_handler->read_task_storage.task_fn) {
if (total_read == socket_handler->max_rw_size && !socket_handler->read_task_storage.task_fn) {

AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
@@ -212,17 +219,29 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
(void)socket;

struct socket_handler *socket_handler = user_data;
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET_HANDLER, "id=%p: socket is now readable", (void *)socket_handler->slot->handler);

/* read regardless so we can pick up data that was sent prior to the close. For example, peer sends a TLS ALERT
* then immediately closes the socket. On some platforms, we'll never see the readable flag. So we want to make
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: socket on-readable with error code %d(%s)",
(void *)socket_handler->slot->handler,
error_code,
aws_error_name(error_code));

/* Regardless of error code call read() until it reports error or EOF,
* so we can pick up data that was sent prior to the close.
*
* For example, if peer closes the socket immediately after sending the last
* bytes of data, the READABLE and HANGUP events arrive simultaneously.
*
* Another example, peer sends a TLS ALERT then immediately closes the socket.
* On some platforms, we'll never see the readable flag. So we want to make
* sure we read the ALERT, otherwise, we'll end up telling the user that the channel shutdown because of a socket
* closure, when in reality it was a TLS error */
* closure, when in reality it was a TLS error
*
* It may take more than one read() to get all remaining data.
* Also, if the downstream read-window reaches 0, we need to patiently
* wait until the window opens before we can call read() again. */
(void)error_code;
s_do_read(socket_handler);

if (error_code && !socket_handler->shutdown_in_progress) {
aws_channel_shutdown(socket_handler->slot->channel, error_code);
}
}

/* Either the result of a context switch (for fairness in the event loop), or a window update. */
3 changes: 2 additions & 1 deletion source/windows/iocp/socket.c
@@ -636,6 +636,7 @@ static int s_determine_socket_error(int error) {
case IO_STATUS_TIMEOUT:
return AWS_IO_SOCKET_TIMEOUT;
case IO_PIPE_BROKEN:
case ERROR_BROKEN_PIPE:
return AWS_IO_SOCKET_CLOSED;
case STATUS_INVALID_ADDRESS_COMPONENT:
case WSAEADDRNOTAVAIL:
@@ -2970,7 +2971,7 @@ static int s_tcp_read(struct aws_socket *socket, struct aws_byte_buf *buffer, si

AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: ReadFile() failed with error %d",
"id=%p handle=%p: recv() failed with error %d",
(void *)socket,
(void *)socket->io_handle.data.handle,
error);
22 changes: 22 additions & 0 deletions tests/CMakeLists.txt
@@ -123,6 +123,28 @@ add_test_case(pem_sanitize_wrong_format_rejected)

add_test_case(socket_handler_echo_and_backpressure)
add_test_case(socket_handler_close)
# These tests fail on Windows due to some bug in our server code where, if the socket is closed
# immediately after data is written, that data does not flush cleanly to the client.
# I've lost days to this bug, and no one is using our Windows server functionality,
# so disabling these tests on Windows and moving along for now.
# I tried the following:
# 1) Wrote 2 simple standalone Windows programs, server and client, using simple synchronous socket code.
# WORKED PERFECTLY. So it's not a fundamental issue with Windows.
# 2) Commented out server part of this failing test, and used the simple standalone server instead.
# WORKED PERFECTLY. So it's not a problem with our actual client code.
# 3) Copy/pasted the simple standalone server code into this test, and used that instead of our actual server code.
# WORKED PERFECTLY. So it's not a problem with the server and client sockets being in the same process.
# 4) Commented out the client part of this failing test, and used the simple standalone client instead.
# FAILED. The standalone client got WSAECONNRESET (Connection reset by peer) before receiving all the data.
# So it's something with our complicated non-blocking server code.
# The last interesting thing I noticed before giving up was: we call shutdown() immediately
# before calling closesocket() but shutdown() gets error WSAENOTCONN, even
# though, at that moment, the socket should be connected just fine.
if(NOT WIN32)
add_net_test_case(socket_handler_read_to_eof_after_peer_hangup)
add_net_test_case(socket_handler_ipv4_read_to_eof_after_peer_hangup)
add_net_test_case(socket_handler_ipv6_read_to_eof_after_peer_hangup)
endif()
add_test_case(socket_pinned_event_loop)
add_net_test_case(socket_pinned_event_loop_dns_failure)

50 changes: 34 additions & 16 deletions tests/read_write_test_handler.c
@@ -190,42 +190,60 @@ struct rw_handler_write_task_args {
struct aws_channel_slot *slot;
struct aws_byte_buf *buffer;
struct aws_channel_task task;
aws_channel_on_message_write_completed_fn *on_completion;
void *user_data;
};

static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) {
(void)task;
(void)task_status;
struct rw_handler_write_task_args *write_task_args = arg;
static void s_rw_handler_write_now(
struct aws_channel_slot *slot,
struct aws_byte_buf *buffer,
aws_channel_on_message_write_completed_fn *on_completion,
void *user_data) {

struct aws_io_message *msg = aws_channel_acquire_message_from_pool(
write_task_args->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, write_task_args->buffer->len);
struct aws_io_message *msg =
aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len);

msg->on_completion = on_completion;
msg->user_data = user_data;

struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(write_task_args->buffer);
aws_byte_buf_append(&msg->message_data, &write_buffer);
struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer);
AWS_FATAL_ASSERT(aws_byte_buf_append(&msg->message_data, &write_buffer) == AWS_OP_SUCCESS);

aws_channel_slot_send_message(write_task_args->slot, msg, AWS_CHANNEL_DIR_WRITE);
AWS_FATAL_ASSERT(aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS);
}

static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) {
(void)task;
(void)task_status;
struct rw_handler_write_task_args *write_task_args = arg;
s_rw_handler_write_now(
write_task_args->slot, write_task_args->buffer, write_task_args->on_completion, write_task_args->user_data);
aws_mem_release(write_task_args->handler->alloc, write_task_args);
}

void rw_handler_write(struct aws_channel_handler *handler, struct aws_channel_slot *slot, struct aws_byte_buf *buffer) {
rw_handler_write_with_callback(handler, slot, buffer, NULL /*on_completion*/, NULL /*user_data*/);
}

void rw_handler_write_with_callback(
struct aws_channel_handler *handler,
struct aws_channel_slot *slot,
struct aws_byte_buf *buffer,
aws_channel_on_message_write_completed_fn *on_completion,
void *user_data) {

struct rw_test_handler_impl *handler_impl = handler->impl;

if (!handler_impl->event_loop_driven || aws_channel_thread_is_callers_thread(slot->channel)) {
struct aws_io_message *msg =
aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len);

struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer);
aws_byte_buf_append(&msg->message_data, &write_buffer);

aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE);
s_rw_handler_write_now(slot, buffer, on_completion, user_data);
} else {
struct rw_handler_write_task_args *write_task_args =
aws_mem_acquire(handler->alloc, sizeof(struct rw_handler_write_task_args));
write_task_args->handler = handler;
write_task_args->buffer = buffer;
write_task_args->slot = slot;
write_task_args->on_completion = on_completion;
write_task_args->user_data = user_data;
aws_channel_task_init(&write_task_args->task, s_rw_handler_write_task, write_task_args, "rw_handler_write");

aws_channel_schedule_task_now(slot->channel, &write_task_args->task);
7 changes: 7 additions & 0 deletions tests/read_write_test_handler.h
@@ -36,6 +36,13 @@ void rw_handler_enable_wait_on_destroy(

void rw_handler_write(struct aws_channel_handler *handler, struct aws_channel_slot *slot, struct aws_byte_buf *buffer);

void rw_handler_write_with_callback(
struct aws_channel_handler *handler,
struct aws_channel_slot *slot,
struct aws_byte_buf *buffer,
aws_channel_on_message_write_completed_fn *on_completion,
void *user_data);

void rw_handler_trigger_read(struct aws_channel_handler *handler, struct aws_channel_slot *slot);

bool rw_handler_shutdown_called(struct aws_channel_handler *handler);