diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1ed3abea2..4dca301da 100755
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 ## Improvements
+- #1489 Refactoring port and caches

diff --git a/engine/src/execution_graph/logic_controllers/BatchAggregationProcessing.cpp b/engine/src/execution_graph/logic_controllers/BatchAggregationProcessing.cpp
index e11ea3e23..f4fa1e43b 100644
--- a/engine/src/execution_graph/logic_controllers/BatchAggregationProcessing.cpp
+++ b/engine/src/execution_graph/logic_controllers/BatchAggregationProcessing.cpp
@@ -14,7 +14,7 @@ ComputeAggregateKernel::ComputeAggregateKernel(std::size_t kernel_id, const std::string & queryString,
 }

 ral::execution::task_result ComputeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     try{
@@ -30,7 +30,7 @@ ral::execution::task_result ComputeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
             columns = ral::operators::compute_aggregations_with_groupby(
                 input->toBlazingTableView(), aggregation_input_expressions, this->aggregation_types,
                 aggregation_column_assigned_aliases, group_column_indices);
         }
-        output->addToCache(std::move(columns));
+        this->output_.addToCache(port_name, std::move(columns));
     }catch(const rmm::bad_alloc& e){
         return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
     }catch(const std::exception& e){
@@ -55,7 +55,7 @@ kstatus ComputeAggregateKernel::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(),
+                "", //default port_name
                 this);

         cache_data = this->input_cache()->pullCacheData();
@@ -122,7 +122,7 @@ DistributeAggregateKernel::DistributeAggregateKernel(std::size_t kernel_id, const std::string & queryString,
 }

 ral::execution::task_result DistributeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {

     auto & input = inputs[0];
@@ -135,7 +135,7 @@ ral::execution::task_result DistributeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
     if (group_column_indices.size() == 0) {
         try{
             if(this->context->isMasterNode(self_node)) {
-                bool added = this->output_.get_cache()->addToCache(std::move(input),"",false);
+                bool added = this->output_.addToCache("", std::move(input), "", false);
                 if (added) {
                     increment_node_count(self_node.id());
                 }
@@ -186,7 +186,6 @@ ral::execution::task_result DistributeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
         }

         scatter(partitions,
-            output.get(),
             "", //message_id_prefix
             "" //cache_id
             );
@@ -219,7 +218,7 @@ kstatus DistributeAggregateKernel::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(),
+                "", //default port_name
                 this);

         cache_data = this->input_cache()->pullCacheData();
@@ -275,7 +274,7 @@ MergeAggregateKernel::MergeAggregateKernel(std::size_t kernel_id, const std::string & queryString,
 }

 ral::execution::task_result MergeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     try{
@@ -336,7 +335,7 @@ ral::execution::task_result MergeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
         auto log_output_num_rows = columns->num_rows();
         auto log_output_num_bytes = columns->sizeInBytes();

-        output->addToCache(std::move(columns));
+        this->output_.addToCache(port_name, std::move(columns));
         columns = nullptr;
     }catch(const rmm::bad_alloc& e){
         return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
@@ -367,7 +366,7 @@ kstatus MergeAggregateKernel::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(),
+                "", //default port_name
                 this);

     if(logger){

diff --git a/engine/src/execution_graph/logic_controllers/BatchAggregationProcessing.h b/engine/src/execution_graph/logic_controllers/BatchAggregationProcessing.h
index c9d443be9..78d71f507 100644
--- a/engine/src/execution_graph/logic_controllers/BatchAggregationProcessing.h
+++ b/engine/src/execution_graph/logic_controllers/BatchAggregationProcessing.h
@@ -19,7 +19,7 @@ class ComputeAggregateKernel : public kernel {
     std::string kernel_name() { return "ComputeAggregate";}

     ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     virtual kstatus run();
@@ -40,7 +40,7 @@ class DistributeAggregateKernel : public distributing_kernel {
     std::string kernel_name() { return "DistributeAggregate";}

     ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     virtual kstatus run();
@@ -60,7 +60,7 @@ class MergeAggregateKernel : public kernel {
     std::string kernel_name() { return "MergeAggregate";}

     ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     virtual kstatus run();
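The two files above establish the pattern the rest of this diff repeats: do_process no longer receives a std::shared_ptr to a concrete CacheMachine; it receives the name of an output port, and writes go through the kernel's own output_ port object, which resolves the cache machine at write time. A minimal sketch of a kernel body under the new signature (MyKernel and transform are hypothetical; error handling and includes elided):

    ral::execution::task_result MyKernel::do_process(
        std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
        std::string port_name,  // was: std::shared_ptr<ral::cache::CacheMachine> output
        cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
        auto result = transform(std::move(inputs[0]));            // hypothetical per-kernel work
        this->output_.addToCache(port_name, std::move(result));   // port resolves the cache machine
        return {ral::execution::task_status::SUCCESS, std::string(), {}};
    }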
diff --git a/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.cpp b/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.cpp
index 06d87c5d2..061dfc0cd 100644
--- a/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.cpp
+++ b/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.cpp
@@ -239,7 +239,7 @@ void split_inequality_join_into_join_and_filter(const std::string & join_statement,
 PartwiseJoin::PartwiseJoin(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
     : kernel{kernel_id, queryString, context, kernel_type::PartwiseJoinKernel} {
     this->query_graph = query_graph;
-    this->input_.add_port("input_a", "input_b");
+    this->input_.add_port("input_a").add_port("input_b");

     this->max_left_ind = -1;
     this->max_right_ind = -1;
@@ -405,7 +405,7 @@ std::unique_ptr<ral::frame::BlazingTable> PartwiseJoin::join_set(
 }

 ral::execution::task_result PartwiseJoin::do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
-    std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+    std::string /*output*/,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {

     CodeTimer eventTimer;
@@ -543,7 +543,7 @@ kstatus PartwiseJoin::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(),
+                "", //default port_name
                 this,
                 {{"left_idx", std::to_string(left_ind)}, {"right_idx", std::to_string(right_ind)}});
@@ -600,8 +600,8 @@ JoinPartitionKernel::JoinPartitionKernel(std::size_t kernel_id, const std::string & queryString,
     this->query_graph = query_graph;
     set_number_of_message_trackers(2); //default for left and right partitions

-    this->input_.add_port("input_a", "input_b");
-    this->output_.add_port("output_a", "output_b");
+    this->input_.add_port("input_a").add_port("input_b");
+    this->output_.add_port("output_a").add_port("output_b");

     std::tie(this->expression, this->condition, this->filter_statement, this->join_type) = parseExpressionToGetTypeAndCondition(this->expression);
 }
@@ -835,7 +835,7 @@ void JoinPartitionKernel::perform_standard_hash_partitioning(
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache("output_a"),
+                "output_a",
                 this,
                 {{"operation_type", "hash_partition"}, {"side", "left"}});
@@ -850,7 +850,7 @@ void JoinPartitionKernel::perform_standard_hash_partitioning(
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache("output_b"),
+                "output_b",
                 this,
                 {{"operation_type", "hash_partition"}, {"side", "right"}});
@@ -937,7 +937,7 @@ void JoinPartitionKernel::small_table_scatter_distribution(std::unique_ptr<ral::frame::BlazingTable>
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(small_output_cache_name),
+                small_output_cache_name,
                 this,
                 {{"operation_type", "small_table_scatter"}});
@@ -1001,7 +1001,7 @@ void JoinPartitionKernel::small_table_scatter_distribution(std::unique_ptr<ral::frame::BlazingTable>

 ral::execution::task_result JoinPartitionKernel::do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
-    std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+    std::string /*output*/,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {

     bool input_consumed = false;
     try{
@@ -1069,9 +1069,8 @@ ral::execution::task_result JoinPartitionKernel::do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
             scatter(partitions,
-                this->output_.get_cache(cache_id).get(),
                 "", //message_id_prefix
-                cache_id, //cache_id
+                cache_id, //cache_id = port_name
                 table_idx //message_tracker_idx
                 );
         } else { // not an option! error

diff --git a/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.h b/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.h
index e9eee1322..202f71780 100644
--- a/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.h
+++ b/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.h
@@ -47,7 +47,7 @@ class PartwiseJoin : public kernel {
         const ral::frame::BlazingTableView & table_right);

     ral::execution::task_result do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     kstatus run() override;
@@ -97,7 +97,7 @@ class JoinPartitionKernel : public distributing_kernel {
     JoinPartitionKernel(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph);

     ral::execution::task_result do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     kstatus run() override;
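The join kernels also show the companion API change: port::add_port now takes a single name and returns a reference to the port, so multi-port registration chains instead of going through the removed variadic template (see the port.h hunks at the end of this diff). Both spellings side by side, taken from the hunks above:

    // Old (variadic template, removed from port.h):
    //   this->input_.add_port("input_a", "input_b");
    // New (single-name member returning port& for chaining):
    this->input_.add_port("input_a").add_port("input_b");
    this->output_.add_port("output_a").add_port("output_b");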
diff --git a/engine/src/execution_graph/logic_controllers/BatchOrderByProcessing.cpp b/engine/src/execution_graph/logic_controllers/BatchOrderByProcessing.cpp
index 6024822c8..d5af9f895 100644
--- a/engine/src/execution_graph/logic_controllers/BatchOrderByProcessing.cpp
+++ b/engine/src/execution_graph/logic_controllers/BatchOrderByProcessing.cpp
@@ -12,7 +12,7 @@ namespace batch {
 PartitionSingleNodeKernel::PartitionSingleNodeKernel(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
     : kernel{kernel_id, queryString, context, kernel_type::PartitionSingleNodeKernel} {
     this->query_graph = query_graph;
-    this->input_.add_port("input_a", "input_b");
+    this->input_.add_port("input_a").add_port("input_b");

     if (is_window_function(this->expression)) {
         if (window_expression_contains_partition_by(this->expression)){
@@ -26,7 +26,7 @@ PartitionSingleNodeKernel::PartitionSingleNodeKernel(std::size_t kernel_id, const std::string & queryString,
 }

 ral::execution::task_result PartitionSingleNodeKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+    std::string /*output*/,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     try{
@@ -65,7 +65,7 @@ kstatus PartitionSingleNodeKernel::run() {
             ral::execution::executor::get_instance()->add_task(
                     std::move(inputs),
-                    nullptr,
+                    "",//nullptr?
                     this);
         }
     }
@@ -111,7 +111,7 @@ SortAndSampleKernel::SortAndSampleKernel(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
 {
     this->query_graph = query_graph;
     set_number_of_message_trackers(2); //default
-    this->output_.add_port("output_a", "output_b");
+    this->output_.add_port("output_a").add_port("output_b");
     get_samples = true;
     already_computed_partition_plan = false;
 }
@@ -144,7 +144,7 @@ void SortAndSampleKernel::make_partition_plan_task(){
     ral::execution::executor::get_instance()->add_task(
             std::move(sampleCacheDatas),
-            this->output_cache("output_b"),
+            "output_b",
            this,
            {{"operation_type", "compute_partition_plan"}});
@@ -225,7 +225,7 @@ bool SortAndSampleKernel::all_node_samples_are_available(){
 }

 ral::execution::task_result SortAndSampleKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {
     try{
@@ -265,7 +265,7 @@ ral::execution::task_result SortAndSampleKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
             auto num_bytes = sortedTable->sizeInBytes();
         }

-        output->addToCache(std::move(sortedTable), "output_a");
+        this->output_.addToCache(port_name, std::move(sortedTable), "output_a");
     }
     else if (operation_type == "compute_partition_plan") {
         compute_partition_plan(std::move(inputs));
@@ -294,7 +294,7 @@ kstatus SortAndSampleKernel::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache("output_a"),
+                "output_a",
                 this,
                 {{"operation_type", "ordering_and_get_samples"}});
@@ -347,7 +347,7 @@ kstatus SortAndSampleKernel::run() {
 PartitionKernel::PartitionKernel(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
     : distributing_kernel{kernel_id, queryString, context, kernel_type::PartitionKernel} {
     this->query_graph = query_graph;
-    this->input_.add_port("input_a", "input_b");
+    this->input_.add_port("input_a").add_port("input_b");

     std::map<std::string, std::string> config_options = context->getConfigOptions();
     int max_num_order_by_partitions_per_node = 8;
@@ -369,7 +369,7 @@ PartitionKernel::PartitionKernel(std::size_t kernel_id, const std::string & queryString,
 }

 ral::execution::task_result PartitionKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+    std::string /*output*/,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     try{
         auto & input = inputs[0];
@@ -421,7 +421,7 @@ kstatus PartitionKernel::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                nullptr,
+                "",//nullptr?
                 this);

     cache_data = this->input_.get_cache("input_a")->pullCacheData();
@@ -485,21 +485,21 @@ MergeStreamKernel::MergeStreamKernel(std::size_t kernel_id, const std::string & queryString,
 }

 ral::execution::task_result MergeStreamKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     try{
         if (inputs.empty()) {
             // no op
         } else if(inputs.size() == 1) {
-            output->addToCache(std::move(inputs[0]));
+            this->output_.addToCache(port_name, std::move(inputs[0]));
         } else {
             std::vector< ral::frame::BlazingTableView > tableViewsToConcat;
             for (std::size_t i = 0; i < inputs.size(); i++){
                 tableViewsToConcat.emplace_back(inputs[i]->toBlazingTableView());
             }
             auto output_merge = ral::operators::merge(tableViewsToConcat, this->expression);
-            output->addToCache(std::move(output_merge));
+            this->output_.addToCache(port_name, std::move(output_merge));
         }
     }catch(const rmm::bad_alloc& e){
         return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
@@ -530,7 +530,7 @@ kstatus MergeStreamKernel::run() {
             ral::execution::executor::get_instance()->add_task(
                     std::move(inputs),
-                    this->output_cache(),
+                    "", //default port_name
                     this);

         batch_count++;
@@ -593,13 +593,13 @@ LimitKernel::LimitKernel(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
 }

 ral::execution::task_result LimitKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     try{
         CodeTimer eventTimer(false);
         auto & input = inputs[0];
         if (rows_limit<0) {
-            output->addToCache(std::move(input));
+            this->output_.addToCache(port_name, std::move(input));
         } else {
             auto log_input_num_rows = input->num_rows();
             auto log_input_num_bytes = input->sizeInBytes();
@@ -615,9 +615,9 @@ ral::execution::task_result LimitKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
             auto log_output_num_bytes = output_is_just_input ? input->sizeInBytes() : limited_input->sizeInBytes();
             if (output_is_just_input)
-                output->addToCache(std::move(input));
+                this->output_.addToCache(port_name, std::move(input));
             else
-                output->addToCache(std::move(limited_input));
+                this->output_.addToCache(port_name, std::move(limited_input));
         }
     }catch(const rmm::bad_alloc& e){
         return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
@@ -687,7 +687,7 @@ kstatus LimitKernel::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(),
+                "", //default port_name
                 this);

     if (rows_limit == 0){

diff --git a/engine/src/execution_graph/logic_controllers/BatchOrderByProcessing.h b/engine/src/execution_graph/logic_controllers/BatchOrderByProcessing.h
index 3dcd149b8..306e14192 100644
--- a/engine/src/execution_graph/logic_controllers/BatchOrderByProcessing.h
+++ b/engine/src/execution_graph/logic_controllers/BatchOrderByProcessing.h
@@ -23,7 +23,7 @@ class PartitionSingleNodeKernel : public kernel {
     std::string kernel_name() { return "PartitionSingleNode";}

     ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     kstatus run() override;
@@ -54,7 +54,7 @@ std::size_t PARTITION_PLAN_MESSAGE_TRACKER_IDX = 1;
         std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputSamples);

     ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     kstatus run() override;
@@ -79,7 +79,7 @@ class PartitionKernel : public distributing_kernel {
     std::string kernel_name() { return "Partition";}

     ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     kstatus run() override;
@@ -104,7 +104,7 @@ class MergeStreamKernel : public kernel {
     std::string kernel_name() { return "MergeStream";}

     ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     kstatus run() override;
@@ -124,7 +124,7 @@ class LimitKernel : public distributing_kernel {
     std::string kernel_name() { return "Limit";}

     ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     kstatus run() override;
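Across these run() methods the old this->output_cache() argument, and in two places a bare nullptr, becomes the empty string. Since the executor now only carries a name, "" must presumably resolve to the kernel's default output cache when the port looks it up at write time; the //nullptr? comments in PartitionSingleNodeKernel::run and PartitionKernel::run flag that those kernels never write through the task's port, so the name is unused there. A hedged sketch of the assumed resolution:

    // Assumption (not shown in this diff): port::get_cache("") yields the
    // kernel's default output cache, mirroring the old kernel::output_cache().
    bool added = this->output_.addToCache("", std::move(table));  // "" -> default port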
diff --git a/engine/src/execution_graph/logic_controllers/BatchProcessing.cpp b/engine/src/execution_graph/logic_controllers/BatchProcessing.cpp
index f1eac6049..7cadbef1c 100644
--- a/engine/src/execution_graph/logic_controllers/BatchProcessing.cpp
+++ b/engine/src/execution_graph/logic_controllers/BatchProcessing.cpp
@@ -154,10 +154,10 @@ TableScan::TableScan(std::size_t kernel_id, const std::string & queryString,
 }

 ral::execution::task_result TableScan::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     try{
-        output->addToCache(std::move(inputs[0]));
+        this->output_.addToCache(port_name, std::move(inputs[0]));
     }catch(const rmm::bad_alloc& e){
         //can still recover if the input was not a GPUCacheData
         return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
@@ -190,11 +190,10 @@ kstatus TableScan::run() {
             std::make_unique<ral::cache::CacheDataIO>(handle,parser,schema,file_schema,row_group_ids,projections);

         std::vector< std::unique_ptr<ral::cache::CacheData> > inputs;
         inputs.push_back(std::move(input));
-        auto output_cache = this->output_cache();

         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                output_cache,
+                "", //default port_name
                 this);

         /*if (this->has_limit_ && output_cache->get_num_rows_added() >= this->limit_rows_) {
@@ -303,7 +302,7 @@ BindableTableScan::BindableTableScan(std::size_t kernel_id, const std::string & queryString,
 }

 ral::execution::task_result BindableTableScan::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     auto & input = inputs[0];
     std::unique_ptr<ral::frame::BlazingTable> filtered_input;
@@ -312,10 +311,10 @@ ral::execution::task_result BindableTableScan::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
     try{
         if(this->filterable && !this->predicate_pushdown_done) {
             filtered_input = ral::processor::process_filter(input->toBlazingTableView(), expression, this->context.get());
             filtered_input->setNames(fix_column_aliases(filtered_input->names(), expression));
-            output->addToCache(std::move(filtered_input));
+            this->output_.addToCache(port_name, std::move(filtered_input));
         } else {
             input->setNames(fix_column_aliases(input->names(), expression));
-            output->addToCache(std::move(input));
+            this->output_.addToCache(port_name, std::move(input));
         }
     }catch(const rmm::bad_alloc& e){
         //can still recover if the input was not a GPUCacheData
@@ -352,11 +351,9 @@ kstatus BindableTableScan::run() {
         std::vector< std::unique_ptr<ral::cache::CacheData> > inputs;
         inputs.push_back(std::move(input));

-        auto output_cache = this->output_cache();
-
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                output_cache,
+                "", //default port_name
                 this);

         file_index++;
@@ -418,13 +415,13 @@ Projection::Projection(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
 }

 ral::execution::task_result Projection::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     try{
         auto & input = inputs[0];
         auto columns = ral::processor::process_project(std::move(input), expression, this->context.get());
-        output->addToCache(std::move(columns));
+        this->output_.addToCache(port_name, std::move(columns));
     }catch(const rmm::bad_alloc& e){
         //can still recover if the input was not a GPUCacheData
         return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
@@ -459,7 +456,7 @@ kstatus Projection::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(),
+                "", //default port_name
                 this);
     }
     cache_data = this->input_cache()->pullCacheData();
@@ -507,14 +504,14 @@ Filter::Filter(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
 }

 ral::execution::task_result Filter::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
     std::unique_ptr<ral::frame::BlazingTable> columns;
     try{
         auto & input = inputs[0];
         columns = ral::processor::process_filter(input->toBlazingTableView(), expression, this->context.get());
-        output->addToCache(std::move(columns));
+        this->output_.addToCache(port_name, std::move(columns));
     }catch(const rmm::bad_alloc& e){
         return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
     }catch(const std::exception& e){
@@ -534,7 +531,7 @@ kstatus Filter::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(),
+ "", //default port_name this); cache_data = this->input_cache()->pullCacheData(); diff --git a/engine/src/execution_graph/logic_controllers/BatchProcessing.h b/engine/src/execution_graph/logic_controllers/BatchProcessing.h index 62a063561..dbc873fd6 100644 --- a/engine/src/execution_graph/logic_controllers/BatchProcessing.h +++ b/engine/src/execution_graph/logic_controllers/BatchProcessing.h @@ -131,7 +131,7 @@ class TableScan : public kernel { std::string kernel_name() { return "TableScan";} ral::execution::task_result do_process(std::vector< std::unique_ptr > inputs, - std::shared_ptr output, + std::string port_name, cudaStream_t stream, const std::map& args) override; /** @@ -179,7 +179,7 @@ class BindableTableScan : public kernel { std::string kernel_name() { return "BindableTableScan";} ral::execution::task_result do_process(std::vector< std::unique_ptr > inputs, - std::shared_ptr output, + std::string port_name, cudaStream_t stream, const std::map& args) override; /** @@ -223,7 +223,7 @@ class Projection : public kernel { std::string kernel_name() { return "Projection";} ral::execution::task_result do_process(std::vector< std::unique_ptr > inputs, - std::shared_ptr output, + std::string port_name, cudaStream_t stream, const std::map& args) override; /** @@ -252,7 +252,7 @@ class Filter : public kernel { std::string kernel_name() { return "Filter";} ral::execution::task_result do_process(std::vector< std::unique_ptr > inputs, - std::shared_ptr output, + std::string port_name, cudaStream_t stream, const std::map& args) override; /** @@ -314,7 +314,7 @@ class OutputKernel : public kernel { std::string kernel_name() { return "Output";} ral::execution::task_result do_process(std::vector< std::unique_ptr > /*inputs*/, - std::shared_ptr /*output*/, + std::string /*output*/, cudaStream_t /*stream*/, const std::map& /*args*/) override { //for now the output kernel is not using do_process //i believe the output should be a cachemachine itself diff --git a/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.cpp b/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.cpp index c034b536b..e3e6e65bb 100644 --- a/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.cpp +++ b/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.cpp @@ -11,11 +11,11 @@ namespace batch { UnionKernel::UnionKernel(std::size_t kernel_id, const std::string & queryString, std::shared_ptr context, std::shared_ptr query_graph) : kernel{kernel_id, queryString, context, kernel_type::UnionKernel} { this->query_graph = query_graph; - this->input_.add_port("input_a", "input_b"); + this->input_.add_port("input_a").add_port("input_b"); } ral::execution::task_result UnionKernel::do_process(std::vector< std::unique_ptr> inputs, - std::shared_ptr /*output*/, + std::string /*output*/, cudaStream_t /*stream*/, const std::map& /*args*/) { auto & input = inputs[0]; @@ -67,7 +67,7 @@ kstatus UnionKernel::run() { ral::execution::executor::get_instance()->add_task( std::move(inputs), - this->output_cache(), + "", //default port_name this); } else { this->add_to_output_cache(std::move(cache_data_a)); @@ -87,7 +87,7 @@ kstatus UnionKernel::run() { ral::execution::executor::get_instance()->add_task( std::move(inputs), - this->output_cache(), + "", //default port_name this); } else { this->add_to_output_cache(std::move(cache_data_b)); diff --git a/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.h b/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.h index 
diff --git a/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.h b/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.h
index acbf9a7dd..3191a8c8c 100644
--- a/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.h
+++ b/engine/src/execution_graph/logic_controllers/BatchUnionProcessing.h
@@ -19,7 +19,7 @@ class UnionKernel : public kernel {
     std::string kernel_name() { return "Union";}

     ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args) override;

     virtual kstatus run();

diff --git a/engine/src/execution_graph/logic_controllers/BatchWindowFunctionProcessing.cpp b/engine/src/execution_graph/logic_controllers/BatchWindowFunctionProcessing.cpp
index f1e9301a0..0deb26bfc 100644
--- a/engine/src/execution_graph/logic_controllers/BatchWindowFunctionProcessing.cpp
+++ b/engine/src/execution_graph/logic_controllers/BatchWindowFunctionProcessing.cpp
@@ -161,7 +161,7 @@ std::unique_ptr<CudfColumn> ComputeWindowKernel::compute_column_from_window_func
 }

 ral::execution::task_result ComputeWindowKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {
@@ -243,7 +243,7 @@ ral::execution::task_result ComputeWindowKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
         std::size_t num_bytes = windowed_table->sizeInBytes();
     }

-    output->addToCache(std::move(windowed_table));
+    this->output_.addToCache(port_name, std::move(windowed_table));
     }catch(const rmm::bad_alloc& e){
         return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
     }catch(const std::exception& e){
@@ -282,7 +282,7 @@ kstatus ComputeWindowKernel::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(),
+                "", //default port_name
                 this,
                 task_args);
@@ -295,7 +295,7 @@ kstatus ComputeWindowKernel::run() {
         ral::execution::executor::get_instance()->add_task(
                 std::move(inputs),
-                this->output_cache(),
+                "", //default port_name
                 this);

     cache_data = this->input_cache()->pullCacheData();
@@ -330,7 +330,7 @@ OverlapGeneratorKernel::OverlapGeneratorKernel(std::size_t kernel_id, const std::string & queryString,
     std::shared_ptr<ral::cache::graph> query_graph)
     : kernel{kernel_id, queryString, context, kernel_type::OverlapGeneratorKernel} {
     this->query_graph = query_graph;
-    this->output_.add_port("batches", "preceding_overlaps", "following_overlaps");
+    this->output_.add_port("batches").add_port("preceding_overlaps").add_port("following_overlaps");

     std::tie(this->preceding_value, this->following_value) = get_bounds_from_window_expression(this->expression);
@@ -341,7 +341,7 @@ OverlapGeneratorKernel::OverlapGeneratorKernel(std::size_t kernel_id, const std::string & queryString,

 ral::execution::task_result OverlapGeneratorKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {
     try {
@@ -356,12 +356,12 @@ ral::execution::task_result OverlapGeneratorKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
             auto limited = ral::utilities::getLimitedRows(input->toBlazingTableView(), this->preceding_value, false);
             ral::cache::MetadataDictionary extra_metadata;
             extra_metadata.add_value(ral::cache::OVERLAP_STATUS, DONE_OVERLAP_STATUS);
-            this->output_preceding_overlap_cache->addToCache(std::move(limited), "", true, extra_metadata);
+            this->output_.addToCache("preceding_overlaps", std::move(limited), "", true, extra_metadata);
         } else {
             auto clone = input->toBlazingTableView().clone();
             ral::cache::MetadataDictionary extra_metadata;
             extra_metadata.add_value(ral::cache::OVERLAP_STATUS, INCOMPLETE_OVERLAP_STATUS);
-            this->output_preceding_overlap_cache->addToCache(std::move(clone), "", true, extra_metadata);
+            this->output_.addToCache("preceding_overlaps", std::move(clone), "", true, extra_metadata);
         }
     }
@@ -371,16 +371,16 @@ ral::execution::task_result OverlapGeneratorKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
             auto limited = ral::utilities::getLimitedRows(input->toBlazingTableView(), this->following_value, true);
             ral::cache::MetadataDictionary extra_metadata;
             extra_metadata.add_value(ral::cache::OVERLAP_STATUS, DONE_OVERLAP_STATUS);
-            this->output_following_overlap_cache->addToCache(std::move(limited), "", true, extra_metadata);
+            this->output_.addToCache("following_overlaps", std::move(limited), "", true, extra_metadata);
         } else {
             auto clone = input->toBlazingTableView().clone();
             ral::cache::MetadataDictionary extra_metadata;
             extra_metadata.add_value(ral::cache::OVERLAP_STATUS, INCOMPLETE_OVERLAP_STATUS);
-            this->output_following_overlap_cache->addToCache(std::move(clone), "", true, extra_metadata);
+            this->output_.addToCache("following_overlaps", std::move(clone), "", true, extra_metadata);
         }
     }
-    this->output_batches_cache->addToCache(std::move(input));
-
+    this->output_.addToCache("batches", std::move(input));
+
     }catch(rmm::bad_alloc e){
         return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
     }catch(std::exception e){
@@ -397,10 +397,6 @@ kstatus OverlapGeneratorKernel::run() {
     bool neighbors_notified_of_complete = false;
     int total_nodes = context->getTotalNodes();

-    output_batches_cache = this->output_.get_cache("batches");
-    output_preceding_overlap_cache = this->output_.get_cache("preceding_overlaps");
-    output_following_overlap_cache = this->output_.get_cache("following_overlaps");
-
     std::unique_ptr<ral::cache::CacheData> cache_data = this->input_cache()->pullCacheData();
     int batch_index = 0;
     while (cache_data != nullptr ){
@@ -424,16 +420,15 @@ kstatus OverlapGeneratorKernel::run() {
         }
         if (cache_data == nullptr && batch_index == 0) { // this is the first and last batch, then we dont need to process overlaps
-            this->output_batches_cache->addCacheData(std::move(inputs[0]));
+            this->output_.addCacheData("batches", std::move(inputs[0]));
         } else {
             ral::execution::executor::get_instance()->add_task(
                     std::move(inputs),
-                    this->output_cache(),
+                    "", //default port_name
                     this,
                     task_args);
         }
-
-
+
         batch_index++;
     }
@@ -468,7 +463,7 @@ OverlapAccumulatorKernel::OverlapAccumulatorKernel(std::size_t kernel_id, const std::string & queryString,
     std::shared_ptr<ral::cache::graph> query_graph)
     : distributing_kernel{kernel_id, queryString, context, kernel_type::OverlapAccumulatorKernel} {
     this->query_graph = query_graph;
-    this->input_.add_port("batches", "preceding_overlaps", "following_overlaps");
+    this->input_.add_port("batches").add_port("preceding_overlaps").add_port("following_overlaps");

     this->num_batches = 0;
@@ -561,7 +556,7 @@ void OverlapAccumulatorKernel::combine_overlaps(bool preceding, int target_batch_index,

 ral::execution::task_result OverlapAccumulatorKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {
     try {
@@ -802,7 +797,7 @@ void OverlapAccumulatorKernel::prepare_overlap_task(bool preceding, int source_batch_index,
     task_args[ral::cache::OVERLAP_STATUS] = overlap_status;
     ral::execution::executor::get_instance()->add_task(
             std::move(cache_datas_for_task_vect),
-            preceding ? preceding_overlap_cache : following_overlap_cache,
+            preceding ? "preceding_overlaps" : "following_overlaps",
             this,
             task_args);
 }
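OverlapGeneratorKernel is the clearest beneficiary of the refactor: instead of materializing three shared_ptr cache handles in run() (the lines deleted in the @@ -397 hunk) and writing through the members removed from the header below, every write now names its destination port on the kernel's single output_ object:

    // One port object, three named destinations (from the hunks above):
    this->output_.addToCache("preceding_overlaps", std::move(limited), "", true, extra_metadata);
    this->output_.addToCache("following_overlaps", std::move(clone), "", true, extra_metadata);
    this->output_.addToCache("batches", std::move(input));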
"preceding_overlaps" : "following_overlaps", this, task_args); } diff --git a/engine/src/execution_graph/logic_controllers/BatchWindowFunctionProcessing.h b/engine/src/execution_graph/logic_controllers/BatchWindowFunctionProcessing.h index 29a7c065d..822089b03 100644 --- a/engine/src/execution_graph/logic_controllers/BatchWindowFunctionProcessing.h +++ b/engine/src/execution_graph/logic_controllers/BatchWindowFunctionProcessing.h @@ -33,7 +33,7 @@ class ComputeWindowKernel : public kernel { std::string kernel_name() { return "ComputeWindow";} ral::execution::task_result do_process(std::vector< std::unique_ptr > inputs, - std::shared_ptr output, + std::string port_name, cudaStream_t stream, const std::map& args) override; kstatus run() override; @@ -98,19 +98,14 @@ class OverlapGeneratorKernel : public kernel { std::string kernel_name() { return "OverlapGenerator";} ral::execution::task_result do_process(std::vector< std::unique_ptr > inputs, - std::shared_ptr output, + std::string port_name, cudaStream_t stream, const std::map& args) override; kstatus run() override; private: - int preceding_value; // X PRECEDING - int following_value; // Y FOLLOWING - - // these are the three output caches - std::shared_ptr output_batches_cache; - std::shared_ptr output_preceding_overlap_cache; - std::shared_ptr output_following_overlap_cache; + int preceding_value; // X PRECEDING + int following_value; // Y FOLLOWING int self_node_index; int total_nodes; @@ -147,7 +142,7 @@ class OverlapAccumulatorKernel : public distributing_kernel { std::string kernel_name() { return "OverlapAccumulator";} ral::execution::task_result do_process(std::vector< std::unique_ptr > inputs, - std::shared_ptr output, + std::string port_name, cudaStream_t stream, const std::map& args) override; kstatus run() override; diff --git a/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.cpp b/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.cpp index cad37866d..5aee7c783 100644 --- a/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.cpp +++ b/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.cpp @@ -168,10 +168,11 @@ void distributing_kernel::broadcast(std::unique_ptr ta void distributing_kernel::scatter(std::vector partitions, - ral::cache::CacheMachine* output, std::string message_id_prefix, - std::string cache_id, + std::string cache_id, //port_name std::size_t message_tracker_idx) { + + ral::cache::CacheMachine* output = this->output_.get_cache(cache_id).get(); auto nodes = context->getAllNodes(); assert(nodes.size() == partitions.size()); diff --git a/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.h b/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.h index 5cdedde80..bbc701aef 100644 --- a/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.h +++ b/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.h @@ -69,13 +69,11 @@ class distributing_kernel : public kernel { * It is assumed that the size of the vector is the same as the number of nodes. * * @param partitions The table partitions to be sent. - * @param output The output cache. * @param message_id_prefix The prefix of the identifier of this message. * @param cache_id Indicates what cache a message should be routed to. * @param message_tracker_idx The message tracker index. 
diff --git a/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.h b/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.h
index 5cdedde80..bbc701aef 100644
--- a/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.h
+++ b/engine/src/execution_graph/logic_controllers/taskflow/distributing_kernel.h
@@ -69,13 +69,11 @@ class distributing_kernel : public kernel {
      * It is assumed that the size of the vector is the same as the number of nodes.
      *
      * @param partitions The table partitions to be sent.
-     * @param output The output cache.
      * @param message_id_prefix The prefix of the identifier of this message.
      * @param cache_id Indicates what cache a message should be routed to.
      * @param message_tracker_idx The message tracker index.
      */
     void scatter(std::vector<ral::frame::BlazingTableView> partitions,
-        ral::cache::CacheMachine* output,
         std::string message_id_prefix,
         std::string cache_id,
         std::size_t message_tracker_idx = 0);

diff --git a/engine/src/execution_graph/logic_controllers/taskflow/executor.cpp b/engine/src/execution_graph/logic_controllers/taskflow/executor.cpp
index cdce70f47..2b850fd81 100644
--- a/engine/src/execution_graph/logic_controllers/taskflow/executor.cpp
+++ b/engine/src/execution_graph/logic_controllers/taskflow/executor.cpp
@@ -6,15 +6,16 @@ namespace ral {
 namespace execution{

 size_t executor::add_task(std::vector<std::unique_ptr<ral::cache::CacheData> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
-    ral::cache::kernel * kernel, const std::map<std::string, std::string>& args) {
+    std::string port_name,
+    ral::cache::kernel * kernel,
+    const std::map<std::string, std::string>& args) {

     auto task_id = task_id_counter.fetch_add(1, std::memory_order_relaxed);

     kernel->add_task(task_id);

     auto task_added = std::make_unique<task>(
-        std::move(inputs),output,task_id, kernel, attempts_limit, args
+        std::move(inputs), port_name, task_id, kernel, attempts_limit, args
     );

     task_queue.put(std::move(task_added));
@@ -23,13 +24,13 @@ size_t executor::add_task(std::vector<std::unique_ptr<ral::cache::CacheData> > inputs,

 void executor::add_task(std::vector<std::unique_ptr<ral::cache::CacheData> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     ral::cache::kernel * kernel,
     size_t attempts,
     size_t task_id,
     const std::map<std::string, std::string>& args){

     auto task_added = std::make_unique<task>(
-        std::move(inputs),output,task_id, kernel, attempts_limit, args, attempts
+        std::move(inputs), port_name, task_id, kernel, attempts_limit, args, attempts
     );
     task_queue.put(std::move(task_added));
 }
@@ -44,13 +45,13 @@ std::unique_ptr<task> executor::remove_task_from_back(){

 task::task(
     std::vector<std::unique_ptr<ral::cache::CacheData> > inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     size_t task_id,
     ral::cache::kernel * kernel, size_t attempts_limit,
     const std::map<std::string, std::string>& args, size_t attempts)
     : inputs(std::move(inputs)),
-    output(output), task_id(task_id),
+    port_name(port_name), task_id(task_id),
     kernel(kernel),attempts(attempts),
     attempts_limit(attempts_limit), args(args) {
@@ -106,7 +107,7 @@ void task::run(cudaStream_t stream, executor * executor){
             this->attempts++;
             if(this->attempts < this->attempts_limit){
-                executor->add_task(std::move(inputs), output, kernel, attempts, task_id, args);
+                executor->add_task(std::move(inputs), port_name, kernel, attempts, task_id, args);
                 return;
             }else{
                 throw;
@@ -130,7 +131,7 @@ void task::run(cudaStream_t stream, executor * executor){
     }

     CodeTimer executionEventTimer;
-    auto task_result = kernel->process(std::move(input_gpu),output,stream, args);
+    auto task_result = kernel->process(std::move(input_gpu), port_name, stream, args);

     if(task_logger) {
         task_logger->info("{time_started}|{ral_id}|{query_id}|{kernel_id}|{duration_decaching}|{duration_execution}|{input_num_rows}|{input_num_bytes}",
@@ -166,7 +167,7 @@ void task::run(cudaStream_t stream, executor * executor){
         }
         this->attempts++;
         if(this->attempts < this->attempts_limit){
-            executor->add_task(std::move(inputs), output, kernel, attempts, task_id, args);
+            executor->add_task(std::move(inputs), port_name, kernel, attempts, task_id, args);
         }else{
             throw rmm::bad_alloc("Ran out of memory processing");
         }
diff --git a/engine/src/execution_graph/logic_controllers/taskflow/executor.h b/engine/src/execution_graph/logic_controllers/taskflow/executor.h
index 001bda8c3..a46c26a41 100644
--- a/engine/src/execution_graph/logic_controllers/taskflow/executor.h
+++ b/engine/src/execution_graph/logic_controllers/taskflow/executor.h
@@ -24,7 +24,7 @@ class task {
     task(
         std::vector<std::unique_ptr<ral::cache::CacheData> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         size_t task_id,
         ral::cache::kernel * kernel, size_t attempts_limit,
         const std::map<std::string, std::string>& args, size_t attempts = 0);
@@ -51,7 +51,8 @@ class task {
 protected:
     std::vector<std::unique_ptr<ral::cache::CacheData> > inputs;
-    std::shared_ptr<ral::cache::CacheMachine> output;
+    //std::shared_ptr<ral::cache::CacheMachine> output;
+    std::string port_name;
     size_t task_id;
     ral::cache::kernel * kernel;
     size_t attempts = 0;
@@ -89,11 +90,12 @@ class executor{
     bool has_exception();

     size_t add_task(std::vector<std::unique_ptr<ral::cache::CacheData> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
-        ral::cache::kernel * kernel, const std::map<std::string, std::string>& args = {});
+        std::string port_name,
+        ral::cache::kernel * kernel,
+        const std::map<std::string, std::string>& args = {});

     void add_task(std::vector<std::unique_ptr<ral::cache::CacheData> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         ral::cache::kernel * kernel,
         size_t attempts,
         size_t task_id,
         const std::map<std::string, std::string>& args = {});
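The executor changes are mechanical, but they carry the design point of the refactor: a queued or retried task no longer pins a shared_ptr to a CacheMachine for its whole lifetime; it carries a string, and the cache is resolved only when the kernel actually writes. Conceptual shape of task::run after the change (the real code drives retries from its catch blocks, as shown above; this only illustrates what the task carries):

    // members:  std::vector<std::unique_ptr<ral::cache::CacheData>> inputs;
    //           std::string port_name;   // was std::shared_ptr<CacheMachine> output
    auto task_result = kernel->process(std::move(input_gpu), port_name, stream, args);
    // on a recoverable failure, the same name is simply re-enqueued:
    executor->add_task(std::move(inputs), port_name, kernel, attempts, task_id, args);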
diff --git a/engine/src/execution_graph/logic_controllers/taskflow/kernel.cpp b/engine/src/execution_graph/logic_controllers/taskflow/kernel.cpp
index ff151b8c4..94c85ade3 100644
--- a/engine/src/execution_graph/logic_controllers/taskflow/kernel.cpp
+++ b/engine/src/execution_graph/logic_controllers/taskflow/kernel.cpp
@@ -43,7 +43,7 @@ bool kernel::add_to_output_cache(std::unique_ptr<ral::frame::BlazingTable> table, std::string cache_id, bool always_add) {
     std::string message_id = get_message_id();
     message_id = !cache_id.empty() ? cache_id + "_" + message_id : message_id;
     cache_id = cache_id.empty() ? std::to_string(this->get_id()) : cache_id;
-    bool added = this->output_.get_cache(cache_id)->addToCache(std::move(table), message_id, always_add);
+    bool added = this->output_.addToCache(cache_id, std::move(table), message_id, always_add);

     return added;
 }
@@ -52,7 +52,7 @@ bool kernel::add_to_output_cache(std::unique_ptr<ral::cache::CacheData> cache_data, std::string cache_id, bool always_add) {
     std::string message_id = get_message_id();
     message_id = !cache_id.empty() ? cache_id + "_" + message_id : message_id;
     cache_id = cache_id.empty() ? std::to_string(this->get_id()) : cache_id;
-    bool added = this->output_.get_cache(cache_id)->addCacheData(std::move(cache_data), message_id, always_add);
+    bool added = this->output_.addCacheData(cache_id, std::move(cache_data), message_id, always_add);

     return added;
 }
@@ -61,7 +61,7 @@ bool kernel::add_to_output_cache(std::unique_ptr<ral::frame::BlazingHostTable> host_table, std::string cache_id) {
     std::string message_id = get_message_id();
     message_id = !cache_id.empty() ? cache_id + "_" + message_id : message_id;
     cache_id = cache_id.empty() ? std::to_string(this->get_id()) : cache_id;
-    bool added = this->output_.get_cache(cache_id)->addHostFrameToCache(std::move(host_table), message_id);
+    bool added = this->output_.addHostFrameToCache(cache_id, std::move(host_table), message_id);

     return added;
 }
@@ -73,7 +73,7 @@ std::pair<bool, uint64_t> kernel::get_estimated_output_num_rows(){
 }

 ral::execution::task_result kernel::process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
-    std::shared_ptr<ral::cache::CacheMachine> output,
+    std::string port_name,
     cudaStream_t stream, const std::map<std::string, std::string>& args){
@@ -92,7 +92,7 @@ ral::execution::task_result kernel::process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
         bytes += input->sizeInBytes();
         rows += input->num_rows();
     }
-    auto result = do_process(std::move(inputs), output, stream, args);
+    auto result = do_process(std::move(inputs), port_name, stream, args);
     if(result.status == ral::execution::SUCCESS){
         // increment these AFTER its been processed successfully
         total_input_bytes_processed += bytes;

diff --git a/engine/src/execution_graph/logic_controllers/taskflow/kernel.h b/engine/src/execution_graph/logic_controllers/taskflow/kernel.h
index e07a21899..667b13f08 100644
--- a/engine/src/execution_graph/logic_controllers/taskflow/kernel.h
+++ b/engine/src/execution_graph/logic_controllers/taskflow/kernel.h
@@ -186,7 +186,7 @@ class kernel {
      * @brief Invokes the do_process function.
      */
     ral::execution::task_result process(std::vector<std::unique_ptr<ral::frame::BlazingTable> > inputs,
-        std::shared_ptr<ral::cache::CacheMachine> output,
+        std::string port_name,
         cudaStream_t stream, const std::map<std::string, std::string>& args);

     /**
@@ -197,7 +197,8 @@ class kernel {
      * @param args any additional arguments the kernel may need to perform its execution that may not be available to the kernel at instantiation.
      */
     virtual ral::execution::task_result do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable> > /*inputs*/,
-        std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+        //std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+        std::string /*port_name*/,
         cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/){
         return {ral::execution::task_status::SUCCESS, std::string(), std::vector< std::unique_ptr<ral::frame::BlazingTable> > ()};
     }
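kernel::add_to_output_cache keeps its message-id convention; only the final write goes through the port helpers. A worked trace of the routing with hypothetical values (the actual format of get_message_id() is assumed here):

    std::string message_id = "7_query0";          // get_message_id() for kernel id 7 (assumed form)
    std::string cache_id   = "output_a";
    message_id = !cache_id.empty() ? cache_id + "_" + message_id : message_id;  // "output_a_7_query0"
    cache_id   = cache_id.empty() ? std::to_string(7) : cache_id;               // "output_a"
    // this->output_.addToCache(cache_id, std::move(table), message_id, always_add);

With an empty cache_id the message id stays "7_query0" and the port name falls back to the kernel id, "7".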
diff --git a/engine/src/execution_graph/logic_controllers/taskflow/port.h b/engine/src/execution_graph/logic_controllers/taskflow/port.h
index fcc2194f3..e017e7177 100644
--- a/engine/src/execution_graph/logic_controllers/taskflow/port.h
+++ b/engine/src/execution_graph/logic_controllers/taskflow/port.h
@@ -4,44 +4,12 @@
 #include <map>
 #include <memory>
 #include "execution_graph/logic_controllers/CacheMachine.h"
+#include "bmr/MemoryMonitor.h"

 namespace ral {
 namespace cache {

 enum kstatus { stop, proceed };

-//static const std::uint32_t MAX_SYSTEM_SIGNAL(0xfff);
-//enum signal : std::uint32_t { none = 0, quit, term, eof = MAX_SYSTEM_SIGNAL };
-
-/** some helper structs for recursive port adding **/
-template <class PORT, class... PORTNAMES>
-struct port_helper {};
-
-/** stop recursion **/
-template <class PORT>
-struct port_helper<PORT> {
-    static void register_port(PORT & /*port*/) { return; }
-};
-
-class port;
-
-/** continue recursion **/
-template <class PORT, class PORTNAME, class... PORTNAMES>
-struct port_helper<PORT, PORTNAME, PORTNAMES...> {
-    static void register_port(PORT & port, PORTNAME && portname, PORTNAMES &&... portnames) {
-        port.register_port(portname);
-        port_helper<PORT, PORTNAMES...>::register_port(port, std::forward<PORTNAMES>(portnames)...);
-        return;
-    }
-};
-
-/** kicks off recursion for adding ports **/
-template <class PORT, class... PORTNAMES>
-static void kick_port_helper(PORT & port, PORTNAMES &&... ports) {
-    port_helper<PORT, PORTNAMES...>::register_port(port, std::forward<PORTNAMES>(ports)...);
-    return;
-}
-
 class kernel;

 /**
@@ -54,9 +22,9 @@ class port {

     virtual ~port() = default;

-    template <class... PORTNAMES>
-    void add_port(PORTNAMES &&... ports) {
-        kick_port_helper<port, PORTNAMES...>((*this), std::forward<PORTNAMES>(ports)...);
+    port& add_port(std::string port_name) {
+        register_port(port_name);
+        return *this;
     }

     size_t count() const { return cache_machines_.size(); }
@@ -83,10 +51,26 @@ class port {

     uint64_t get_num_rows_added(const std::string & port_name);

+    template <class... Args>
+    bool addToCache(const std::string & port_name, Args&&... args){
+        return this->get_cache(port_name)->addToCache(std::forward<Args>(args)...);
+    }

-public:
+    template <class... Args>
+    bool addCacheData(const std::string & port_name, Args&&... args){
+        return this->get_cache(port_name)->addCacheData(std::forward<Args>(args)...);
+    }
+
+    template <class... Args>
+    bool addHostFrameToCache(const std::string & port_name, Args&&... args){
+        return this->get_cache(port_name)->addHostFrameToCache(std::forward<Args>(args)...);
+    }
+
+    friend class ral::MemoryMonitor;
+
+private:
     kernel * kernel_;
-    std::map<std::string, std::shared_ptr<CacheMachine>> cache_machines_;
+    std::map<std::string, std::shared_ptr<CacheMachine>> cache_machines_; //port_name,cache_machines
 };

 } // namespace cache
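Taken together, port.h now owns the whole write path: registration chains, the cache map is private, and the three perfect-forwarding helpers are the only public mutation points (MemoryMonitor keeps access through the new friend declaration; note the helpers must return the underlying bool, since they are declared bool). A self-contained toy of the pattern, with a stand-in type replacing the engine's CacheMachine:

    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>
    #include <utility>
    #include <vector>

    // Toy stand-in for ral::cache::CacheMachine, just to exercise the pattern.
    struct CacheMachine {
        std::vector<std::string> items;
        bool addToCache(std::string item, const std::string & /*message_id*/ = "",
                        bool /*always_add*/ = false) {
            items.push_back(std::move(item));
            return true;
        }
    };

    class port {
    public:
        // Chained registration, as in the refactor.
        port & add_port(const std::string & port_name) {
            cache_machines_[port_name] = std::make_shared<CacheMachine>();
            return *this;
        }

        std::shared_ptr<CacheMachine> get_cache(const std::string & port_name) {
            return cache_machines_.at(port_name);
        }

        // Perfect-forwarding helper: the port name picks the cache, every
        // remaining argument is forwarded to the underlying CacheMachine call.
        template <class... Args>
        bool addToCache(const std::string & port_name, Args &&... args) {
            return get_cache(port_name)->addToCache(std::forward<Args>(args)...);
        }

    private:
        std::map<std::string, std::shared_ptr<CacheMachine>> cache_machines_;
    };

    int main() {
        port output;
        output.add_port("output_a").add_port("output_b");
        output.addToCache("output_a", "partition_0");
        output.addToCache("output_b", "samples");
        std::cout << output.get_cache("output_a")->items.size() << "\n";  // prints 1
    }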