[PAUSED] Port, caches refactor #1489

Open

wants to merge 5 commits into base: branch-21.06
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@


## Improvements
+ - #1489 Refactoring port and caches



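For orientation, the change recorded in the CHANGELOG above is applied uniformly to the kernels below: `do_process` no longer receives a `std::shared_ptr<ral::cache::CacheMachine>` and instead receives the name of the output port to write to, with `""` selecting the default port, and `add_task` passes that port name where it previously passed `this->output_cache()`. The following is a minimal sketch of that shape only, using hypothetical stand-in types (`Table`, `OutputPorts`, `SampleKernel`) rather than the actual BlazingSQL classes:

```cpp
// Sketch of the do_process signature change; all types here are simplified stand-ins.
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Table { int rows = 0; };  // stand-in for ral::frame::BlazingTable

// Stand-in for the kernel's set of output ports: results are routed by port
// name instead of handing each task a CacheMachine pointer.
class OutputPorts {
public:
    void addToCache(const std::string& port_name, std::unique_ptr<Table> t) {
        // "" selects the default port, matching the `"" //default port_name`
        // arguments added to the add_task calls in this PR.
        caches_[port_name.empty() ? "default" : port_name].push_back(std::move(t));
    }
    std::size_t size(const std::string& port_name) const {
        auto it = caches_.find(port_name);
        return it == caches_.end() ? 0 : it->second.size();
    }
private:
    std::map<std::string, std::vector<std::unique_ptr<Table>>> caches_;
};

class SampleKernel {
public:
    // After this PR: do_process receives a port name, not a cache pointer.
    void do_process(std::vector<std::unique_ptr<Table>> inputs, const std::string& port_name) {
        for (auto& in : inputs) {
            output_.addToCache(port_name, std::move(in));  // was: output->addToCache(std::move(in))
        }
    }
    OutputPorts output_;
};

int main() {
    SampleKernel k;
    std::vector<std::unique_ptr<Table>> inputs;
    inputs.push_back(std::make_unique<Table>());
    k.do_process(std::move(inputs), "");  // "" = default port
    std::cout << k.output_.size("default") << " table(s) cached on the default port\n";
}
```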
@@ -14,7 +14,7 @@ ComputeAggregateKernel::ComputeAggregateKernel(std::size_t kernel_id, const std:
}

ral::execution::task_result ComputeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {

try{
@@ -30,7 +30,7 @@ ral::execution::task_result ComputeAggregateKernel::do_process(std::vector< std:
columns = ral::operators::compute_aggregations_with_groupby(
input->toBlazingTableView(), aggregation_input_expressions, this->aggregation_types, aggregation_column_assigned_aliases, group_column_indices);
}
- output->addToCache(std::move(columns));
+ this->output_.addToCache(port_name, std::move(columns));
}catch(const rmm::bad_alloc& e){
return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
}catch(const std::exception& e){
@@ -55,7 +55,7 @@ kstatus ComputeAggregateKernel::run() {

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache(),
+ "", //default port_name
this);

cache_data = this->input_cache()->pullCacheData();
@@ -122,7 +122,7 @@ DistributeAggregateKernel::DistributeAggregateKernel(std::size_t kernel_id, cons
}

ral::execution::task_result DistributeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
auto & input = inputs[0];

@@ -135,7 +135,7 @@ ral::execution::task_result DistributeAggregateKernel::do_process(std::vector< s
if (group_column_indices.size() == 0) {
try{
if(this->context->isMasterNode(self_node)) {
- bool added = this->output_.get_cache()->addToCache(std::move(input),"",false);
+ bool added = this->output_.addToCache("", std::move(input), "", false);
if (added) {
increment_node_count(self_node.id());
}
@@ -186,7 +186,6 @@ ral::execution::task_result DistributeAggregateKernel::do_process(std::vector< s
}

scatter(partitions,
- output.get(),
"", //message_id_prefix
"" //cache_id
);
@@ -219,7 +218,7 @@ kstatus DistributeAggregateKernel::run() {

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache(),
+ "", //default port_name
this);

cache_data = this->input_cache()->pullCacheData();
@@ -275,7 +274,7 @@ MergeAggregateKernel::MergeAggregateKernel(std::size_t kernel_id, const std::str
}

ral::execution::task_result MergeAggregateKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
try{

@@ -336,7 +335,7 @@ ral::execution::task_result MergeAggregateKernel::do_process(std::vector< std::u
auto log_output_num_rows = columns->num_rows();
auto log_output_num_bytes = columns->sizeInBytes();

- output->addToCache(std::move(columns));
+ this->output_.addToCache(port_name, std::move(columns));
columns = nullptr;
}catch(const rmm::bad_alloc& e){
return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
@@ -367,7 +366,7 @@ kstatus MergeAggregateKernel::run() {

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache(),
+ "", //default port_name
this);

if(logger){
@@ -19,7 +19,7 @@ class ComputeAggregateKernel : public kernel {
std::string kernel_name() { return "ComputeAggregate";}

ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t stream, const std::map<std::string, std::string>& args) override;

virtual kstatus run();
@@ -40,7 +40,7 @@ class DistributeAggregateKernel : public distributing_kernel {
std::string kernel_name() { return "DistributeAggregate";}

ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t stream, const std::map<std::string, std::string>& args) override;

virtual kstatus run();
@@ -60,7 +60,7 @@ class MergeAggregateKernel : public kernel {
std::string kernel_name() { return "MergeAggregate";}

ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t stream, const std::map<std::string, std::string>& args) override;

virtual kstatus run();
@@ -239,7 +239,7 @@ void split_inequality_join_into_join_and_filter(const std::string & join_stateme
PartwiseJoin::PartwiseJoin(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
: kernel{kernel_id, queryString, context, kernel_type::PartwiseJoinKernel} {
this->query_graph = query_graph;
- this->input_.add_port("input_a", "input_b");
+ this->input_.add_port("input_a").add_port("input_b");

this->max_left_ind = -1;
this->max_right_ind = -1;
Expand Down Expand Up @@ -405,7 +405,7 @@ std::unique_ptr<ral::frame::BlazingTable> PartwiseJoin::join_set(
}

ral::execution::task_result PartwiseJoin::do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
- std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+ std::string /*output*/,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {
CodeTimer eventTimer;

@@ -543,7 +543,7 @@ kstatus PartwiseJoin::run() {

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache(),
+ "", //default port_name
this,
{{"left_idx", std::to_string(left_ind)}, {"right_idx", std::to_string(right_ind)}});

@@ -600,8 +600,8 @@ JoinPartitionKernel::JoinPartitionKernel(std::size_t kernel_id, const std::strin
this->query_graph = query_graph;
set_number_of_message_trackers(2); //default for left and right partitions

- this->input_.add_port("input_a", "input_b");
- this->output_.add_port("output_a", "output_b");
+ this->input_.add_port("input_a").add_port("input_b");
+ this->output_.add_port("output_a").add_port("output_b");

std::tie(this->expression, this->condition, this->filter_statement, this->join_type) = parseExpressionToGetTypeAndCondition(this->expression);
}
@@ -835,7 +835,7 @@ void JoinPartitionKernel::perform_standard_hash_partitioning(

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache("output_a"),
+ "output_a",
this,
{{"operation_type", "hash_partition"}, {"side", "left"}});

@@ -850,7 +850,7 @@

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache("output_b"),
+ "output_b",
this,
{{"operation_type", "hash_partition"}, {"side", "right"}});

@@ -937,7 +937,7 @@ void JoinPartitionKernel::small_table_scatter_distribution(std::unique_ptr<ral::

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache(small_output_cache_name),
+ small_output_cache_name,
this,
{{"operation_type", "small_table_scatter"}});

@@ -1001,7 +1001,7 @@ void JoinPartitionKernel::small_table_scatter_distribution(std::unique_ptr<ral::
}

ral::execution::task_result JoinPartitionKernel::do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
- std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+ std::string /*output*/,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {
bool input_consumed = false;
try{
@@ -1069,9 +1069,8 @@ ral::execution::task_result JoinPartitionKernel::do_process(std::vector<std::uni
}

scatter(partitions,
- this->output_.get_cache(cache_id).get(),
"", //message_id_prefix
- cache_id, //cache_id
+ cache_id, //cache_id = port_name
table_idx //message_tracker_idx
);
} else { // not an option! error
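A second recurring change in this file is port registration: `add_port("input_a", "input_b")` becomes the chained `add_port("input_a").add_port("input_b")`. Below is a small sketch of a fluent port container that would support such chaining, assuming `add_port` returns a reference to the container; `PortSet` is a hypothetical stand-in, not the real BlazingSQL port class:

```cpp
#include <iostream>
#include <map>
#include <string>

// `PortSet` is a hypothetical stand-in; the assumption is that add_port
// returns a reference to the container, which is what makes chaining work.
class PortSet {
public:
    PortSet& add_port(const std::string& name) {
        ports_.emplace(name, Cache{});  // register an (initially empty) cache for this port
        return *this;
    }
    bool has_port(const std::string& name) const { return ports_.count(name) != 0; }
private:
    struct Cache {};                    // stand-in for ral::cache::CacheMachine
    std::map<std::string, Cache> ports_;
};

int main() {
    PortSet input;
    input.add_port("input_a").add_port("input_b");  // chained form used throughout this PR
    std::cout << std::boolalpha << input.has_port("input_b") << '\n';  // prints: true
}
```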
@@ -47,7 +47,7 @@ class PartwiseJoin : public kernel {
const ral::frame::BlazingTableView & table_right);

ral::execution::task_result do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t stream, const std::map<std::string, std::string>& args) override;

kstatus run() override;
@@ -97,7 +97,7 @@ class JoinPartitionKernel : public distributing_kernel {
JoinPartitionKernel(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph);

ral::execution::task_result do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t stream, const std::map<std::string, std::string>& args) override;

kstatus run() override;
@@ -12,7 +12,7 @@ namespace batch {
PartitionSingleNodeKernel::PartitionSingleNodeKernel(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
: kernel{kernel_id, queryString, context, kernel_type::PartitionSingleNodeKernel} {
this->query_graph = query_graph;
- this->input_.add_port("input_a", "input_b");
+ this->input_.add_port("input_a").add_port("input_b");

if (is_window_function(this->expression)) {
if (window_expression_contains_partition_by(this->expression)){
@@ -26,7 +26,7 @@ PartitionSingleNodeKernel::PartitionSingleNodeKernel(std::size_t kernel_id, cons
}

ral::execution::task_result PartitionSingleNodeKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+ std::string /*output*/,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {

try{
@@ -65,7 +65,7 @@ kstatus PartitionSingleNodeKernel::run() {

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- nullptr,
+ "",//nullptr?
this);
}
}
@@ -111,7 +111,7 @@ SortAndSampleKernel::SortAndSampleKernel(std::size_t kernel_id, const std::strin
{
this->query_graph = query_graph;
set_number_of_message_trackers(2); //default
- this->output_.add_port("output_a", "output_b");
+ this->output_.add_port("output_a").add_port("output_b");
get_samples = true;
already_computed_partition_plan = false;
}
@@ -144,7 +144,7 @@ void SortAndSampleKernel::make_partition_plan_task(){

ral::execution::executor::get_instance()->add_task(
std::move(sampleCacheDatas),
- this->output_cache("output_b"),
+ "output_b",
this,
{{"operation_type", "compute_partition_plan"}});

@@ -225,7 +225,7 @@ bool SortAndSampleKernel::all_node_samples_are_available(){
}

ral::execution::task_result SortAndSampleKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {

try{
@@ -265,7 +265,7 @@ ral::execution::task_result SortAndSampleKernel::do_process(std::vector< std::un
auto num_bytes = sortedTable->sizeInBytes();
}

- output->addToCache(std::move(sortedTable), "output_a");
+ this->output_.addToCache(port_name, std::move(sortedTable), "output_a");
}
else if (operation_type == "compute_partition_plan") {
compute_partition_plan(std::move(inputs));
@@ -294,7 +294,7 @@ kstatus SortAndSampleKernel::run() {

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache("output_a"),
+ "output_a",
this,
{{"operation_type", "ordering_and_get_samples"}});

@@ -347,7 +347,7 @@ kstatus SortAndSampleKernel::run() {
PartitionKernel::PartitionKernel(std::size_t kernel_id, const std::string & queryString, std::shared_ptr<Context> context, std::shared_ptr<ral::cache::graph> query_graph)
: distributing_kernel{kernel_id, queryString, context, kernel_type::PartitionKernel} {
this->query_graph = query_graph;
- this->input_.add_port("input_a", "input_b");
+ this->input_.add_port("input_a").add_port("input_b");

std::map<std::string, std::string> config_options = context->getConfigOptions();
int max_num_order_by_partitions_per_node = 8;
@@ -369,7 +369,7 @@ PartitionKernel::PartitionKernel(std::size_t kernel_id, const std::string & quer
}

ral::execution::task_result PartitionKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> /*output*/,
+ std::string /*output*/,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
try{
auto & input = inputs[0];
@@ -421,7 +421,7 @@ kstatus PartitionKernel::run() {

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- nullptr,
+ "",//nullptr?
this);

cache_data = this->input_.get_cache("input_a")->pullCacheData();
@@ -485,21 +485,21 @@ MergeStreamKernel::MergeStreamKernel(std::size_t kernel_id, const std::string &
}

ral::execution::task_result MergeStreamKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {

try{
if (inputs.empty()) {
// no op
} else if(inputs.size() == 1) {
- output->addToCache(std::move(inputs[0]));
+ this->output_.addToCache(port_name, std::move(inputs[0]));
} else {
std::vector< ral::frame::BlazingTableView > tableViewsToConcat;
for (std::size_t i = 0; i < inputs.size(); i++){
tableViewsToConcat.emplace_back(inputs[i]->toBlazingTableView());
}
auto output_merge = ral::operators::merge(tableViewsToConcat, this->expression);
- output->addToCache(std::move(output_merge));
+ this->output_.addToCache(port_name, std::move(output_merge));
}
}catch(const rmm::bad_alloc& e){
return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
@@ -530,7 +530,7 @@ kstatus MergeStreamKernel::run() {

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache(),
+ "", //default port_name
this);

batch_count++;
@@ -593,13 +593,13 @@ LimitKernel::LimitKernel(std::size_t kernel_id, const std::string & queryString,
}

ral::execution::task_result LimitKernel::do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
- std::shared_ptr<ral::cache::CacheMachine> output,
+ std::string port_name,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
try{
CodeTimer eventTimer(false);
auto & input = inputs[0];
if (rows_limit<0) {
- output->addToCache(std::move(input));
+ this->output_.addToCache(port_name, std::move(input));
} else {
auto log_input_num_rows = input->num_rows();
auto log_input_num_bytes = input->sizeInBytes();
@@ -615,9 +615,9 @@ ral::execution::task_result LimitKernel::do_process(std::vector< std::unique_ptr
auto log_output_num_bytes = output_is_just_input ? input->sizeInBytes() : limited_input->sizeInBytes();

if (output_is_just_input)
- output->addToCache(std::move(input));
+ this->output_.addToCache(port_name, std::move(input));
else
- output->addToCache(std::move(limited_input));
+ this->output_.addToCache(port_name, std::move(limited_input));
}
}catch(const rmm::bad_alloc& e){
return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
@@ -687,7 +687,7 @@ kstatus LimitKernel::run() {

ral::execution::executor::get_instance()->add_task(
std::move(inputs),
- this->output_cache(),
+ "", //default port_name
this);

if (rows_limit == 0){
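Taken together, the call sites in this diff (`addToCache(port_name, table)`, `addToCache(port_name, table, "output_a")`, `addToCache("", table, "", false)`) suggest the refactored output container resolves a cache by port name and forwards an optional message id and an always-add flag. The sketch below is only an inference from those call sites; the names, defaults, return type, and the empty-table behavior are assumptions, not the actual BlazingSQL declaration:

```cpp
// Inferred, simplified shape of the port-based addToCache used in this PR.
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct BlazingTable { int rows = 0; };   // stand-in for ral::frame::BlazingTable

class output_port_set {                  // hypothetical stand-in for the kernel's output_ member
public:
    bool addToCache(const std::string& port_name,
                    std::unique_ptr<BlazingTable> table,
                    const std::string& message_id = "",
                    bool always_add = true) {
        if (!table) return false;
        if (!always_add && table->rows == 0) return false;  // assumed: skip empty tables unless forced
        auto& cache = caches_[port_name.empty() ? "default" : port_name];
        cache.emplace_back(message_id, std::move(table));
        return true;                      // mirrors `bool added = ...addToCache(...)` in the diff
    }
private:
    using Entry = std::pair<std::string, std::unique_ptr<BlazingTable>>;
    std::map<std::string, std::vector<Entry>> caches_;
};

int main() {
    output_port_set output_;
    // Same argument shape as the DistributeAggregateKernel call: default port,
    // empty message id, always_add = false.
    bool added = output_.addToCache("", std::make_unique<BlazingTable>(), "", false);
    std::cout << std::boolalpha << added << '\n';  // false here: the stand-in table is empty
}
```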