Skip to content

Commit

Permalink
fix(interactive): Fix signal handling for interactive (#3764)
Browse files Browse the repository at this point in the history
- Fix the signal handling of `actor_system`, add signal handlers for
`SIGINT` and `SIGTERM` to clean up actor_system and resources(including
subprocess).
- Fix the route setting in `hqps_http_handler` and `admin_http_handler`.
Each match_rule should have its own handler pointer, can not be shared.
- Remove the handling of unused url `/interactive/query`
- Fix some destructors.

Fix #3760
  • Loading branch information
zhanglei1949 authored May 10, 2024
1 parent 72e3380 commit ee675e1
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 66 deletions.
13 changes: 13 additions & 0 deletions flex/bin/interactive_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ std::string parse_codegen_dir(const bpo::variables_map& vm) {
return codegen_dir;
}

void blockSignal(int sig) {
sigset_t set;
sigemptyset(&set);
sigaddset(&set, sig);
if (pthread_sigmask(SIG_BLOCK, &set, NULL) != 0) {
perror("pthread_sigmask");
}
}

// When graph_schema is not specified, codegen proxy will use the running graph
// schema in hqps_service
void init_codegen_proxy(const bpo::variables_map& vm,
Expand Down Expand Up @@ -130,6 +139,10 @@ void openDefaultGraph(const std::string workspace, int32_t thread_num,
* The main entrance for InteractiveServer.
*/
int main(int argc, char** argv) {
// block sigint and sigterm in main thread, let seastar handle it
gs::blockSignal(SIGINT);
gs::blockSignal(SIGTERM);

bpo::options_description desc("Usage:");
desc.add_options()("help,h", "Display help messages")(
"enable-admin-service,e", bpo::value<bool>()->default_value(false),
Expand Down
2 changes: 2 additions & 0 deletions flex/engines/http_server/actor_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ void actor_system::launch_worker() {
hiactor::actor_app app{std::move(conf)};
app.run(argc, argv.data(), [this] {
sem_post(&ready_);
seastar::engine().handle_signal(SIGINT, on_exit_);
seastar::engine().handle_signal(SIGTERM, on_exit_);
return seastar::make_ready_future<>();
});
}
Expand Down
12 changes: 8 additions & 4 deletions flex/engines/http_server/actor_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
#include <semaphore.h>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <thread>

namespace server {

class actor_system {
public:
actor_system(uint32_t num_shards, bool enable_dpdk,
bool enable_thread_resource_pool = false,
unsigned external_thread_num = 1)
actor_system(
uint32_t num_shards, bool enable_dpdk,
bool enable_thread_resource_pool = false,
unsigned external_thread_num = 1, std::function<void()> on_exit = []() {})
: num_shards_(num_shards),
enable_dpdk_(enable_dpdk),
enable_thread_resource_pool_(enable_thread_resource_pool),
external_thread_num_(external_thread_num) {}
external_thread_num_(external_thread_num),
on_exit_(on_exit) {}
~actor_system();

void launch();
Expand All @@ -46,6 +49,7 @@ class actor_system {
const bool enable_dpdk_;
const bool enable_thread_resource_pool_;
const unsigned external_thread_num_;
std::function<void()> on_exit_;
std::unique_ptr<std::thread> main_thread_;
std::atomic<bool> running_{false};
sem_t ready_;
Expand Down
105 changes: 73 additions & 32 deletions flex/engines/http_server/handler/admin_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -506,93 +506,129 @@ void admin_http_handler::stop() {

seastar::future<> admin_http_handler::set_routes() {
return server_.set_routes([](seastar::httpd::routes& r) {
auto admin_graph_handler = new admin_http_graph_handler_impl(
interactive_admin_group_id, shard_admin_graph_concurrency);

auto procedures_handler = new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency);

auto service_handler = new admin_http_service_handler_impl(
interactive_admin_group_id, shard_admin_service_concurrency);

auto node_handler = new admin_http_node_handler_impl(
interactive_admin_group_id, shard_admin_node_concurrency);

auto job_handler = new admin_http_job_handler_impl(
interactive_admin_group_id, shard_admin_job_concurrency);

////Procedure management ///
{
auto match_rule = new seastar::httpd::match_rule(procedures_handler);
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure");
// Get All procedures
r.add(match_rule, seastar::httpd::operation_type::GET);
}
{
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure");
// Create a new procedure
r.add(match_rule, seastar::httpd::operation_type::POST);
}
{
// Each procedure's handling
auto match_rule = new seastar::httpd::match_rule(procedures_handler);
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure")
.add_param("procedure_id");
// Get a procedure
r.add(match_rule, seastar::httpd::operation_type::GET);
r.add(new seastar::httpd::match_rule(*match_rule),
seastar::httpd::operation_type::GET);
}
{
// Each procedure's handling
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure")
.add_param("procedure_id");
// Delete a procedure
r.add(match_rule, seastar::httpd::operation_type::DELETE);
r.add(new seastar::httpd::match_rule(*match_rule),
seastar::httpd::operation_type::DELETE);
}
{
// Each procedure's handling
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure")
.add_param("procedure_id");
// Update a procedure
r.add(match_rule, seastar::httpd::operation_type::PUT);
r.add(new seastar::httpd::match_rule(*match_rule),
seastar::httpd::operation_type::PUT);
}

// List all graphs.
r.add(seastar::httpd::operation_type::GET, seastar::httpd::url("/v1/graph"),
admin_graph_handler);
new admin_http_graph_handler_impl(interactive_admin_group_id,
shard_admin_graph_concurrency));
// Create a new Graph
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/v1/graph"), admin_graph_handler);
seastar::httpd::url("/v1/graph"),
new admin_http_graph_handler_impl(interactive_admin_group_id,
shard_admin_graph_concurrency));

// Delete a graph
r.add(seastar::httpd::operation_type::DELETE,
seastar::httpd::url("/v1/graph").remainder("graph_id"),
admin_graph_handler);
new admin_http_graph_handler_impl(interactive_admin_group_id,
shard_admin_graph_concurrency));

// Get graph metadata
{
// by setting full_path = false, we can match /v1/graph/{graph_id}/ and
// /v1/graph/{graph_id}/schema
auto match_rule = new seastar::httpd::match_rule(admin_graph_handler);
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
interactive_admin_group_id, shard_admin_graph_concurrency));
match_rule->add_str("/v1/graph").add_param("graph_id", false);
// Get graph schema
r.add(match_rule, seastar::httpd::operation_type::GET);
}

{ // load data to graph
auto match_rule = new seastar::httpd::match_rule(admin_graph_handler);
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
interactive_admin_group_id, shard_admin_graph_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/dataloading");
r.add(match_rule, seastar::httpd::operation_type::POST);
}
{ // Get Graph Schema
auto match_rule = new seastar::httpd::match_rule(admin_graph_handler);
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
interactive_admin_group_id, shard_admin_graph_concurrency));
match_rule->add_str("/v1/graph").add_param("graph_id").add_str("/schema");
r.add(match_rule, seastar::httpd::operation_type::GET);
}

{
// Node and service management
r.add(seastar::httpd::operation_type::GET,
seastar::httpd::url("/v1/node/status"), node_handler);
seastar::httpd::url("/v1/node/status"),
new admin_http_node_handler_impl(interactive_admin_group_id,
shard_admin_node_concurrency));

auto match_rule = new seastar::httpd::match_rule(service_handler);
auto match_rule =
new seastar::httpd::match_rule(new admin_http_service_handler_impl(
interactive_admin_group_id, shard_admin_service_concurrency));
match_rule->add_str("/v1/service").add_param("action");
r.add(match_rule, seastar::httpd::operation_type::POST);

r.add(seastar::httpd::operation_type::GET,
seastar::httpd::url("/v1/service/status"), service_handler);
seastar::httpd::url("/v1/service/status"),
new admin_http_service_handler_impl(
interactive_admin_group_id, shard_admin_service_concurrency));
}

{
Expand Down Expand Up @@ -663,14 +699,19 @@ seastar::future<> admin_http_handler::set_routes() {
{
// job request handling.
r.add(seastar::httpd::operation_type::GET, seastar::httpd::url("/v1/job"),
job_handler);
auto match_rule = new seastar::httpd::match_rule(job_handler);
new admin_http_job_handler_impl(interactive_admin_group_id,
shard_admin_job_concurrency));
auto match_rule =
new seastar::httpd::match_rule(new admin_http_job_handler_impl(
interactive_admin_group_id, shard_admin_job_concurrency));

match_rule->add_str("/v1/job").add_param("job_id");
r.add(match_rule, seastar::httpd::operation_type::GET);

r.add(seastar::httpd::operation_type::DELETE,
seastar::httpd::url("/v1/job").remainder("job_id"), job_handler);
seastar::httpd::url("/v1/job").remainder("job_id"),
new admin_http_job_handler_impl(interactive_admin_group_id,
shard_admin_job_concurrency));
}

return seastar::make_ready_future<>();
Expand Down
7 changes: 2 additions & 5 deletions flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,8 @@ hqps_http_handler::~hqps_http_handler() {
if (is_running()) {
stop();
}
delete ic_handler_;
delete adhoc_query_handler_;
delete exit_handler_;
// DO NOT DELETE the handler pointers, they will be deleted by
// seastar::httpd::match_rule
}

uint16_t hqps_http_handler::get_port() const { return http_port_; }
Expand Down Expand Up @@ -462,8 +461,6 @@ void hqps_http_handler::start_query_actors() {

seastar::future<> hqps_http_handler::set_routes() {
return server_.set_routes([this](seastar::httpd::routes& r) {
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/interactive/query"), ic_handler_);
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/v1/query"), ic_handler_);
r.add(seastar::httpd::operation_type::POST,
Expand Down
15 changes: 1 addition & 14 deletions flex/engines/http_server/service/hqps_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void HQPSService::init(const ServiceConfig& config) {
}
actor_sys_ = std::make_unique<actor_system>(
config.shard_num, config.dpdk_mode, config.enable_thread_resource_pool,
config.external_thread_num);
config.external_thread_num, [this]() { set_exit_state(); });
query_hdl_ = std::make_unique<hqps_http_handler>(config.query_port);
if (config.start_admin_service) {
admin_hdl_ = std::make_unique<admin_http_handler>(config.admin_port);
Expand Down Expand Up @@ -107,7 +107,6 @@ HQPSService::~HQPSService() {
actor_sys_->terminate();
}
stop_compiler_subprocess();
clear_running_graph();
if (metadata_store_) {
metadata_store_->Close();
}
Expand Down Expand Up @@ -385,16 +384,4 @@ gs::GraphId HQPSService::insert_default_graph_meta() {
return res.value();
}

void HQPSService::clear_running_graph() {
if (!metadata_store_) {
std::cerr << "Metadata store has not been inited!" << std::endl;
return;
}
auto res = metadata_store_->ClearRunningGraph();
if (!res.ok()) {
std::cerr << "Failed to clear running graph: "
<< res.status().error_message() << std::endl;
return;
}
}
} // namespace server
11 changes: 1 addition & 10 deletions flex/storages/metadata/default_graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,7 @@ DefaultGraphMetaStore::DefaultGraphMetaStore(
clear_locks();
}

DefaultGraphMetaStore::~DefaultGraphMetaStore() {
if (base_store_ != nullptr) {
base_store_->Close();
}
auto res = Close();
if (!res.ok()) {
LOG(ERROR) << "Fail to close DefaultGraphMetaStore: "
<< res.status().error_message();
}
}
DefaultGraphMetaStore::~DefaultGraphMetaStore() { Close(); }

Result<bool> DefaultGraphMetaStore::Open() { return base_store_->Open(); }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class HttpExecutionClient extends ExecutionClient<URI> {
private static final Logger logger = LoggerFactory.getLogger(HttpExecutionClient.class);
private static final String CONTENT_TYPE = "Content-Type";
private static final String TEXT_PLAIN = "text/plain;charset=UTF-8";
private static final String INTERACTIVE_QUERY_PATH = "/interactive/query";
private static final String INTERACTIVE_QUERY_PATH = "/v1/query";
private static final String INTERACTIVE_ADHOC_QUERY_PATH = "/interactive/adhoc_query";
private final HttpClient httpClient;

Expand Down

0 comments on commit ee675e1

Please sign in to comment.