Skip to content

Commit

Permalink
feat: Add query trace TaskRunner (facebookincubator#11927)
Browse files Browse the repository at this point in the history
Summary:
Add a query trace TaskRunner to execute the replay task replacing `AssertQueryBuilder`.
Use a flag named `copy_results` to control whether to copy the replay result (used in UT).

Pull Request resolved: facebookincubator#11927

Reviewed By: tanjialiang

Differential Revision: D67719866

Pulled By: xiaoxmeng

fbshipit-source-id: 04735e9eb4da6516998e4f63f6d23d0605e681c8
  • Loading branch information
duanmeng authored and facebook-github-bot committed Dec 30, 2024
1 parent 7cbfce0 commit 7fdb321
Show file tree
Hide file tree
Showing 14 changed files with 280 additions and 42 deletions.
4 changes: 4 additions & 0 deletions velox/docs/develop/debugging/tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,7 @@ Here is a full list of supported command line arguments.
* ``--shuffle_serialization_format``: Specify the shuffle serialization format.
* ``--table_writer_output_dir``: Specify the output directory of TableWriter.
* ``--hiveConnectorExecutorHwMultiplier``: Hardware multiplier for hive connector.
* ``--driver_cpu_executor_hw_multiplier``: Hardware multipler for driver cpu executor.
* ``--memory_arbitrator_type``: Specify the memory arbitrator type.
* ``--query_memory_capacity_gb``: Specify the query memory capacity limit in GB. If it is zero, then there is no limit.
* ``--copy_results``: If true, copy the replaying result.
3 changes: 2 additions & 1 deletion velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ add_library(
PartitionedOutputReplayer.cpp
TableScanReplayer.cpp
TableWriterReplayer.cpp
TraceReplayRunner.cpp)
TraceReplayRunner.cpp
TraceReplayTaskRunner.cpp)

