Skip to content

Commit

Permalink
WebSocket permessage-deflate server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
wishdev committed Oct 19, 2021
1 parent 46146c4 commit 9f2c3a8
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 18 deletions.
1 change: 1 addition & 0 deletions ext/iodine/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ typedef struct {
void (*on_close)(intptr_t uuid, void *udata);
/** Opaque user data. */
void *udata;
int deflate;
} websocket_settings_s;

/**
Expand Down
4 changes: 4 additions & 0 deletions ext/iodine/http1.c
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ static int http1_http2websocket_server(http_s *h, websocket_settings_s *args) {
stmp = fiobj_obj2cstr(tmp);
fiobj_str_resize(tmp,
fio_base64_encode(stmp.data, fio_sha1_result(&sha1), 20));

if (args->deflate) {
http_set_header(h, HTTP_HEADER_WS_SEC_EXTENSIONS, fiobj_dup(HTTP_HVALUE_WS_DEFLATE));
}
http_set_header(h, HTTP_HEADER_CONNECTION, fiobj_dup(HTTP_HVALUE_WS_UPGRADE));
http_set_header(h, HTTP_HEADER_UPGRADE, fiobj_dup(HTTP_HVALUE_WEBSOCKET));
http_set_header(h, HTTP_HEADER_WS_SEC_KEY, tmp);
Expand Down
6 changes: 6 additions & 0 deletions ext/iodine/http_internal.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ FIOBJ HTTP_HEADER_ORIGIN;
FIOBJ HTTP_HEADER_SET_COOKIE;
FIOBJ HTTP_HEADER_UPGRADE;
FIOBJ HTTP_HEADER_WS_SEC_CLIENT_KEY;
FIOBJ HTTP_HEADER_WS_SEC_EXTENSIONS;
FIOBJ HTTP_HEADER_WS_SEC_KEY;
FIOBJ HTTP_HVALUE_BYTES;
FIOBJ HTTP_HVALUE_CLOSE;
Expand All @@ -136,6 +137,7 @@ FIOBJ HTTP_HVALUE_MAX_AGE;
FIOBJ HTTP_HVALUE_NO_CACHE;
FIOBJ HTTP_HVALUE_SSE_MIME;
FIOBJ HTTP_HVALUE_WEBSOCKET;
FIOBJ HTTP_HVALUE_WS_DEFLATE;
FIOBJ HTTP_HVALUE_WS_SEC_VERSION;
FIOBJ HTTP_HVALUE_WS_UPGRADE;
FIOBJ HTTP_HVALUE_WS_VERSION;
Expand Down Expand Up @@ -172,6 +174,7 @@ static void http_lib_cleanup(void *ignr_) {
HTTPLIB_RESET(HTTP_HEADER_SET_COOKIE);
HTTPLIB_RESET(HTTP_HEADER_UPGRADE);
HTTPLIB_RESET(HTTP_HEADER_WS_SEC_CLIENT_KEY);
HTTPLIB_RESET(HTTP_HEADER_WS_SEC_EXTENSIONS);
HTTPLIB_RESET(HTTP_HEADER_WS_SEC_KEY);
HTTPLIB_RESET(HTTP_HVALUE_BYTES);
HTTPLIB_RESET(HTTP_HVALUE_CLOSE);
Expand All @@ -182,6 +185,7 @@ static void http_lib_cleanup(void *ignr_) {
HTTPLIB_RESET(HTTP_HVALUE_NO_CACHE);
HTTPLIB_RESET(HTTP_HVALUE_SSE_MIME);
HTTPLIB_RESET(HTTP_HVALUE_WEBSOCKET);
HTTPLIB_RESET(HTTP_HVALUE_WS_DEFLATE);
HTTPLIB_RESET(HTTP_HVALUE_WS_SEC_VERSION);
HTTPLIB_RESET(HTTP_HVALUE_WS_UPGRADE);
HTTPLIB_RESET(HTTP_HVALUE_WS_VERSION);
Expand Down Expand Up @@ -213,6 +217,7 @@ static void http_lib_init(void *ignr_) {
HTTP_HEADER_SET_COOKIE = fiobj_str_new("set-cookie", 10);
HTTP_HEADER_UPGRADE = fiobj_str_new("upgrade", 7);
HTTP_HEADER_WS_SEC_CLIENT_KEY = fiobj_str_new("sec-websocket-key", 17);
HTTP_HEADER_WS_SEC_EXTENSIONS = fiobj_str_new("sec-websocket-extensions", 24);
HTTP_HEADER_WS_SEC_KEY = fiobj_str_new("sec-websocket-accept", 20);
HTTP_HVALUE_BYTES = fiobj_str_new("bytes", 5);
HTTP_HVALUE_CLOSE = fiobj_str_new("close", 5);
Expand All @@ -224,6 +229,7 @@ static void http_lib_init(void *ignr_) {
HTTP_HVALUE_NO_CACHE = fiobj_str_new("no-cache, max-age=0", 19);
HTTP_HVALUE_SSE_MIME = fiobj_str_new("text/event-stream", 17);
HTTP_HVALUE_WEBSOCKET = fiobj_str_new("websocket", 9);
HTTP_HVALUE_WS_DEFLATE = fiobj_str_new("permessage-deflate", 18);
HTTP_HVALUE_WS_SEC_VERSION = fiobj_str_new("sec-websocket-version", 21);
HTTP_HVALUE_WS_UPGRADE = fiobj_str_new("Upgrade", 7);
HTTP_HVALUE_WS_VERSION = fiobj_str_new("13", 2);
Expand Down
2 changes: 2 additions & 0 deletions ext/iodine/http_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Constants that shouldn't be accessed by the users (`fiobj_dup` required).

extern FIOBJ HTTP_HEADER_ACCEPT_RANGES;
extern FIOBJ HTTP_HEADER_WS_SEC_CLIENT_KEY;
extern FIOBJ HTTP_HEADER_WS_SEC_EXTENSIONS;
extern FIOBJ HTTP_HEADER_WS_SEC_KEY;
extern FIOBJ HTTP_HVALUE_BYTES;
extern FIOBJ HTTP_HVALUE_CLOSE;
Expand All @@ -82,6 +83,7 @@ extern FIOBJ HTTP_HVALUE_MAX_AGE;
extern FIOBJ HTTP_HVALUE_NO_CACHE;
extern FIOBJ HTTP_HVALUE_SSE_MIME;
extern FIOBJ HTTP_HVALUE_WEBSOCKET;
extern FIOBJ HTTP_HVALUE_WS_DEFLATE;
extern FIOBJ HTTP_HVALUE_WS_SEC_VERSION;
extern FIOBJ HTTP_HVALUE_WS_UPGRADE;
extern FIOBJ HTTP_HVALUE_WS_VERSION;
Expand Down
36 changes: 31 additions & 5 deletions ext/iodine/iodine_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,27 @@ Ruby Connection Methods - write, close open? pending
* Use {pending} to test how many `write` operations are pending completion
* (`on_drained(client)` will be called when they complete).
*/
static VALUE iodine_connection_write(VALUE self, VALUE data) {
static VALUE iodine_connection_write(int argc, VALUE* argv, VALUE self) {
VALUE data, opts;

static ID keyword_ids[1];
VALUE kwargs[1];
VALUE deflate = Qnil;

ws_s *socket;
int rsv = 0;

if (!keyword_ids[0]) {
CONST_ID(keyword_ids[0], "deflate");
}

rb_scan_args(argc, argv, "1:", &data, &opts);

if (!NIL_P(opts)) {
rb_get_kwargs(opts, keyword_ids, 0, 1, kwargs);
if (kwargs[0] != Qundef) { deflate = kwargs[0]; }
}

iodine_connection_data_s *c = iodine_connection_validate_data(self);
if (!c || fio_is_closed(c->info.uuid)) {
// don't throw exceptions - closed connections are unavoidable.
Expand All @@ -187,8 +207,14 @@ static VALUE iodine_connection_write(VALUE self, VALUE data) {
switch (c->info.type) {
case IODINE_CONNECTION_WEBSOCKET:
/* WebSockets*/
websocket_write(c->info.arg, IODINE_RSTRINFO(data),
rb_enc_get(data) == IodineUTF8Encoding);
socket = c->info.arg;

if (websocket_has_deflator(socket) && deflate) {
rsv = 4;
}

websocket_write(socket, IODINE_RSTRINFO(data),
rb_enc_get(data) == IodineUTF8Encoding, rsv);
return Qtrue;
break;
case IODINE_CONNECTION_SSE:
Expand Down Expand Up @@ -437,7 +463,7 @@ static void iodine_on_pubsub(fio_msg_s *msg) {
fiobj_send_free(data->info.uuid, fiobj_dup(s));
} else {
fwrite("-", 1, 1, stderr);
websocket_write(data->info.arg, msg->msg, (block == Qnil));
websocket_write(data->info.arg, msg->msg, (block == Qnil), 0);
}
return;
}
Expand Down Expand Up @@ -902,7 +928,7 @@ void iodine_connection_init(void) {
// define the Connection Class and it's methods
ConnectionKlass = rb_define_class_under(IodineModule, "Connection", rb_cObject);
rb_define_alloc_func(ConnectionKlass, iodine_connection_data_alloc_c);
rb_define_method(ConnectionKlass, "write", iodine_connection_write, 1);
rb_define_method(ConnectionKlass, "write", iodine_connection_write, -1);
rb_define_method(ConnectionKlass, "close", iodine_connection_close, 0);
rb_define_method(ConnectionKlass, "open?", iodine_connection_is_open, 0);
rb_define_method(ConnectionKlass, "pending", iodine_connection_pending, 0);
Expand Down
20 changes: 19 additions & 1 deletion ext/iodine/iodine_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ VALUE IODINE_R_HIJACK;
VALUE IODINE_R_HIJACK_IO;
VALUE IODINE_R_HIJACK_CB;

static VALUE RACK_WS_EXTENSIONS;
static VALUE RACK_UPGRADE;
static VALUE RACK_UPGRADE_DEFLATE;
static VALUE RACK_UPGRADE_Q;
static VALUE RACK_UPGRADE_SSE;
static VALUE RACK_UPGRADE_WEBSOCKET;
Expand Down Expand Up @@ -199,10 +201,24 @@ static void iodine_ws_attach(http_s *h, VALUE handler, VALUE env) {
if (io == Qnil)
return;

int deflate = 0;

// check if permessage-deflate allowed
// must have header from client for extensions
// must have the permessage-deflate extension requested
// must have server authorize deflation
VALUE extension_header = rb_hash_aref(env, RACK_WS_EXTENSIONS);
char *extensions = (extension_header == Qnil ? NULL : StringValueCStr(extension_header));
if (extensions != NULL && strcasestr(extensions, "permessage-deflate") != NULL &&
rb_hash_aref(env, RACK_UPGRADE_DEFLATE) == Qtrue) {
deflate = 1;
}

http_upgrade2ws(h, .on_message = iodine_ws_on_message,
.on_open = iodine_ws_on_open, .on_ready = iodine_ws_on_ready,
.on_shutdown = iodine_ws_on_shutdown,
.on_close = iodine_ws_on_close, .udata = (void *)io);
.on_close = iodine_ws_on_close, .udata = (void *)io,
.deflate = deflate);
}

/* *****************************************************************************
Expand Down Expand Up @@ -1120,7 +1136,9 @@ void iodine_init_http(void) {
rack_set(IODINE_R_HIJACK, "rack.hijack");
rack_set(IODINE_R_HIJACK_CB, "iodine.hijack_cb");

rack_set(RACK_WS_EXTENSIONS, "HTTP_SEC_WEBSOCKET_EXTENSIONS");
rack_set(RACK_UPGRADE, "rack.upgrade");
rack_set(RACK_UPGRADE_DEFLATE, "rack.upgrade.deflate");
rack_set(RACK_UPGRADE_Q, "rack.upgrade?");
rack_set_sym(RACK_UPGRADE_SSE, "sse");
rack_set_sym(RACK_UPGRADE_WEBSOCKET, "websocket");
Expand Down
94 changes: 94 additions & 0 deletions ext/iodine/websocket_deflate.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
copyright: Boaz Segev, 2017-2019
license: MIT
Feel free to copy, use and enjoy according to the license specified.
*/
#ifndef H_WEBSOCKET_DEFLATE_H
/**\file
A single file WebSocket permessage-deflate wrapper
*/
#define H_WEBSOCKET_DEFLATE_H
#include <fiobj.h>
#include <stdlib.h>
#include <zlib.h>

#define WS_CHUNK 16384

z_stream *new_z_stream() {
z_stream *strm = malloc(sizeof(*strm));

*strm = (z_stream){
.zalloc = Z_NULL,
.zfree = Z_NULL,
.opaque = Z_NULL,
};

return strm;
}

z_stream *new_deflator() {
z_stream *strm = new_z_stream();

int ret = deflateInit2(strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
-MAX_WBITS, 4, Z_DEFAULT_STRATEGY);

return strm;
}

z_stream *new_inflator() {
z_stream *strm = new_z_stream();

inflateInit2(strm, -MAX_WBITS);

return strm;
}

int deflate_message(fio_str_info_s src, FIOBJ dest, z_stream *strm) {
int ret, flush;
unsigned have;
unsigned char out[WS_CHUNK];

strm->avail_in = src.len;
strm->next_in = src.data;

do {
strm->avail_out = WS_CHUNK;
strm->next_out = out;
ret = deflate(strm, Z_SYNC_FLUSH);
have = WS_CHUNK - strm->avail_out;
fiobj_str_write(dest, out, have);
} while (strm->avail_out == 0);

return Z_OK;
}

int inflate_message(fio_str_info_s src, FIOBJ dest, z_stream *strm) {
int ret;
unsigned have;
unsigned char out[WS_CHUNK];

strm->avail_in = src.len;
strm->next_in = src.data;

do {
strm->avail_out = WS_CHUNK;
strm->next_out = out;
ret = inflate(strm, Z_SYNC_FLUSH);
switch (ret) {
case Z_NEED_DICT:
ret = Z_DATA_ERROR;
case Z_DATA_ERROR:
case Z_MEM_ERROR:
(void)inflateEnd(strm);
return ret;
}
have = WS_CHUNK - strm->avail_out;
fiobj_str_write(dest, out, have);
} while (strm->avail_out == 0);

return ret;
}
#endif
Loading

0 comments on commit 9f2c3a8

Please sign in to comment.