Skip to content

Commit

Permalink
[coro_http_client]fix websocket compress (#846)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Dec 10, 2024
1 parent ef73fe5 commit e10d8c7
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 12 deletions.
1 change: 1 addition & 0 deletions .bazeliskrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
USE_BAZEL_VERSION=6.4.0
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ build:windows --cxxopt="/std:c++20"
# This workaround is needed to prevent Bazel from compiling the same file twice (once PIC and once not).
build:linux --force_pic
build:macos --force_pic
build:clang-cl --compiler=clang-cl
build:msvc-cl --compiler=msvc-cl
# `LC_ALL` and `LANG` is needed for cpp worker tests, because they will call "ray start".
# If we don't add them, python's `click` library will raise an error.
Expand Down Expand Up @@ -49,4 +48,5 @@ build:clang-cl --host_copt="-Wno-microsoft-unqualified-friend"
build:msvc-cl --per_file_copt="external/boost/libs/regex/src/wc_regex_traits\\.cpp@-wd4244"
build --http_timeout_scaling=5.0
build --verbose_failures
build --incompatible_enable_cc_toolchain_resolution

19 changes: 12 additions & 7 deletions include/ylt/standalone/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
websocket ws, opcode op,
resp_data &data,
bool eof = true) {
auto header = ws.encode_frame(msg, op, eof, true);
auto header = ws.encode_frame(msg, op, eof, enable_ws_deflate_);
std::vector<asio::const_buffer> buffers{
asio::buffer(header), asio::buffer(msg.data(), msg.size())};

Expand Down Expand Up @@ -448,18 +448,22 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
span = {source.data(), source.size()};
#ifdef CINATRA_ENABLE_GZIP
std::string dest_buf;
gzip_compress({source.data(), source.size()}, dest_buf, span, data);
if (enable_ws_deflate_) {
gzip_compress({source.data(), source.size()}, dest_buf, span, data);
}
#endif
co_await write_ws_frame(span, ws, op, data);
co_await write_ws_frame(span, ws, op, data, true);
}
else {
while (true) {
auto result = co_await source();
span = {result.buf.data(), result.buf.size()};
#ifdef CINATRA_ENABLE_GZIP
std::string dest_buf;
gzip_compress({result.buf.data(), result.buf.size()}, dest_buf, span,
data);
if (enable_ws_deflate_) {
gzip_compress({result.buf.data(), result.buf.size()}, dest_buf, span,
data);
}
#endif
co_await write_ws_frame(span, ws, op, data, result.eof);

Expand Down Expand Up @@ -613,7 +617,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
: self(self) {
self->socket_->is_timeout_ = false;

if (self->enable_timeout_) {
if (self->enable_timeout_ && duration.count() >= 0) {
self->timeout(self->timer_, duration, std::move(msg))
.start([](auto &&) {
});
Expand Down Expand Up @@ -1968,6 +1972,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

async_simple::coro::Lazy<resp_data> connect(const uri_t &u) {
if (socket_->has_closed_) {
socket_->is_timeout_ = false;
host_ = proxy_host_.empty() ? u.get_host() : proxy_host_;
port_ = proxy_port_.empty() ? u.get_port() : proxy_port_;
if (auto ec = co_await coro_io::async_connect(
Expand Down Expand Up @@ -2397,8 +2402,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
bool should_reset_ = false;
config config_;

#ifdef CINATRA_ENABLE_GZIP
bool enable_ws_deflate_ = false;
#ifdef CINATRA_ENABLE_GZIP
bool is_server_support_ws_deflate_ = false;
std::string inflate_str_;
#endif
Expand Down
2 changes: 1 addition & 1 deletion include/ylt/standalone/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ class coro_http_connection
std::string dest_buf;
if (is_client_ws_compressed_ && msg.size() > 0) {
if (!cinatra::gzip_codec::deflate(msg, dest_buf)) {
CINATRA_LOG_ERROR << "compuress data error, data: " << msg;
CINATRA_LOG_ERROR << "compress data error, data: " << msg;
co_return std::make_error_code(std::errc::protocol_error);
}

Expand Down
1 change: 1 addition & 0 deletions include/ylt/standalone/cinatra/uri.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class uri_t {
case '$':
case '&':
case '\'':
case '|':
case '(':
case ')':
case '*':
Expand Down
6 changes: 3 additions & 3 deletions include/ylt/standalone/cinatra/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ class websocket {
bool is_client = true) {
frame_header hdr{};
hdr.fin = eof;
hdr.rsv1 = 0;
hdr.rsv2 = 0;
if (need_compression)
hdr.rsv2 = 1;
hdr.rsv1 = 1;
else
hdr.rsv2 = 0;
hdr.rsv1 = 0;
hdr.rsv3 = 0;
hdr.opcode = static_cast<uint8_t>(op);
hdr.mask = is_client;
Expand Down
36 changes: 36 additions & 0 deletions src/coro_http/tests/test_cinatra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,42 @@ TEST_CASE("test for gzip") {
CHECK(result.resp_body == "hello world");
}

{
coro_http_client client{};
client.add_header("Content-Encoding", "none");
client.set_conn_timeout(0ms);
std::string uri = "http://127.0.0.1:8090/none";
auto result = async_simple::coro::syncAwait(client.connect(uri));
if (result.net_err)
CHECK(result.net_err == std::errc::timed_out);

client.set_conn_timeout(-1ms);
client.set_req_timeout(0ms);
result = async_simple::coro::syncAwait(client.connect(uri));
if (result.net_err)
CHECK(!result.net_err);

result = async_simple::coro::syncAwait(client.async_get("/none"));
if (result.net_err)
CHECK(result.net_err == std::errc::timed_out);

client.add_header("Content-Encoding", "none");
client.set_req_timeout(-1ms);
result = async_simple::coro::syncAwait(client.async_get(uri));
CHECK(!result.net_err);
client.add_header("Content-Encoding", "none");
result = async_simple::coro::syncAwait(client.async_get(uri));
CHECK(!result.net_err);

client.add_header("Content-Encoding", "none");
coro_http_client::config conf{};
conf.req_timeout_duration = 0ms;
client.init_config(conf);
result = async_simple::coro::syncAwait(client.async_get(uri));
if (result.net_err)
CHECK(result.net_err == std::errc::timed_out);
}

{
coro_http_client client{};
std::string uri = "http://127.0.0.1:8090/none";
Expand Down

0 comments on commit e10d8c7

Please sign in to comment.