Skip to content

Commit

Permalink
[coro_rpc_client][fix]Add request timeout confg (#844)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Dec 10, 2024
1 parent e10d8c7 commit 71cbe2e
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 55 deletions.
95 changes: 47 additions & 48 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,14 @@ class coro_rpc_client {
"client has been closed"};
struct config {
uint64_t client_id = get_global_client_id();
std::chrono::milliseconds timeout_duration =
std::chrono::milliseconds{30000};
std::string host;
std::string port;
std::optional<std::chrono::milliseconds> connect_timeout_duration;
std::optional<std::chrono::milliseconds> request_timeout_duration;
std::string host{};
std::string port{};
bool enable_tcp_no_delay = true;
#ifdef YLT_ENABLE_SSL
std::filesystem::path ssl_cert_path;
std::string ssl_domain;
std::filesystem::path ssl_cert_path{};
std::string ssl_domain{};
#endif
};

Expand Down Expand Up @@ -203,6 +203,8 @@ class coro_rpc_client {

const config &get_config() const { return config_; }

config &get_config() { return config_; }

[[nodiscard]] bool init_config(const config &conf) {
config_ = conf;
#ifdef YLT_ENABLE_SSL
Expand All @@ -228,57 +230,45 @@ class coro_rpc_client {
*
* @param host server address
* @param port server port
* @param timeout_duration RPC call timeout
* @param connect_timeout_duration RPC call timeout seconds
* @return error code
*/
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string host, std::string port,
std::chrono::steady_clock::duration timeout_duration =
std::chrono::steady_clock::duration connect_timeout_duration =
std::chrono::seconds(30)) {
auto lock_ok = connect_mutex_.tryLock();
if (!lock_ok) {
co_await connect_mutex_.coScopedLock();
co_return err_code{};
// do nothing, someone has reconnect the client
}
config_.host = std::move(host);
config_.port = std::move(port);
config_.timeout_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(timeout_duration);

if (config_.host.empty()) {
config_.host = std::move(host);
}
if (config_.port.empty()) {
config_.port = std::move(port);
}
if (!config_.connect_timeout_duration) {
config_.connect_timeout_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(
connect_timeout_duration);
}

auto ret = co_await connect_impl();
connect_mutex_.unlock();
co_return std::move(ret);
}
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string_view endpoint,
std::chrono::steady_clock::duration timeout_duration =
std::chrono::steady_clock::duration connect_timeout_duration =
std::chrono::seconds(30)) {
auto pos = endpoint.find(':');
auto lock_ok = connect_mutex_.tryLock();
if (!lock_ok) {
co_await connect_mutex_.coScopedLock();
co_return err_code{};
// do nothing, someone has reconnect the client
}
config_.host = endpoint.substr(0, pos);
config_.port = endpoint.substr(pos + 1);
config_.timeout_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(timeout_duration);
auto ret = co_await connect_impl();
connect_mutex_.unlock();
co_return std::move(ret);
}
std::string host(endpoint.substr(0, pos));
std::string port(endpoint.substr(pos + 1));

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect() {
auto lock_ok = connect_mutex_.tryLock();
if (!lock_ok) {
co_await connect_mutex_.coScopedLock();
co_return err_code{};
// do nothing, someone has reconnect the client
}
auto ret = co_await connect_impl();
connect_mutex_.unlock();
co_return std::move(ret);
return connect(std::move(host), std::move(port), connect_timeout_duration);
}

#ifdef YLT_ENABLE_SSL
Expand Down Expand Up @@ -323,11 +313,12 @@ class coro_rpc_client {
*/
template <auto func, typename... Args>
async_simple::coro::Lazy<rpc_result<decltype(get_return_type<func>())>>
call_for(auto duration, Args &&...args) {
call_for(auto request_timeout_duration, Args &&...args) {
using return_type = decltype(get_return_type<func>());
auto async_result =
co_await co_await send_request_for_with_attachment<func, Args...>(
duration, req_attachment_, std::forward<Args>(args)...);
request_timeout_duration, req_attachment_,
std::forward<Args>(args)...);
req_attachment_ = {};
if (async_result) {
control_->resp_buffer_ = async_result->release_buffer();
Expand Down Expand Up @@ -410,9 +401,12 @@ class coro_rpc_client {

ELOGV(INFO, "client_id %d begin to connect %s", config_.client_id,
config_.port.data());
timeout(*this->timer_, config_.timeout_duration, "connect timer canceled")
.start([](auto &&) {
});
auto conn_timeout_dur = *config_.connect_timeout_duration;
if (conn_timeout_dur.count() >= 0) {
timeout(*this->timer_, conn_timeout_dur, "connect timer canceled")
.start([](auto &&) {
});
}

std::error_code ec = co_await coro_io::async_connect(
&control_->executor_, control_->socket_, config_.host, config_.port);
Expand Down Expand Up @@ -747,7 +741,7 @@ class coro_rpc_client {
private:
template <auto func, typename... Args>
async_simple::coro::Lazy<rpc_error> send_request_for_impl(
auto duration, uint32_t &id, coro_io::period_timer &timer,
auto request_timeout_duration, uint32_t &id, coro_io::period_timer &timer,
std::string_view attachment, Args &&...args) {
using R = decltype(get_return_type<func>());

Expand All @@ -766,9 +760,10 @@ class coro_rpc_client {

static_check<func, Args...>();

if (duration.count() > 0) {
timeout(timer, duration, "rpc call timer canceled").start([](auto &&) {
});
if (request_timeout_duration.count() >= 0) {
timeout(timer, request_timeout_duration, "rpc call timer canceled")
.start([](auto &&) {
});
}

#ifdef YLT_ENABLE_SSL
Expand Down Expand Up @@ -965,16 +960,20 @@ class coro_rpc_client {
template <auto func, typename... Args>
async_simple::coro::Lazy<async_simple::coro::Lazy<
async_rpc_result<decltype(get_return_type<func>())>>>
send_request_for_with_attachment(auto time_out_duration,
send_request_for_with_attachment(auto request_timeout_duration,
std::string_view request_attachment,
Args &&...args) {
using rpc_return_t = decltype(get_return_type<func>());
recving_guard guard(control_.get());
uint32_t id;
if (!config_.request_timeout_duration) {
config_.request_timeout_duration = request_timeout_duration;
}

auto timer = std::make_unique<coro_io::period_timer>(
control_->executor_.get_asio_executor());
auto result = co_await send_request_for_impl<func>(
time_out_duration, id, *timer, request_attachment,
*config_.request_timeout_duration, id, *timer, request_attachment,
std::forward<Args>(args)...);
auto &control = *control_;
if (!result) {
Expand Down
76 changes: 69 additions & 7 deletions src/coro_rpc/tests/test_coro_rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,14 +480,76 @@ TEST_CASE("testing client with shutdown") {
g_action = {};
}
TEST_CASE("testing client timeout") {
// SUBCASE("connect, 0ms timeout") {
// coro_rpc_client client;
// auto ret = client.connect("127.0.0.1", "8801", 0ms);
// auto val = syncAwait(ret);
coro_rpc_server server(2, 8801);
server.register_handler<hello, client_hello>();
auto res = server.async_start();
CHECK_MESSAGE(!res.hasResult(), "server start timeout");

// CHECK_MESSAGE(val == std::errc::timed_out,
// make_error_code(val).message());
// }
SUBCASE("connect, 0ms connect timeout") {
coro_rpc_client client;
coro_rpc_client::config config{};
config.connect_timeout_duration = 0ms;
bool r = client.init_config(config);
CHECK(r);
auto ret = client.connect(
"127.0.0.1", "8801",
1000ms); // this arg won't update config connect timeout duration.
auto val = syncAwait(ret);

if (val) {
CHECK_MESSAGE(val == coro_rpc::errc::timed_out, val.message());
}
}
SUBCASE("connect, -1ms never timeout") {
coro_rpc_client client;
coro_rpc_client::config config{};
config.connect_timeout_duration = -1ms; // less than 0, no timeout
// checking.
bool r = client.init_config(config);
CHECK(r);
auto ret = client.connect(
"127.0.0.1", "8801",
1000ms); // this arg won't update config connect timeout duration.
auto val = syncAwait(ret);

CHECK(!val);
}

SUBCASE("connect, 0ms request timeout") {
coro_rpc_client client;
coro_rpc_client::config config{};
config.connect_timeout_duration = 1000ms;
config.request_timeout_duration = 0ms;
bool r = client.init_config(config);
CHECK(r);
auto ret = client.connect(
"127.0.0.1", "8801",
0ms); // 0ms won't cover config connect timeout duration.
auto val = syncAwait(ret);

CHECK(!val);
auto result = syncAwait(client.call<hello>());

if (result.has_value()) {
std::cout << result.value() << std::endl;
}
else {
CHECK_MESSAGE(result.error().code == coro_rpc::errc::timed_out,
result.error().msg);
}
}
SUBCASE("connect, -1ms never request timeout") {
coro_rpc_client client;
client.get_config().request_timeout_duration =
-1ms; // less than 0, never timeout.
auto ret = client.connect("127.0.0.1", "8801");
auto val = syncAwait(ret);

CHECK(!val);
auto result = syncAwait(client.call<hello>());

CHECK(result.has_value());
}
SUBCASE("connect, ip timeout") {
g_action = {};
// https://stackoverflow.com/questions/100841/artificially-create-a-connection-timeout-error
Expand Down

0 comments on commit 71cbe2e

Please sign in to comment.