Skip to content

Commit

Permalink
Add trace task runner
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Dec 29, 2024
1 parent 20eb8ec commit 6d0fbe2
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 39 deletions.
3 changes: 2 additions & 1 deletion velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ add_library(
PartitionedOutputReplayer.cpp
TableScanReplayer.cpp
TableWriterReplayer.cpp
TraceReplayRunner.cpp)
TraceReplayRunner.cpp
TraceTaskRunner.cpp)

target_link_libraries(
velox_query_trace_replayer_base
Expand Down
49 changes: 25 additions & 24 deletions velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/tool/trace/OperatorReplayerBase.h"
#include "velox/tool/trace/TraceTaskRunner.h"

using namespace facebook::velox;

Expand Down Expand Up @@ -72,36 +73,18 @@ OperatorReplayerBase::OperatorReplayerBase(
queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false";
}

RowVectorPtr OperatorReplayerBase::run() {
std::shared_ptr<exec::Task> task;
const auto restoredPlanNode = createPlan();
auto queryPool = memory::memoryManager()->addRootPool(
"OperatorReplayerBase", queryCapacity_);
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
connectorConfigs;
for (auto& [connectorId, configs] : connectorConfigs_) {
connectorConfigs.emplace(
connectorId, std::make_shared<config::ConfigBase>(std::move(configs)));
}
auto queryCtx = core::QueryCtx::create(
executor_,
core::QueryConfig{queryConfigs_},
std::move(connectorConfigs),
nullptr,
std::move(queryPool),
executor_);

RowVectorPtr OperatorReplayerBase::run(bool copyResults) {
auto queryCtx = createQueryCtx();
std::shared_ptr<exec::test::TempDirectoryPath> spillDirectory;
if (queryCtx->queryConfig().spillEnabled()) {
spillDirectory = exec::test::TempDirectoryPath::create();
}

const auto result =
exec::test::AssertQueryBuilder(restoredPlanNode)
.maxDrivers(driverIds_.size())
.queryCtx(std::move(queryCtx))
TraceTaskRunner traceTaskRunner(createPlan(), std::move(queryCtx));
auto [task, result] =
traceTaskRunner.maxDrivers(driverIds_.size())
.spillDirectory(spillDirectory ? spillDirectory->getPath() : "")
.copyResults(memory::MemoryManager::getInstance()->tracePool(), task);
.run(copyResults);
printStats(task);
return result;
}
Expand Down Expand Up @@ -129,6 +112,24 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() {
.planNode();
}

std::shared_ptr<core::QueryCtx> OperatorReplayerBase::createQueryCtx() {
auto queryPool = memory::memoryManager()->addRootPool(
fmt::format("{}_replayer", operatorType_), queryCapacity_);
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
connectorConfigs;
for (auto& [connectorId, configs] : connectorConfigs_) {
connectorConfigs.emplace(
connectorId, std::make_shared<config::ConfigBase>(std::move(configs)));
}
return core::QueryCtx::create(
executor_,
core::QueryConfig{queryConfigs_},
std::move(connectorConfigs),
nullptr,
std::move(queryPool),
executor_);
}

std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
OperatorReplayerBase::replayNodeFactory(const core::PlanNode* node) const {
return [=](const core::PlanNodeId& nodeId,
Expand Down
5 changes: 4 additions & 1 deletion velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/core/QueryCtx.h"
#include "velox/parse/PlanNodeIdGenerator.h"

namespace facebook::velox::exec {
Expand All @@ -44,7 +45,7 @@ class OperatorReplayerBase {
OperatorReplayerBase& operator=(OperatorReplayerBase&& other) noexcept =
delete;

virtual RowVectorPtr run();
virtual RowVectorPtr run(bool copyResults = true);

protected:
virtual core::PlanNodePtr createPlanNode(
Expand All @@ -54,6 +55,8 @@ class OperatorReplayerBase {

core::PlanNodePtr createPlan();

std::shared_ptr<core::QueryCtx> createQueryCtx();

const std::string queryId_;
const std::string taskId_;
const std::string nodeId_;
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/PartitionedOutputReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ PartitionedOutputReplayer::PartitionedOutputReplayer(
std::make_shared<folly::NamedThreadFactory>("Consumer"));
}

RowVectorPtr PartitionedOutputReplayer::run() {
RowVectorPtr PartitionedOutputReplayer::run(bool /*copyResult*/) {
const auto task = Task::create(
"local://partitioned-output-replayer",
core::PlanFragment{createPlan()},
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/PartitionedOutputReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase {
folly::Executor* executor,
const ConsumerCallBack& consumerCb = [](auto partition, auto page) {});

RowVectorPtr run() override;
RowVectorPtr run(bool copyResult = false) override;

private:
core::PlanNodePtr createPlanNode(
Expand Down
16 changes: 7 additions & 9 deletions velox/tool/trace/TableScanReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/tool/trace/TableScanReplayer.h"

#include "TraceTaskRunner.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/OperatorTraceReader.h"
#include "velox/exec/TraceUtil.h"
Expand All @@ -27,16 +29,12 @@ using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {

RowVectorPtr TableScanReplayer::run() {
std::shared_ptr<exec::Task> task;
RowVectorPtr TableScanReplayer::run(bool copyResults) {
const auto plan = createPlan();
const auto result =
exec::test::AssertQueryBuilder(plan)
.maxDrivers(driverIds_.size())
.configs(queryConfigs_)
.connectorSessionProperties(connectorConfigs_)
.splits(getSplits())
.copyResults(memory::MemoryManager::getInstance()->tracePool(), task);
TraceTaskRunner traceTaskRunner(createPlan(), createQueryCtx());
auto [task, result] = traceTaskRunner.maxDrivers(driverIds_.size())
.splits(replayPlanNodeId_, getSplits())
.run(copyResults);
printStats(task);
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/TableScanReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class TableScanReplayer final : public OperatorReplayerBase {
queryCapacity,
executor) {}

RowVectorPtr run() override;
RowVectorPtr run(bool copyResults = true) override;

private:
core::PlanNodePtr createPlanNode(
Expand Down
3 changes: 2 additions & 1 deletion velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ DEFINE_uint64(
query_memory_capacity_gb,
0,
"Specify the query memory capacity limit in GB. If it is zero, then there is no limit.");
DEFINE_bool(copy_results, false, "Copy and return the replaying results.");

namespace facebook::velox::tool::trace {
namespace {
Expand Down Expand Up @@ -395,6 +396,6 @@ void TraceReplayRunner::run() {
return;
}
VELOX_USER_CHECK(!FLAGS_task_id.empty(), "--task_id must be provided");
createReplayer()->run();
createReplayer()->run(FLAGS_copy_results);
}
} // namespace facebook::velox::tool::trace
1 change: 1 addition & 0 deletions velox/tool/trace/TraceReplayRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ DECLARE_int32(shuffle_serialization_format);
DECLARE_uint64(query_memory_capacity_gb);
DECLARE_double(driver_cpu_executor_hw_multiplier);
DECLARE_string(memory_arbitrator_type);
DECLARE_bool(copy_results);

namespace facebook::velox::tool::trace {

Expand Down
78 changes: 78 additions & 0 deletions velox/tool/trace/TraceTaskRunner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/TraceTaskRunner.h"

namespace facebook::velox::tool::trace {

std::pair<std::shared_ptr<exec::Task>, RowVectorPtr> TraceTaskRunner::run(
bool copyResults) {
auto cursor = exec::test::TaskCursor::create(cursorParams_);
std::vector<RowVectorPtr> results;
auto* task = cursor->task().get();
addSplits(task);

while (cursor->moveNext()) {
results.push_back(cursor->current());
}

task->taskCompletionFuture().wait();

if (!copyResults) {
return {cursor->task(), nullptr};
}

auto totalCount = 0;
for (const auto& result : results) {
totalCount += result->size();
}
auto copy = BaseVector::create<RowVector>(
results[0]->type(), totalCount, memory::traceMemoryPool());
auto copyCount = 0;
for (const auto& result : results) {
copy->copy(result.get(), copyCount, 0, result->size());
copyCount += result->size();
}
return {cursor->task(), copy};
}

TraceTaskRunner& TraceTaskRunner::maxDrivers(int32_t maxDrivers) {
cursorParams_.maxDrivers = maxDrivers;
return *this;
}

TraceTaskRunner& TraceTaskRunner::spillDirectory(const std::string& dir) {
cursorParams_.spillDirectory = dir;
return *this;
}

TraceTaskRunner& TraceTaskRunner::splits(
const core::PlanNodeId& planNodeId,
std::vector<exec::Split> splits) {
splits_[planNodeId] = std::move(splits);
return *this;
}

void TraceTaskRunner::addSplits(exec::Task* task) {
for (auto& [nodeId, nodeSplits] : splits_) {
for (auto& split : nodeSplits) {
task->addSplit(nodeId, std::move(split));
}
task->noMoreSplits(nodeId);
}
}

} // namespace facebook::velox::tool::trace
59 changes: 59 additions & 0 deletions velox/tool/trace/TraceTaskRunner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <folly/executors/IOThreadPoolExecutor.h>
#include "velox/exec/Cursor.h"

namespace facebook::velox::tool::trace {

class TraceTaskRunner {
public:
explicit TraceTaskRunner(
core::PlanNodePtr plan,
std::shared_ptr<core::QueryCtx> queryCtx) {
cursorParams_.planNode = std::move(plan);
cursorParams_.queryCtx = std::move(queryCtx);
}

/// Run the replaying task. Return the copied results if 'copyResults' is
/// true or return null.
std::pair<std::shared_ptr<exec::Task>, RowVectorPtr> run(
bool copyResults = false);

/// Max number of drivers. Default is 1.
TraceTaskRunner& maxDrivers(int32_t maxDrivers);

/// Spilling directory, if not empty, then the task's spilling directory would
/// be built from it.
TraceTaskRunner& spillDirectory(const std::string& dir);

/// Splits of this task.
TraceTaskRunner& splits(
const core::PlanNodeId& planNodeId,
std::vector<exec::Split> splits);

private:
void addSplits(exec::Task* task);

inline static std::atomic<uint64_t> cursorQueryId_ = 0;

exec::test::CursorParameters cursorParams_;
std::unordered_map<core::PlanNodeId, std::vector<exec::Split>> splits_;
bool noMoreSplits_ = false;
};
} // namespace facebook::velox::tool::trace

0 comments on commit 6d0fbe2

Please sign in to comment.