target_link_libraries(
velox_query_trace_replayer_base
Expand Down
49 changes: 25 additions & 24 deletions velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/tool/trace/OperatorReplayerBase.h"
#include "velox/tool/trace/TraceReplayTaskRunner.h"

using namespace facebook::velox;

Expand Down Expand Up @@ -72,36 +73,18 @@ OperatorReplayerBase::OperatorReplayerBase(
queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false";
}

RowVectorPtr OperatorReplayerBase::run() {
std::shared_ptr<exec::Task> task;
const auto restoredPlanNode = createPlan();
auto queryPool = memory::memoryManager()->addRootPool(
"OperatorReplayerBase", queryCapacity_);
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
connectorConfigs;
for (auto& [connectorId, configs] : connectorConfigs_) {
connectorConfigs.emplace(
connectorId, std::make_shared<config::ConfigBase>(std::move(configs)));
}
auto queryCtx = core::QueryCtx::create(
executor_,
core::QueryConfig{queryConfigs_},
std::move(connectorConfigs),
nullptr,
std::move(queryPool),
executor_);

RowVectorPtr OperatorReplayerBase::run(bool copyResults) {
auto queryCtx = createQueryCtx();
std::shared_ptr<exec::test::TempDirectoryPath> spillDirectory;
if (queryCtx->queryConfig().spillEnabled()) {
spillDirectory = exec::test::TempDirectoryPath::create();
}

const auto result =
exec::test::AssertQueryBuilder(restoredPlanNode)
.maxDrivers(driverIds_.size())
.queryCtx(std::move(queryCtx))
TraceReplayTaskRunner traceTaskRunner(createPlan(), std::move(queryCtx));
auto [task, result] =
traceTaskRunner.maxDrivers(driverIds_.size())
.spillDirectory(spillDirectory ? spillDirectory->getPath() : "")
.copyResults(memory::MemoryManager::getInstance()->tracePool(), task);
.run(copyResults);
printStats(task);
return result;
}
Expand Down Expand Up @@ -129,6 +112,24 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() {
.planNode();
}

std::shared_ptr<core::QueryCtx> OperatorReplayerBase::createQueryCtx() {
auto queryPool = memory::memoryManager()->addRootPool(
fmt::format("{}_replayer", operatorType_), queryCapacity_);
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
connectorConfigs;
for (auto& [connectorId, configs] : connectorConfigs_) {
connectorConfigs.emplace(
connectorId, std::make_shared<config::ConfigBase>(std::move(configs)));
}
return core::QueryCtx::create(
executor_,
core::QueryConfig{queryConfigs_},
std::move(connectorConfigs),
nullptr,
std::move(queryPool),
executor_);
}

std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
OperatorReplayerBase::replayNodeFactory(const core::PlanNode* node) const {
return [=](const core::PlanNodeId& nodeId,
Expand Down
5 changes: 4 additions & 1 deletion velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/core/QueryCtx.h"
#include "velox/parse/PlanNodeIdGenerator.h"

namespace facebook::velox::exec {
Expand All @@ -44,7 +45,7 @@ class OperatorReplayerBase {
OperatorReplayerBase& operator=(OperatorReplayerBase&& other) noexcept =
delete;

virtual RowVectorPtr run();
virtual RowVectorPtr run(bool copyResults = true);

protected:
virtual core::PlanNodePtr createPlanNode(
Expand All @@ -54,6 +55,8 @@ class OperatorReplayerBase {

core::PlanNodePtr createPlan();

std::shared_ptr<core::QueryCtx> createQueryCtx();

const std::string queryId_;
const std::string taskId_;
const std::string nodeId_;
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/PartitionedOutputReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ PartitionedOutputReplayer::PartitionedOutputReplayer(
std::make_shared<folly::NamedThreadFactory>("Consumer"));
}

RowVectorPtr PartitionedOutputReplayer::run() {
RowVectorPtr PartitionedOutputReplayer::run(bool /*unused*/) {
const auto task = Task::create(
"local://partitioned-output-replayer",
core::PlanFragment{createPlan()},
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/PartitionedOutputReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase {
folly::Executor* executor,
const ConsumerCallBack& consumerCb = [](auto partition, auto page) {});

RowVectorPtr run() override;
RowVectorPtr run(bool /*unused*/) override;

private:
core::PlanNodePtr createPlanNode(
Expand Down
17 changes: 7 additions & 10 deletions velox/tool/trace/TableScanReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,25 @@
*/

#include "velox/tool/trace/TableScanReplayer.h"

#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/OperatorTraceReader.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/tool/trace/TraceReplayTaskRunner.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {

RowVectorPtr TableScanReplayer::run() {
std::shared_ptr<exec::Task> task;
const auto plan = createPlan();
const auto result =
exec::test::AssertQueryBuilder(plan)
.maxDrivers(driverIds_.size())
.configs(queryConfigs_)
.connectorSessionProperties(connectorConfigs_)
.splits(getSplits())
.copyResults(memory::MemoryManager::getInstance()->tracePool(), task);
RowVectorPtr TableScanReplayer::run(bool copyResults) {
TraceReplayTaskRunner traceTaskRunner(createPlan(), createQueryCtx());
auto [task, result] = traceTaskRunner.maxDrivers(driverIds_.size())
.splits(replayPlanNodeId_, getSplits())
.run(copyResults);
printStats(task);
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/TableScanReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class TableScanReplayer final : public OperatorReplayerBase {
queryCapacity,
executor) {}

RowVectorPtr run() override;
RowVectorPtr run(bool copyResults = true) override;

private:
core::PlanNodePtr createPlanNode(
Expand Down
3 changes: 2 additions & 1 deletion velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ DEFINE_uint64(
query_memory_capacity_gb,
0,
"Specify the query memory capacity limit in GB. If it is zero, then there is no limit.");
DEFINE_bool(copy_results, false, "Copy the replaying results.");

namespace facebook::velox::tool::trace {
namespace {
Expand Down Expand Up @@ -395,6 +396,6 @@ void TraceReplayRunner::run() {
return;
}
VELOX_USER_CHECK(!FLAGS_task_id.empty(), "--task_id must be provided");
createReplayer()->run();
createReplayer()->run(FLAGS_copy_results);
}
} // namespace facebook::velox::tool::trace
2 changes: 2 additions & 0 deletions velox/tool/trace/TraceReplayRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ DECLARE_string(node_id);
DECLARE_int32(driver_id);
DECLARE_string(driver_ids);
DECLARE_string(table_writer_output_dir);
DECLARE_double(hive_connector_executor_hw_multiplier);
DECLARE_int32(shuffle_serialization_format);
DECLARE_uint64(query_memory_capacity_gb);
DECLARE_double(driver_cpu_executor_hw_multiplier);
DECLARE_string(memory_arbitrator_type);
DECLARE_bool(copy_results);

namespace facebook::velox::tool::trace {

Expand Down
84 changes: 84 additions & 0 deletions velox/tool/trace/TraceReplayTaskRunner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/TraceReplayTaskRunner.h"

namespace facebook::velox::tool::trace {

std::pair<std::shared_ptr<exec::Task>, RowVectorPtr> TraceReplayTaskRunner::run(
bool copyResults) {
auto cursor = exec::test::TaskCursor::create(cursorParams_);
std::vector<RowVectorPtr> results;
auto* task = cursor->task().get();
addSplits(task);

while (cursor->moveNext()) {
results.push_back(cursor->current());
}

task->taskCompletionFuture().wait();

if (copyResults) {
return {cursor->task(), copy(results)};
}

return {cursor->task(), nullptr};
}

std::shared_ptr<RowVector> TraceReplayTaskRunner::copy(
const std::vector<RowVectorPtr>& results) {
auto totalRows = 0;
for (const auto& result : results) {
totalRows += result->size();
}
auto copyResult = BaseVector::create<RowVector>(
results[0]->type(), totalRows, memory::traceMemoryPool());
auto resultRowOffset = 0;
for (const auto& result : results) {
copyResult->copy(result.get(), resultRowOffset, 0, result->size());
resultRowOffset += result->size();
}
return copyResult;
}

TraceReplayTaskRunner& TraceReplayTaskRunner::maxDrivers(int32_t maxDrivers) {
cursorParams_.maxDrivers = maxDrivers;
return *this;
}

TraceReplayTaskRunner& TraceReplayTaskRunner::spillDirectory(
const std::string& dir) {
cursorParams_.spillDirectory = dir;
return *this;
}

TraceReplayTaskRunner& TraceReplayTaskRunner::splits(
const core::PlanNodeId& planNodeId,
std::vector<exec::Split> splits) {
splits_[planNodeId] = std::move(splits);
return *this;
}

void TraceReplayTaskRunner::addSplits(exec::Task* task) {
for (auto& [nodeId, nodeSplits] : splits_) {
for (auto& split : nodeSplits) {
task->addSplit(nodeId, std::move(split));
}
task->noMoreSplits(nodeId);
}
}

} // namespace facebook::velox::tool::trace
60 changes: 60 additions & 0 deletions velox/tool/trace/TraceReplayTaskRunner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <folly/executors/IOThreadPoolExecutor.h>
#include "velox/exec/Cursor.h"

namespace facebook::velox::tool::trace {

class TraceReplayTaskRunner {
public:
explicit TraceReplayTaskRunner(
core::PlanNodePtr plan,
std::shared_ptr<core::QueryCtx> queryCtx) {
cursorParams_.planNode = std::move(plan);
cursorParams_.queryCtx = std::move(queryCtx);
}

/// Run the replaying task. Return the copied results if 'copyResults' is
/// true or return null.
std::pair<std::shared_ptr<exec::Task>, RowVectorPtr> run(
bool copyResults = false);

/// Max number of drivers. Default is 1.
TraceReplayTaskRunner& maxDrivers(int32_t maxDrivers);

/// Spilling directory, if not empty, then the task's spilling directory would
/// be built from it.
TraceReplayTaskRunner& spillDirectory(const std::string& dir);

/// Splits of this task.
TraceReplayTaskRunner& splits(
const core::PlanNodeId& planNodeId,
std::vector<exec::Split> splits);

private:
void addSplits(exec::Task* task);

static std::shared_ptr<RowVector> copy(
const std::vector<RowVectorPtr>& results);

exec::test::CursorParameters cursorParams_;
std::unordered_map<core::PlanNodeId, std::vector<exec::Split>> splits_;
bool noMoreSplits_ = false;
};
} // namespace facebook::velox::tool::trace
Loading

0 comments on commit 7fdb321

Please sign in to comment.