From ce18de928024a6b575d6770a13b6efb4d11b2ab9 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Sat, 21 Dec 2024 19:48:22 +0800 Subject: [PATCH] Add trace task runner --- velox/tool/trace/CMakeLists.txt | 3 +- velox/tool/trace/OperatorReplayerBase.cpp | 49 +++++------ velox/tool/trace/OperatorReplayerBase.h | 5 +- .../tool/trace/PartitionedOutputReplayer.cpp | 2 +- velox/tool/trace/PartitionedOutputReplayer.h | 2 +- velox/tool/trace/TableScanReplayer.cpp | 17 ++-- velox/tool/trace/TableScanReplayer.h | 2 +- velox/tool/trace/TraceReplayRunner.cpp | 3 +- velox/tool/trace/TraceReplayRunner.h | 1 + velox/tool/trace/TraceTaskRunner.cpp | 78 +++++++++++++++++ velox/tool/trace/TraceTaskRunner.h | 59 +++++++++++++ .../tool/trace/tests/HashJoinReplayerTest.cpp | 85 +++++++++++++++++++ 12 files changed, 266 insertions(+), 40 deletions(-) create mode 100644 velox/tool/trace/TraceTaskRunner.cpp create mode 100644 velox/tool/trace/TraceTaskRunner.h diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index d36ee8fab023..6bd9670bbac5 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -21,7 +21,8 @@ add_library( PartitionedOutputReplayer.cpp TableScanReplayer.cpp TableWriterReplayer.cpp - TraceReplayRunner.cpp) + TraceReplayRunner.cpp + TraceTaskRunner.cpp) target_link_libraries( velox_query_trace_replayer_base diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index 5bd8634eb392..1e0b77266784 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -26,6 +26,7 @@ #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/tool/trace/OperatorReplayerBase.h" +#include "velox/tool/trace/TraceTaskRunner.h" using namespace facebook::velox; @@ -72,36 +73,18 @@ OperatorReplayerBase::OperatorReplayerBase( queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false"; } -RowVectorPtr OperatorReplayerBase::run() { - std::shared_ptr task; - const auto restoredPlanNode = createPlan(); - auto queryPool = memory::memoryManager()->addRootPool( - "OperatorReplayerBase", queryCapacity_); - std::unordered_map> - connectorConfigs; - for (auto& [connectorId, configs] : connectorConfigs_) { - connectorConfigs.emplace( - connectorId, std::make_shared(std::move(configs))); - } - auto queryCtx = core::QueryCtx::create( - executor_, - core::QueryConfig{queryConfigs_}, - std::move(connectorConfigs), - nullptr, - std::move(queryPool), - executor_); - +RowVectorPtr OperatorReplayerBase::run(bool copyResults) { + auto queryCtx = createQueryCtx(); std::shared_ptr spillDirectory; if (queryCtx->queryConfig().spillEnabled()) { spillDirectory = exec::test::TempDirectoryPath::create(); } - const auto result = - exec::test::AssertQueryBuilder(restoredPlanNode) - .maxDrivers(driverIds_.size()) - .queryCtx(std::move(queryCtx)) + TraceTaskRunner traceTaskRunner(createPlan(), std::move(queryCtx)); + auto [task, result] = + traceTaskRunner.maxDrivers(driverIds_.size()) .spillDirectory(spillDirectory ? spillDirectory->getPath() : "") - .copyResults(memory::MemoryManager::getInstance()->tracePool(), task); + .run(copyResults); printStats(task); return result; } @@ -129,6 +112,24 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() { .planNode(); } +std::shared_ptr OperatorReplayerBase::createQueryCtx() { + auto queryPool = memory::memoryManager()->addRootPool( + fmt::format("{}_replayer", operatorType_), queryCapacity_); + std::unordered_map> + connectorConfigs; + for (auto& [connectorId, configs] : connectorConfigs_) { + connectorConfigs.emplace( + connectorId, std::make_shared(std::move(configs))); + } + return core::QueryCtx::create( + executor_, + core::QueryConfig{queryConfigs_}, + std::move(connectorConfigs), + nullptr, + std::move(queryPool), + executor_); +} + std::function OperatorReplayerBase::replayNodeFactory(const core::PlanNode* node) const { return [=](const core::PlanNodeId& nodeId, diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index 3cb36ead7b9e..e804e08d768b 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -18,6 +18,7 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" +#include "velox/core/QueryCtx.h" #include "velox/parse/PlanNodeIdGenerator.h" namespace facebook::velox::exec { @@ -44,7 +45,7 @@ class OperatorReplayerBase { OperatorReplayerBase& operator=(OperatorReplayerBase&& other) noexcept = delete; - virtual RowVectorPtr run(); + virtual RowVectorPtr run(bool copyResults = true); protected: virtual core::PlanNodePtr createPlanNode( @@ -54,6 +55,8 @@ class OperatorReplayerBase { core::PlanNodePtr createPlan(); + std::shared_ptr createQueryCtx(); + const std::string queryId_; const std::string taskId_; const std::string nodeId_; diff --git a/velox/tool/trace/PartitionedOutputReplayer.cpp b/velox/tool/trace/PartitionedOutputReplayer.cpp index 678a7e0598aa..ff14324fdb73 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.cpp +++ b/velox/tool/trace/PartitionedOutputReplayer.cpp @@ -138,7 +138,7 @@ PartitionedOutputReplayer::PartitionedOutputReplayer( std::make_shared("Consumer")); } -RowVectorPtr PartitionedOutputReplayer::run() { +RowVectorPtr PartitionedOutputReplayer::run(bool /*copyResult*/) { const auto task = Task::create( "local://partitioned-output-replayer", core::PlanFragment{createPlan()}, diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index ea2db9a4c1be..6e17221c495e 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -52,7 +52,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { folly::Executor* executor, const ConsumerCallBack& consumerCb = [](auto partition, auto page) {}); - RowVectorPtr run() override; + RowVectorPtr run(bool copyResult = false) override; private: core::PlanNodePtr createPlanNode( diff --git a/velox/tool/trace/TableScanReplayer.cpp b/velox/tool/trace/TableScanReplayer.cpp index 0f9805a03c38..659f4b9662d7 100644 --- a/velox/tool/trace/TableScanReplayer.cpp +++ b/velox/tool/trace/TableScanReplayer.cpp @@ -15,6 +15,8 @@ */ #include "velox/tool/trace/TableScanReplayer.h" + +#include "TraceTaskRunner.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/exec/OperatorTraceReader.h" #include "velox/exec/TraceUtil.h" @@ -27,16 +29,11 @@ using namespace facebook::velox::exec::test; namespace facebook::velox::tool::trace { -RowVectorPtr TableScanReplayer::run() { - std::shared_ptr task; - const auto plan = createPlan(); - const auto result = - exec::test::AssertQueryBuilder(plan) - .maxDrivers(driverIds_.size()) - .configs(queryConfigs_) - .connectorSessionProperties(connectorConfigs_) - .splits(getSplits()) - .copyResults(memory::MemoryManager::getInstance()->tracePool(), task); +RowVectorPtr TableScanReplayer::run(bool copyResults) { + TraceTaskRunner traceTaskRunner(createPlan(), createQueryCtx()); + auto [task, result] = traceTaskRunner.maxDrivers(driverIds_.size()) + .splits(replayPlanNodeId_, getSplits()) + .run(copyResults); printStats(task); return result; } diff --git a/velox/tool/trace/TableScanReplayer.h b/velox/tool/trace/TableScanReplayer.h index 2981a5a0b2cf..58121851368b 100644 --- a/velox/tool/trace/TableScanReplayer.h +++ b/velox/tool/trace/TableScanReplayer.h @@ -46,7 +46,7 @@ class TableScanReplayer final : public OperatorReplayerBase { queryCapacity, executor) {} - RowVectorPtr run() override; + RowVectorPtr run(bool copyResults = true) override; private: core::PlanNodePtr createPlanNode( diff --git a/velox/tool/trace/TraceReplayRunner.cpp b/velox/tool/trace/TraceReplayRunner.cpp index 614885143dde..67c603db647e 100644 --- a/velox/tool/trace/TraceReplayRunner.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -95,6 +95,7 @@ DEFINE_uint64( query_memory_capacity_gb, 0, "Specify the query memory capacity limit in GB. If it is zero, then there is no limit."); +DEFINE_bool(copy_results, false, "Copy and return the replaying results."); namespace facebook::velox::tool::trace { namespace { @@ -395,6 +396,6 @@ void TraceReplayRunner::run() { return; } VELOX_USER_CHECK(!FLAGS_task_id.empty(), "--task_id must be provided"); - createReplayer()->run(); + createReplayer()->run(FLAGS_copy_results); } } // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h index 9e0c5f108af5..6dce49ca25a3 100644 --- a/velox/tool/trace/TraceReplayRunner.h +++ b/velox/tool/trace/TraceReplayRunner.h @@ -35,6 +35,7 @@ DECLARE_int32(shuffle_serialization_format); DECLARE_uint64(query_memory_capacity_gb); DECLARE_double(driver_cpu_executor_hw_multiplier); DECLARE_string(memory_arbitrator_type); +DECLARE_bool(copy_results); namespace facebook::velox::tool::trace { diff --git a/velox/tool/trace/TraceTaskRunner.cpp b/velox/tool/trace/TraceTaskRunner.cpp new file mode 100644 index 000000000000..5f415c84d097 --- /dev/null +++ b/velox/tool/trace/TraceTaskRunner.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/TraceTaskRunner.h" + +namespace facebook::velox::tool::trace { + +std::pair, RowVectorPtr> TraceTaskRunner::run( + bool copyResults) { + auto cursor = exec::test::TaskCursor::create(cursorParams_); + std::vector results; + auto* task = cursor->task().get(); + addSplits(task); + + while (cursor->moveNext()) { + results.push_back(cursor->current()); + } + + task->taskCompletionFuture().wait(); + + if (!copyResults) { + return {cursor->task(), nullptr}; + } + + auto totalCount = 0; + for (const auto& result : results) { + totalCount += result->size(); + } + auto copy = BaseVector::create( + results[0]->type(), totalCount, memory::traceMemoryPool()); + auto copyCount = 0; + for (const auto& result : results) { + copy->copy(result.get(), copyCount, 0, result->size()); + copyCount += result->size(); + } + return {cursor->task(), copy}; +} + +TraceTaskRunner& TraceTaskRunner::maxDrivers(int32_t maxDrivers) { + cursorParams_.maxDrivers = maxDrivers; + return *this; +} + +TraceTaskRunner& TraceTaskRunner::spillDirectory(const std::string& dir) { + cursorParams_.spillDirectory = dir; + return *this; +} + +TraceTaskRunner& TraceTaskRunner::splits( + const core::PlanNodeId& planNodeId, + std::vector splits) { + splits_[planNodeId] = std::move(splits); + return *this; +} + +void TraceTaskRunner::addSplits(exec::Task* task) { + for (auto& [nodeId, nodeSplits] : splits_) { + for (auto& split : nodeSplits) { + task->addSplit(nodeId, std::move(split)); + } + task->noMoreSplits(nodeId); + } +} + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceTaskRunner.h b/velox/tool/trace/TraceTaskRunner.h new file mode 100644 index 000000000000..82eba3358788 --- /dev/null +++ b/velox/tool/trace/TraceTaskRunner.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "velox/exec/Cursor.h" + +namespace facebook::velox::tool::trace { + +class TraceTaskRunner { + public: + explicit TraceTaskRunner( + core::PlanNodePtr plan, + std::shared_ptr queryCtx) { + cursorParams_.planNode = std::move(plan); + cursorParams_.queryCtx = std::move(queryCtx); + } + + /// Run the replaying task. Return the copied results if 'copyResults' is + /// true or return null. + std::pair, RowVectorPtr> run( + bool copyResults = false); + + /// Max number of drivers. Default is 1. + TraceTaskRunner& maxDrivers(int32_t maxDrivers); + + /// Spilling directory, if not empty, then the task's spilling directory would + /// be built from it. + TraceTaskRunner& spillDirectory(const std::string& dir); + + /// Splits of this task. + TraceTaskRunner& splits( + const core::PlanNodeId& planNodeId, + std::vector splits); + + private: + void addSplits(exec::Task* task); + + inline static std::atomic cursorQueryId_ = 0; + + exec::test::CursorParameters cursorParams_; + std::unordered_map> splits_; + bool noMoreSplits_ = false; +}; +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/tests/HashJoinReplayerTest.cpp b/velox/tool/trace/tests/HashJoinReplayerTest.cpp index 19031f05408c..9f02ad67ff24 100644 --- a/velox/tool/trace/tests/HashJoinReplayerTest.cpp +++ b/velox/tool/trace/tests/HashJoinReplayerTest.cpp @@ -27,6 +27,7 @@ #include "velox/common/hyperloglog/SparseHll.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" #include "velox/exec/PlanNodeStats.h" #include "velox/exec/TableWriter.h" @@ -38,6 +39,7 @@ #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/tool/trace/HashJoinReplayer.h" +#include "velox/tool/trace/TraceReplayRunner.h" #include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox; @@ -325,6 +327,89 @@ TEST_F(HashJoinReplayerTest, partialDriverIds) { faultyFs->clearFileFaultInjections(); } +TEST_F(HashJoinReplayerTest, runner) { + const auto planWithSplits = createPlan( + tableDir_, + core::JoinType::kInner, + probeKeys_, + buildKeys_, + probeInput_, + buildInput_); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + std::shared_ptr task; + auto tracePlanWithSplits = createPlan( + tableDir_, + core::JoinType::kInner, + probeKeys_, + buildKeys_, + probeInput_, + buildInput_); + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_); + for (const auto& [planNodeId, nodeSplits] : tracePlanWithSplits.splits) { + traceBuilder.splits(planNodeId, nodeSplits); + } + auto traceResult = traceBuilder.copyResults(pool(), task); + + const auto taskTraceDir = + exec::trace::getTaskTraceDirectory(traceRoot, *task); + const auto probeOperatorTraceDir = exec::trace::getOpTraceDirectory( + taskTraceDir, + traceNodeId_, + /*pipelineId=*/0, + /*driverId=*/0); + const auto probeSummary = + exec::trace::OperatorTraceSummaryReader(probeOperatorTraceDir, pool()) + .read(); + ASSERT_EQ(probeSummary.opType, "HashProbe"); + ASSERT_GT(probeSummary.peakMemory, 0); + ASSERT_GT(probeSummary.inputRows, 0); + ASSERT_GT(probeSummary.inputBytes, 0); + ASSERT_EQ(probeSummary.rawInputRows, 0); + ASSERT_EQ(probeSummary.rawInputBytes, 0); + + const auto buildOperatorTraceDir = exec::trace::getOpTraceDirectory( + taskTraceDir, + traceNodeId_, + /*pipelineId=*/1, + /*driverId=*/0); + const auto buildSummary = + exec::trace::OperatorTraceSummaryReader(buildOperatorTraceDir, pool()) + .read(); + ASSERT_EQ(buildSummary.opType, "HashBuild"); + ASSERT_GT(buildSummary.peakMemory, 0); + ASSERT_GT(buildSummary.inputRows, 0); + // NOTE: the input bytes is 0 because of the lazy materialization. + ASSERT_EQ(buildSummary.inputBytes, 0); + ASSERT_EQ(buildSummary.rawInputRows, 0); + ASSERT_EQ(buildSummary.rawInputBytes, 0); + + FLAGS_root_dir = traceRoot; + FLAGS_query_id = task->queryCtx()->queryId(); + FLAGS_task_id = task->taskId(); + FLAGS_node_id = traceNodeId_; + FLAGS_summary = true; + { + TraceReplayRunner runner; + runner.init(); + runner.run(); + } + + FLAGS_task_id = task->taskId(); + FLAGS_driver_ids = ""; + FLAGS_summary = false; + { + TraceReplayRunner runner; + runner.init(); + runner.run(); + } +} + DEBUG_ONLY_TEST_F(HashJoinReplayerTest, hashBuildSpill) { const auto planWithSplits = createPlan( tableDir_,