From 71cbe2e4f990cef7c146429ef1842353b28e9b51 Mon Sep 17 00:00:00 2001 From: qicosmos Date: Tue, 10 Dec 2024 13:31:14 +0800 Subject: [PATCH] [coro_rpc_client][fix]Add request timeout confg (#844) --- include/ylt/coro_rpc/impl/coro_rpc_client.hpp | 95 +++++++++---------- src/coro_rpc/tests/test_coro_rpc_client.cpp | 76 +++++++++++++-- 2 files changed, 116 insertions(+), 55 deletions(-) diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index e84b09f6c..955e59d2e 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -161,14 +161,14 @@ class coro_rpc_client { "client has been closed"}; struct config { uint64_t client_id = get_global_client_id(); - std::chrono::milliseconds timeout_duration = - std::chrono::milliseconds{30000}; - std::string host; - std::string port; + std::optional connect_timeout_duration; + std::optional request_timeout_duration; + std::string host{}; + std::string port{}; bool enable_tcp_no_delay = true; #ifdef YLT_ENABLE_SSL - std::filesystem::path ssl_cert_path; - std::string ssl_domain; + std::filesystem::path ssl_cert_path{}; + std::string ssl_domain{}; #endif }; @@ -203,6 +203,8 @@ class coro_rpc_client { const config &get_config() const { return config_; } + config &get_config() { return config_; } + [[nodiscard]] bool init_config(const config &conf) { config_ = conf; #ifdef YLT_ENABLE_SSL @@ -228,12 +230,12 @@ class coro_rpc_client { * * @param host server address * @param port server port - * @param timeout_duration RPC call timeout + * @param connect_timeout_duration RPC call timeout seconds * @return error code */ [[nodiscard]] async_simple::coro::Lazy connect( std::string host, std::string port, - std::chrono::steady_clock::duration timeout_duration = + std::chrono::steady_clock::duration connect_timeout_duration = std::chrono::seconds(30)) { auto lock_ok = connect_mutex_.tryLock(); if (!lock_ok) { @@ -241,44 +243,32 @@ class coro_rpc_client { co_return err_code{}; // do nothing, someone has reconnect the client } - config_.host = std::move(host); - config_.port = std::move(port); - config_.timeout_duration = - std::chrono::duration_cast(timeout_duration); + + if (config_.host.empty()) { + config_.host = std::move(host); + } + if (config_.port.empty()) { + config_.port = std::move(port); + } + if (!config_.connect_timeout_duration) { + config_.connect_timeout_duration = + std::chrono::duration_cast( + connect_timeout_duration); + } + auto ret = co_await connect_impl(); connect_mutex_.unlock(); co_return std::move(ret); } [[nodiscard]] async_simple::coro::Lazy connect( std::string_view endpoint, - std::chrono::steady_clock::duration timeout_duration = + std::chrono::steady_clock::duration connect_timeout_duration = std::chrono::seconds(30)) { auto pos = endpoint.find(':'); - auto lock_ok = connect_mutex_.tryLock(); - if (!lock_ok) { - co_await connect_mutex_.coScopedLock(); - co_return err_code{}; - // do nothing, someone has reconnect the client - } - config_.host = endpoint.substr(0, pos); - config_.port = endpoint.substr(pos + 1); - config_.timeout_duration = - std::chrono::duration_cast(timeout_duration); - auto ret = co_await connect_impl(); - connect_mutex_.unlock(); - co_return std::move(ret); - } + std::string host(endpoint.substr(0, pos)); + std::string port(endpoint.substr(pos + 1)); - [[nodiscard]] async_simple::coro::Lazy connect() { - auto lock_ok = connect_mutex_.tryLock(); - if (!lock_ok) { - co_await connect_mutex_.coScopedLock(); - co_return err_code{}; - // do nothing, someone has reconnect the client - } - auto ret = co_await connect_impl(); - connect_mutex_.unlock(); - co_return std::move(ret); + return connect(std::move(host), std::move(port), connect_timeout_duration); } #ifdef YLT_ENABLE_SSL @@ -323,11 +313,12 @@ class coro_rpc_client { */ template async_simple::coro::Lazy())>> - call_for(auto duration, Args &&...args) { + call_for(auto request_timeout_duration, Args &&...args) { using return_type = decltype(get_return_type()); auto async_result = co_await co_await send_request_for_with_attachment( - duration, req_attachment_, std::forward(args)...); + request_timeout_duration, req_attachment_, + std::forward(args)...); req_attachment_ = {}; if (async_result) { control_->resp_buffer_ = async_result->release_buffer(); @@ -410,9 +401,12 @@ class coro_rpc_client { ELOGV(INFO, "client_id %d begin to connect %s", config_.client_id, config_.port.data()); - timeout(*this->timer_, config_.timeout_duration, "connect timer canceled") - .start([](auto &&) { - }); + auto conn_timeout_dur = *config_.connect_timeout_duration; + if (conn_timeout_dur.count() >= 0) { + timeout(*this->timer_, conn_timeout_dur, "connect timer canceled") + .start([](auto &&) { + }); + } std::error_code ec = co_await coro_io::async_connect( &control_->executor_, control_->socket_, config_.host, config_.port); @@ -747,7 +741,7 @@ class coro_rpc_client { private: template async_simple::coro::Lazy send_request_for_impl( - auto duration, uint32_t &id, coro_io::period_timer &timer, + auto request_timeout_duration, uint32_t &id, coro_io::period_timer &timer, std::string_view attachment, Args &&...args) { using R = decltype(get_return_type()); @@ -766,9 +760,10 @@ class coro_rpc_client { static_check(); - if (duration.count() > 0) { - timeout(timer, duration, "rpc call timer canceled").start([](auto &&) { - }); + if (request_timeout_duration.count() >= 0) { + timeout(timer, request_timeout_duration, "rpc call timer canceled") + .start([](auto &&) { + }); } #ifdef YLT_ENABLE_SSL @@ -965,16 +960,20 @@ class coro_rpc_client { template async_simple::coro::Lazy())>>> - send_request_for_with_attachment(auto time_out_duration, + send_request_for_with_attachment(auto request_timeout_duration, std::string_view request_attachment, Args &&...args) { using rpc_return_t = decltype(get_return_type()); recving_guard guard(control_.get()); uint32_t id; + if (!config_.request_timeout_duration) { + config_.request_timeout_duration = request_timeout_duration; + } + auto timer = std::make_unique( control_->executor_.get_asio_executor()); auto result = co_await send_request_for_impl( - time_out_duration, id, *timer, request_attachment, + *config_.request_timeout_duration, id, *timer, request_attachment, std::forward(args)...); auto &control = *control_; if (!result) { diff --git a/src/coro_rpc/tests/test_coro_rpc_client.cpp b/src/coro_rpc/tests/test_coro_rpc_client.cpp index 156dbbc5d..21412afed 100644 --- a/src/coro_rpc/tests/test_coro_rpc_client.cpp +++ b/src/coro_rpc/tests/test_coro_rpc_client.cpp @@ -480,14 +480,76 @@ TEST_CASE("testing client with shutdown") { g_action = {}; } TEST_CASE("testing client timeout") { - // SUBCASE("connect, 0ms timeout") { - // coro_rpc_client client; - // auto ret = client.connect("127.0.0.1", "8801", 0ms); - // auto val = syncAwait(ret); + coro_rpc_server server(2, 8801); + server.register_handler(); + auto res = server.async_start(); + CHECK_MESSAGE(!res.hasResult(), "server start timeout"); - // CHECK_MESSAGE(val == std::errc::timed_out, - // make_error_code(val).message()); - // } + SUBCASE("connect, 0ms connect timeout") { + coro_rpc_client client; + coro_rpc_client::config config{}; + config.connect_timeout_duration = 0ms; + bool r = client.init_config(config); + CHECK(r); + auto ret = client.connect( + "127.0.0.1", "8801", + 1000ms); // this arg won't update config connect timeout duration. + auto val = syncAwait(ret); + + if (val) { + CHECK_MESSAGE(val == coro_rpc::errc::timed_out, val.message()); + } + } + SUBCASE("connect, -1ms never timeout") { + coro_rpc_client client; + coro_rpc_client::config config{}; + config.connect_timeout_duration = -1ms; // less than 0, no timeout + // checking. + bool r = client.init_config(config); + CHECK(r); + auto ret = client.connect( + "127.0.0.1", "8801", + 1000ms); // this arg won't update config connect timeout duration. + auto val = syncAwait(ret); + + CHECK(!val); + } + + SUBCASE("connect, 0ms request timeout") { + coro_rpc_client client; + coro_rpc_client::config config{}; + config.connect_timeout_duration = 1000ms; + config.request_timeout_duration = 0ms; + bool r = client.init_config(config); + CHECK(r); + auto ret = client.connect( + "127.0.0.1", "8801", + 0ms); // 0ms won't cover config connect timeout duration. + auto val = syncAwait(ret); + + CHECK(!val); + auto result = syncAwait(client.call()); + + if (result.has_value()) { + std::cout << result.value() << std::endl; + } + else { + CHECK_MESSAGE(result.error().code == coro_rpc::errc::timed_out, + result.error().msg); + } + } + SUBCASE("connect, -1ms never request timeout") { + coro_rpc_client client; + client.get_config().request_timeout_duration = + -1ms; // less than 0, never timeout. + auto ret = client.connect("127.0.0.1", "8801"); + auto val = syncAwait(ret); + + CHECK(!val); + auto result = syncAwait(client.call()); + + CHECK(result.has_value()); + } SUBCASE("connect, ip timeout") { g_action = {}; // https://stackoverflow.com/questions/100841/artificially-create-a-connection-timeout-error