From 9f2c3a80b5bb2c60c48624620b5b0d596375f6f6 Mon Sep 17 00:00:00 2001 From: John W Higgins Date: Sun, 17 Oct 2021 15:30:45 -0700 Subject: [PATCH] WebSocket permessage-deflate server implementation --- ext/iodine/http.h | 1 + ext/iodine/http1.c | 4 ++ ext/iodine/http_internal.c | 6 +++ ext/iodine/http_internal.h | 2 + ext/iodine/iodine_connection.c | 36 +++++++++++-- ext/iodine/iodine_http.c | 20 +++++++- ext/iodine/websocket_deflate.h | 94 ++++++++++++++++++++++++++++++++++ ext/iodine/websockets.c | 65 +++++++++++++++++++---- ext/iodine/websockets.h | 7 ++- 9 files changed, 217 insertions(+), 18 deletions(-) create mode 100644 ext/iodine/websocket_deflate.h diff --git a/ext/iodine/http.h b/ext/iodine/http.h index e13a8e20..fb92ffe1 100644 --- a/ext/iodine/http.h +++ b/ext/iodine/http.h @@ -543,6 +543,7 @@ typedef struct { void (*on_close)(intptr_t uuid, void *udata); /** Opaque user data. */ void *udata; + int deflate; } websocket_settings_s; /** diff --git a/ext/iodine/http1.c b/ext/iodine/http1.c index cbf4b6cd..a9045fea 100644 --- a/ext/iodine/http1.c +++ b/ext/iodine/http1.c @@ -349,6 +349,10 @@ static int http1_http2websocket_server(http_s *h, websocket_settings_s *args) { stmp = fiobj_obj2cstr(tmp); fiobj_str_resize(tmp, fio_base64_encode(stmp.data, fio_sha1_result(&sha1), 20)); + + if (args->deflate) { + http_set_header(h, HTTP_HEADER_WS_SEC_EXTENSIONS, fiobj_dup(HTTP_HVALUE_WS_DEFLATE)); + } http_set_header(h, HTTP_HEADER_CONNECTION, fiobj_dup(HTTP_HVALUE_WS_UPGRADE)); http_set_header(h, HTTP_HEADER_UPGRADE, fiobj_dup(HTTP_HVALUE_WEBSOCKET)); http_set_header(h, HTTP_HEADER_WS_SEC_KEY, tmp); diff --git a/ext/iodine/http_internal.c b/ext/iodine/http_internal.c index f3058d33..2b69b5bf 100644 --- a/ext/iodine/http_internal.c +++ b/ext/iodine/http_internal.c @@ -126,6 +126,7 @@ FIOBJ HTTP_HEADER_ORIGIN; FIOBJ HTTP_HEADER_SET_COOKIE; FIOBJ HTTP_HEADER_UPGRADE; FIOBJ HTTP_HEADER_WS_SEC_CLIENT_KEY; +FIOBJ HTTP_HEADER_WS_SEC_EXTENSIONS; FIOBJ HTTP_HEADER_WS_SEC_KEY; FIOBJ HTTP_HVALUE_BYTES; FIOBJ HTTP_HVALUE_CLOSE; @@ -136,6 +137,7 @@ FIOBJ HTTP_HVALUE_MAX_AGE; FIOBJ HTTP_HVALUE_NO_CACHE; FIOBJ HTTP_HVALUE_SSE_MIME; FIOBJ HTTP_HVALUE_WEBSOCKET; +FIOBJ HTTP_HVALUE_WS_DEFLATE; FIOBJ HTTP_HVALUE_WS_SEC_VERSION; FIOBJ HTTP_HVALUE_WS_UPGRADE; FIOBJ HTTP_HVALUE_WS_VERSION; @@ -172,6 +174,7 @@ static void http_lib_cleanup(void *ignr_) { HTTPLIB_RESET(HTTP_HEADER_SET_COOKIE); HTTPLIB_RESET(HTTP_HEADER_UPGRADE); HTTPLIB_RESET(HTTP_HEADER_WS_SEC_CLIENT_KEY); + HTTPLIB_RESET(HTTP_HEADER_WS_SEC_EXTENSIONS); HTTPLIB_RESET(HTTP_HEADER_WS_SEC_KEY); HTTPLIB_RESET(HTTP_HVALUE_BYTES); HTTPLIB_RESET(HTTP_HVALUE_CLOSE); @@ -182,6 +185,7 @@ static void http_lib_cleanup(void *ignr_) { HTTPLIB_RESET(HTTP_HVALUE_NO_CACHE); HTTPLIB_RESET(HTTP_HVALUE_SSE_MIME); HTTPLIB_RESET(HTTP_HVALUE_WEBSOCKET); + HTTPLIB_RESET(HTTP_HVALUE_WS_DEFLATE); HTTPLIB_RESET(HTTP_HVALUE_WS_SEC_VERSION); HTTPLIB_RESET(HTTP_HVALUE_WS_UPGRADE); HTTPLIB_RESET(HTTP_HVALUE_WS_VERSION); @@ -213,6 +217,7 @@ static void http_lib_init(void *ignr_) { HTTP_HEADER_SET_COOKIE = fiobj_str_new("set-cookie", 10); HTTP_HEADER_UPGRADE = fiobj_str_new("upgrade", 7); HTTP_HEADER_WS_SEC_CLIENT_KEY = fiobj_str_new("sec-websocket-key", 17); + HTTP_HEADER_WS_SEC_EXTENSIONS = fiobj_str_new("sec-websocket-extensions", 24); HTTP_HEADER_WS_SEC_KEY = fiobj_str_new("sec-websocket-accept", 20); HTTP_HVALUE_BYTES = fiobj_str_new("bytes", 5); HTTP_HVALUE_CLOSE = fiobj_str_new("close", 5); @@ -224,6 +229,7 @@ static void http_lib_init(void *ignr_) { HTTP_HVALUE_NO_CACHE = fiobj_str_new("no-cache, max-age=0", 19); HTTP_HVALUE_SSE_MIME = fiobj_str_new("text/event-stream", 17); HTTP_HVALUE_WEBSOCKET = fiobj_str_new("websocket", 9); + HTTP_HVALUE_WS_DEFLATE = fiobj_str_new("permessage-deflate", 18); HTTP_HVALUE_WS_SEC_VERSION = fiobj_str_new("sec-websocket-version", 21); HTTP_HVALUE_WS_UPGRADE = fiobj_str_new("Upgrade", 7); HTTP_HVALUE_WS_VERSION = fiobj_str_new("13", 2); diff --git a/ext/iodine/http_internal.h b/ext/iodine/http_internal.h index 4002c022..8783bed9 100644 --- a/ext/iodine/http_internal.h +++ b/ext/iodine/http_internal.h @@ -72,6 +72,7 @@ Constants that shouldn't be accessed by the users (`fiobj_dup` required). extern FIOBJ HTTP_HEADER_ACCEPT_RANGES; extern FIOBJ HTTP_HEADER_WS_SEC_CLIENT_KEY; +extern FIOBJ HTTP_HEADER_WS_SEC_EXTENSIONS; extern FIOBJ HTTP_HEADER_WS_SEC_KEY; extern FIOBJ HTTP_HVALUE_BYTES; extern FIOBJ HTTP_HVALUE_CLOSE; @@ -82,6 +83,7 @@ extern FIOBJ HTTP_HVALUE_MAX_AGE; extern FIOBJ HTTP_HVALUE_NO_CACHE; extern FIOBJ HTTP_HVALUE_SSE_MIME; extern FIOBJ HTTP_HVALUE_WEBSOCKET; +extern FIOBJ HTTP_HVALUE_WS_DEFLATE; extern FIOBJ HTTP_HVALUE_WS_SEC_VERSION; extern FIOBJ HTTP_HVALUE_WS_UPGRADE; extern FIOBJ HTTP_HVALUE_WS_VERSION; diff --git a/ext/iodine/iodine_connection.c b/ext/iodine/iodine_connection.c index 2b82cb34..0eb3aa7b 100644 --- a/ext/iodine/iodine_connection.c +++ b/ext/iodine/iodine_connection.c @@ -167,7 +167,27 @@ Ruby Connection Methods - write, close open? pending * Use {pending} to test how many `write` operations are pending completion * (`on_drained(client)` will be called when they complete). */ -static VALUE iodine_connection_write(VALUE self, VALUE data) { +static VALUE iodine_connection_write(int argc, VALUE* argv, VALUE self) { + VALUE data, opts; + + static ID keyword_ids[1]; + VALUE kwargs[1]; + VALUE deflate = Qnil; + + ws_s *socket; + int rsv = 0; + + if (!keyword_ids[0]) { + CONST_ID(keyword_ids[0], "deflate"); + } + + rb_scan_args(argc, argv, "1:", &data, &opts); + + if (!NIL_P(opts)) { + rb_get_kwargs(opts, keyword_ids, 0, 1, kwargs); + if (kwargs[0] != Qundef) { deflate = kwargs[0]; } + } + iodine_connection_data_s *c = iodine_connection_validate_data(self); if (!c || fio_is_closed(c->info.uuid)) { // don't throw exceptions - closed connections are unavoidable. @@ -187,8 +207,14 @@ static VALUE iodine_connection_write(VALUE self, VALUE data) { switch (c->info.type) { case IODINE_CONNECTION_WEBSOCKET: /* WebSockets*/ - websocket_write(c->info.arg, IODINE_RSTRINFO(data), - rb_enc_get(data) == IodineUTF8Encoding); + socket = c->info.arg; + + if (websocket_has_deflator(socket) && deflate) { + rsv = 4; + } + + websocket_write(socket, IODINE_RSTRINFO(data), + rb_enc_get(data) == IodineUTF8Encoding, rsv); return Qtrue; break; case IODINE_CONNECTION_SSE: @@ -437,7 +463,7 @@ static void iodine_on_pubsub(fio_msg_s *msg) { fiobj_send_free(data->info.uuid, fiobj_dup(s)); } else { fwrite("-", 1, 1, stderr); - websocket_write(data->info.arg, msg->msg, (block == Qnil)); + websocket_write(data->info.arg, msg->msg, (block == Qnil), 0); } return; } @@ -902,7 +928,7 @@ void iodine_connection_init(void) { // define the Connection Class and it's methods ConnectionKlass = rb_define_class_under(IodineModule, "Connection", rb_cObject); rb_define_alloc_func(ConnectionKlass, iodine_connection_data_alloc_c); - rb_define_method(ConnectionKlass, "write", iodine_connection_write, 1); + rb_define_method(ConnectionKlass, "write", iodine_connection_write, -1); rb_define_method(ConnectionKlass, "close", iodine_connection_close, 0); rb_define_method(ConnectionKlass, "open?", iodine_connection_is_open, 0); rb_define_method(ConnectionKlass, "pending", iodine_connection_pending, 0); diff --git a/ext/iodine/iodine_http.c b/ext/iodine/iodine_http.c index 70bbd9d1..d498e693 100644 --- a/ext/iodine/iodine_http.c +++ b/ext/iodine/iodine_http.c @@ -34,7 +34,9 @@ VALUE IODINE_R_HIJACK; VALUE IODINE_R_HIJACK_IO; VALUE IODINE_R_HIJACK_CB; +static VALUE RACK_WS_EXTENSIONS; static VALUE RACK_UPGRADE; +static VALUE RACK_UPGRADE_DEFLATE; static VALUE RACK_UPGRADE_Q; static VALUE RACK_UPGRADE_SSE; static VALUE RACK_UPGRADE_WEBSOCKET; @@ -199,10 +201,24 @@ static void iodine_ws_attach(http_s *h, VALUE handler, VALUE env) { if (io == Qnil) return; + int deflate = 0; + + // check if permessage-deflate allowed + // must have header from client for extensions + // must have the permessage-deflate extension requested + // must have server authorize deflation + VALUE extension_header = rb_hash_aref(env, RACK_WS_EXTENSIONS); + char *extensions = (extension_header == Qnil ? NULL : StringValueCStr(extension_header)); + if (extensions != NULL && strcasestr(extensions, "permessage-deflate") != NULL && + rb_hash_aref(env, RACK_UPGRADE_DEFLATE) == Qtrue) { + deflate = 1; + } + http_upgrade2ws(h, .on_message = iodine_ws_on_message, .on_open = iodine_ws_on_open, .on_ready = iodine_ws_on_ready, .on_shutdown = iodine_ws_on_shutdown, - .on_close = iodine_ws_on_close, .udata = (void *)io); + .on_close = iodine_ws_on_close, .udata = (void *)io, + .deflate = deflate); } /* ***************************************************************************** @@ -1120,7 +1136,9 @@ void iodine_init_http(void) { rack_set(IODINE_R_HIJACK, "rack.hijack"); rack_set(IODINE_R_HIJACK_CB, "iodine.hijack_cb"); + rack_set(RACK_WS_EXTENSIONS, "HTTP_SEC_WEBSOCKET_EXTENSIONS"); rack_set(RACK_UPGRADE, "rack.upgrade"); + rack_set(RACK_UPGRADE_DEFLATE, "rack.upgrade.deflate"); rack_set(RACK_UPGRADE_Q, "rack.upgrade?"); rack_set_sym(RACK_UPGRADE_SSE, "sse"); rack_set_sym(RACK_UPGRADE_WEBSOCKET, "websocket"); diff --git a/ext/iodine/websocket_deflate.h b/ext/iodine/websocket_deflate.h new file mode 100644 index 00000000..fce6b443 --- /dev/null +++ b/ext/iodine/websocket_deflate.h @@ -0,0 +1,94 @@ +/* + copyright: Boaz Segev, 2017-2019 + license: MIT + + Feel free to copy, use and enjoy according to the license specified. +*/ +#ifndef H_WEBSOCKET_DEFLATE_H +/**\file + + A single file WebSocket permessage-deflate wrapper + +*/ +#define H_WEBSOCKET_DEFLATE_H +#include +#include +#include + +#define WS_CHUNK 16384 + +z_stream *new_z_stream() { + z_stream *strm = malloc(sizeof(*strm)); + + *strm = (z_stream){ + .zalloc = Z_NULL, + .zfree = Z_NULL, + .opaque = Z_NULL, + }; + + return strm; +} + +z_stream *new_deflator() { + z_stream *strm = new_z_stream(); + + int ret = deflateInit2(strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + -MAX_WBITS, 4, Z_DEFAULT_STRATEGY); + + return strm; +} + +z_stream *new_inflator() { + z_stream *strm = new_z_stream(); + + inflateInit2(strm, -MAX_WBITS); + + return strm; +} + +int deflate_message(fio_str_info_s src, FIOBJ dest, z_stream *strm) { + int ret, flush; + unsigned have; + unsigned char out[WS_CHUNK]; + + strm->avail_in = src.len; + strm->next_in = src.data; + + do { + strm->avail_out = WS_CHUNK; + strm->next_out = out; + ret = deflate(strm, Z_SYNC_FLUSH); + have = WS_CHUNK - strm->avail_out; + fiobj_str_write(dest, out, have); + } while (strm->avail_out == 0); + + return Z_OK; +} + +int inflate_message(fio_str_info_s src, FIOBJ dest, z_stream *strm) { + int ret; + unsigned have; + unsigned char out[WS_CHUNK]; + + strm->avail_in = src.len; + strm->next_in = src.data; + + do { + strm->avail_out = WS_CHUNK; + strm->next_out = out; + ret = inflate(strm, Z_SYNC_FLUSH); + switch (ret) { + case Z_NEED_DICT: + ret = Z_DATA_ERROR; + case Z_DATA_ERROR: + case Z_MEM_ERROR: + (void)inflateEnd(strm); + return ret; + } + have = WS_CHUNK - strm->avail_out; + fiobj_str_write(dest, out, have); + } while (strm->avail_out == 0); + + return ret; +} +#endif diff --git a/ext/iodine/websockets.c b/ext/iodine/websockets.c index 87353eff..c261c4af 100644 --- a/ext/iodine/websockets.c +++ b/ext/iodine/websockets.c @@ -26,6 +26,8 @@ Feel free to copy, use and enjoy according to the license provided. #include +#include + #if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) #include #if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) && \ @@ -130,6 +132,9 @@ struct ws_s { uint8_t is_text; /** websocket connection type. */ uint8_t is_client; + + z_stream *inflator; + z_stream *deflator; }; /* ***************************************************************************** @@ -152,7 +157,10 @@ static void websocket_on_unwrapped(void *ws_p, void *msg, uint64_t len, char first, char last, char text, unsigned char rsv) { ws_s *ws = ws_p; - if (last && first) { + static char ws_payload_tail[] = {0x00, 0x00, 0xFF, 0xFF}; + + // Shortcut only if not deflated + if (last && first && !(rsv & 4)) { ws->on_message(ws, (fio_str_info_s){.data = msg, .len = len}, (uint8_t)text); return; @@ -165,7 +173,20 @@ static void websocket_on_unwrapped(void *ws_p, void *msg, uint64_t len, } fiobj_str_write(ws->msg, msg, len); if (last) { - ws->on_message(ws, fiobj_obj2cstr(ws->msg), ws->is_text); + if (rsv & 4) { + fiobj_str_write(ws->msg, ws_payload_tail, 4); + fio_str_info_s deflated = fiobj_obj2cstr(ws->msg); + FIOBJ inflated = fiobj_str_buf(deflated.len); + + if (ws->inflator == NULL) { + ws->inflator = new_inflator(); + } + int ret = inflate_message(deflated, inflated, ws->inflator); + + ws->on_message(ws, fiobj_obj2cstr(inflated), ws->is_text); + } else { + ws->on_message(ws, fiobj_obj2cstr(ws->msg), ws->is_text); + } } (void)rsv; @@ -296,7 +317,7 @@ static void on_data_first(intptr_t sockfd, fio_protocol_s *ws_) { /* later */ static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text, - char first, char last, char client); + char first, char last, char client, char rsv); /******************************************************************************* Create/Destroy the websocket object @@ -305,6 +326,7 @@ Create/Destroy the websocket object static ws_s *new_websocket(intptr_t uuid) { // allocate the protocol object ws_s *ws = malloc(sizeof(*ws)); + *ws = (ws_s){ .protocol.ping = ws_ping, .protocol.on_data = on_data_first, @@ -314,6 +336,8 @@ static ws_s *new_websocket(intptr_t uuid) { .subscriptions = FIO_LS_INIT(ws->subscriptions), .is_client = 0, .fd = uuid, + .inflator = NULL, + .deflator = NULL, }; return ws; } @@ -330,6 +354,7 @@ static void destroy_ws(ws_s *ws) { void websocket_attach(intptr_t uuid, http_settings_s *http_settings, websocket_settings_s *args, void *data, size_t length) { ws_s *ws = new_websocket(uuid); + if (args->deflate) { ws->deflator = new_deflator(); } FIO_ASSERT_ALLOC(ws); // we have an active websocket connection - prep the connection buffer ws->buffer = create_ws_buffer(ws); @@ -380,24 +405,30 @@ Writing to the Websocket (FIO_MEMORY_BLOCK_ALLOC_LIMIT - 4096) // should be less then `unsigned short` static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text, - char first, char last, char client) { + char first, char last, char client, char rsv) { if (len <= WS_MAX_FRAME_SIZE) { void *buff = fio_malloc(len + 16); len = (client ? websocket_client_wrap(buff, data, len, (text ? 1 : 2), - first, last, 0) + first, last, rsv) : websocket_server_wrap(buff, data, len, (text ? 1 : 2), - first, last, 0)); + first, last, rsv)); fio_write2(fd, .data.buffer = buff, .length = len, .after.dealloc = fio_free); } else { + int firstFrame = 1; /* frame fragmentation is better for large data then large frames */ while (len > WS_MAX_FRAME_SIZE) { - websocket_write_impl(fd, data, WS_MAX_FRAME_SIZE, text, first, 0, client); + websocket_write_impl(fd, data, WS_MAX_FRAME_SIZE, text, first, 0, client, rsv); + if (firstFrame) { + firstFrame = 0; + // should only set compressed flag on first frame + rsv &= 3; + } data = ((uint8_t *)data) + WS_MAX_FRAME_SIZE; first = 0; len -= WS_MAX_FRAME_SIZE; } - websocket_write_impl(fd, data, len, text, first, 1, client); + websocket_write_impl(fd, data, len, text, first, 1, client, rsv); } return; } @@ -571,7 +602,7 @@ static inline void websocket_on_pubsub_message_direct_internal(fio_msg_s *msg, FIO_STR_INIT_STATIC2(msg->msg.data, msg->msg.len); // don't free txt = (tmp.len >= (2 << 14) ? 0 : fio_str_utf8_valid(&tmp)); } - websocket_write((ws_s *)pr, msg->msg, txt & 1); + websocket_write((ws_s *)pr, msg->msg, txt & 1, 0); fiobj_free(message); finish: fio_protocol_unlock(pr, FIO_PR_LOCK_WRITE); @@ -713,11 +744,23 @@ void *websocket_udata_set(ws_s *ws, void *udata) { */ uint8_t websocket_is_client(ws_s *ws) { return ws->is_client; } +uint8_t websocket_has_deflator(ws_s *ws) { + return !(ws->deflator != NULL); +} + /** Writes data to the websocket. Returns -1 on failure (0 on success). */ -int websocket_write(ws_s *ws, fio_str_info_s msg, uint8_t is_text) { +int websocket_write(ws_s *ws, fio_str_info_s msg, uint8_t is_text, char rsv) { + + if (rsv & 4) { + FIOBJ deflated = fiobj_str_buf(msg.len); + deflate_message(msg, deflated, ws->deflator); + msg = fiobj_obj2cstr(deflated); + msg.len -= 4; + } + if (fio_is_valid(ws->fd)) { websocket_write_impl(ws->fd, msg.data, msg.len, is_text, 1, 1, - ws->is_client); + ws->is_client, rsv); return 0; } return -1; diff --git a/ext/iodine/websockets.h b/ext/iodine/websockets.h index 6a78fa65..5cfb5ece 100644 --- a/ext/iodine/websockets.h +++ b/ext/iodine/websockets.h @@ -47,12 +47,17 @@ intptr_t websocket_uuid(ws_s *ws); */ uint8_t websocket_is_client(ws_s *ws); +/** + * Returns 1 if the WebSocket has a deflator (i.e. permessage-deflate is set) + */ +uint8_t websocket_has_deflator(ws_s *ws); + /* ***************************************************************************** Websocket Connection Management (write / close) ***************************************************************************** */ /** Writes data to the websocket. Returns -1 on failure (0 on success). */ -int websocket_write(ws_s *ws, fio_str_info_s msg, uint8_t is_text); +int websocket_write(ws_s *ws, fio_str_info_s msg, uint8_t is_text, char rsv); /** Closes a websocket connection. */ void websocket_close(ws_s *ws);