diff --git a/velox/docs/develop/debugging/tracing.rst b/velox/docs/develop/debugging/tracing.rst index 3b2baaf11553..f651a368239e 100644 --- a/velox/docs/develop/debugging/tracing.rst +++ b/velox/docs/develop/debugging/tracing.rst @@ -366,3 +366,7 @@ Here is a full list of supported command line arguments. * ``--shuffle_serialization_format``: Specify the shuffle serialization format. * ``--table_writer_output_dir``: Specify the output directory of TableWriter. * ``--hiveConnectorExecutorHwMultiplier``: Hardware multiplier for hive connector. +* ``--driver_cpu_executor_hw_multiplier``: Hardware multipler for driver cpu executor. +* ``--memory_arbitrator_type``: Specify the memory arbitrator type. +* ``--query_memory_capacity_gb``: Specify the query memory capacity limit in GB. If it is zero, then there is no limit. +* ``--copy_results``: If true, copy the replaying result. diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index d36ee8fab023..fe765bd8dfe4 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -21,7 +21,8 @@ add_library( PartitionedOutputReplayer.cpp TableScanReplayer.cpp TableWriterReplayer.cpp - TraceReplayRunner.cpp) + TraceReplayRunner.cpp + TraceReplayTaskRunner.cpp) target_link_libraries( velox_query_trace_replayer_base diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index 5bd8634eb392..f75549f4349b 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -26,6 +26,7 @@ #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/tool/trace/OperatorReplayerBase.h" +#include "velox/tool/trace/TraceReplayTaskRunner.h" using namespace facebook::velox; @@ -72,36 +73,18 @@ OperatorReplayerBase::OperatorReplayerBase( queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false"; } -RowVectorPtr OperatorReplayerBase::run() { - std::shared_ptr task; - const auto restoredPlanNode = createPlan(); - auto queryPool = memory::memoryManager()->addRootPool( - "OperatorReplayerBase", queryCapacity_); - std::unordered_map> - connectorConfigs; - for (auto& [connectorId, configs] : connectorConfigs_) { - connectorConfigs.emplace( - connectorId, std::make_shared(std::move(configs))); - } - auto queryCtx = core::QueryCtx::create( - executor_, - core::QueryConfig{queryConfigs_}, - std::move(connectorConfigs), - nullptr, - std::move(queryPool), - executor_); - +RowVectorPtr OperatorReplayerBase::run(bool copyResults) { + auto queryCtx = createQueryCtx(); std::shared_ptr spillDirectory; if (queryCtx->queryConfig().spillEnabled()) { spillDirectory = exec::test::TempDirectoryPath::create(); } - const auto result = - exec::test::AssertQueryBuilder(restoredPlanNode) - .maxDrivers(driverIds_.size()) - .queryCtx(std::move(queryCtx)) + TraceReplayTaskRunner traceTaskRunner(createPlan(), std::move(queryCtx)); + auto [task, result] = + traceTaskRunner.maxDrivers(driverIds_.size()) .spillDirectory(spillDirectory ? spillDirectory->getPath() : "") - .copyResults(memory::MemoryManager::getInstance()->tracePool(), task); + .run(copyResults); printStats(task); return result; } @@ -129,6 +112,24 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() { .planNode(); } +std::shared_ptr OperatorReplayerBase::createQueryCtx() { + auto queryPool = memory::memoryManager()->addRootPool( + fmt::format("{}_replayer", operatorType_), queryCapacity_); + std::unordered_map> + connectorConfigs; + for (auto& [connectorId, configs] : connectorConfigs_) { + connectorConfigs.emplace( + connectorId, std::make_shared(std::move(configs))); + } + return core::QueryCtx::create( + executor_, + core::QueryConfig{queryConfigs_}, + std::move(connectorConfigs), + nullptr, + std::move(queryPool), + executor_); +} + std::function OperatorReplayerBase::replayNodeFactory(const core::PlanNode* node) const { return [=](const core::PlanNodeId& nodeId, diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index 3cb36ead7b9e..e804e08d768b 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -18,6 +18,7 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" +#include "velox/core/QueryCtx.h" #include "velox/parse/PlanNodeIdGenerator.h" namespace facebook::velox::exec { @@ -44,7 +45,7 @@ class OperatorReplayerBase { OperatorReplayerBase& operator=(OperatorReplayerBase&& other) noexcept = delete; - virtual RowVectorPtr run(); + virtual RowVectorPtr run(bool copyResults = true); protected: virtual core::PlanNodePtr createPlanNode( @@ -54,6 +55,8 @@ class OperatorReplayerBase { core::PlanNodePtr createPlan(); + std::shared_ptr createQueryCtx(); + const std::string queryId_; const std::string taskId_; const std::string nodeId_; diff --git a/velox/tool/trace/PartitionedOutputReplayer.cpp b/velox/tool/trace/PartitionedOutputReplayer.cpp index 678a7e0598aa..48c7a6b1459d 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.cpp +++ b/velox/tool/trace/PartitionedOutputReplayer.cpp @@ -138,7 +138,7 @@ PartitionedOutputReplayer::PartitionedOutputReplayer( std::make_shared("Consumer")); } -RowVectorPtr PartitionedOutputReplayer::run() { +RowVectorPtr PartitionedOutputReplayer::run(bool /*unused*/) { const auto task = Task::create( "local://partitioned-output-replayer", core::PlanFragment{createPlan()}, diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index ea2db9a4c1be..f9eb0c1cd9ed 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -52,7 +52,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { folly::Executor* executor, const ConsumerCallBack& consumerCb = [](auto partition, auto page) {}); - RowVectorPtr run() override; + RowVectorPtr run(bool /*unused*/) override; private: core::PlanNodePtr createPlanNode( diff --git a/velox/tool/trace/TableScanReplayer.cpp b/velox/tool/trace/TableScanReplayer.cpp index 0f9805a03c38..78dcc6612a81 100644 --- a/velox/tool/trace/TableScanReplayer.cpp +++ b/velox/tool/trace/TableScanReplayer.cpp @@ -15,11 +15,13 @@ */ #include "velox/tool/trace/TableScanReplayer.h" + #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/exec/OperatorTraceReader.h" #include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/tool/trace/TraceReplayTaskRunner.h" using namespace facebook::velox; using namespace facebook::velox::exec; @@ -27,16 +29,11 @@ using namespace facebook::velox::exec::test; namespace facebook::velox::tool::trace { -RowVectorPtr TableScanReplayer::run() { - std::shared_ptr task; - const auto plan = createPlan(); - const auto result = - exec::test::AssertQueryBuilder(plan) - .maxDrivers(driverIds_.size()) - .configs(queryConfigs_) - .connectorSessionProperties(connectorConfigs_) - .splits(getSplits()) - .copyResults(memory::MemoryManager::getInstance()->tracePool(), task); +RowVectorPtr TableScanReplayer::run(bool copyResults) { + TraceReplayTaskRunner traceTaskRunner(createPlan(), createQueryCtx()); + auto [task, result] = traceTaskRunner.maxDrivers(driverIds_.size()) + .splits(replayPlanNodeId_, getSplits()) + .run(copyResults); printStats(task); return result; } diff --git a/velox/tool/trace/TableScanReplayer.h b/velox/tool/trace/TableScanReplayer.h index 2981a5a0b2cf..58121851368b 100644 --- a/velox/tool/trace/TableScanReplayer.h +++ b/velox/tool/trace/TableScanReplayer.h @@ -46,7 +46,7 @@ class TableScanReplayer final : public OperatorReplayerBase { queryCapacity, executor) {} - RowVectorPtr run() override; + RowVectorPtr run(bool copyResults = true) override; private: core::PlanNodePtr createPlanNode( diff --git a/velox/tool/trace/TraceReplayRunner.cpp b/velox/tool/trace/TraceReplayRunner.cpp index 614885143dde..b88e281335bf 100644 --- a/velox/tool/trace/TraceReplayRunner.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -95,6 +95,7 @@ DEFINE_uint64( query_memory_capacity_gb, 0, "Specify the query memory capacity limit in GB. If it is zero, then there is no limit."); +DEFINE_bool(copy_results, false, "Copy the replaying results."); namespace facebook::velox::tool::trace { namespace { @@ -395,6 +396,6 @@ void TraceReplayRunner::run() { return; } VELOX_USER_CHECK(!FLAGS_task_id.empty(), "--task_id must be provided"); - createReplayer()->run(); + createReplayer()->run(FLAGS_copy_results); } } // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h index 9e0c5f108af5..7af682c2b14b 100644 --- a/velox/tool/trace/TraceReplayRunner.h +++ b/velox/tool/trace/TraceReplayRunner.h @@ -31,10 +31,12 @@ DECLARE_string(node_id); DECLARE_int32(driver_id); DECLARE_string(driver_ids); DECLARE_string(table_writer_output_dir); +DECLARE_double(hive_connector_executor_hw_multiplier); DECLARE_int32(shuffle_serialization_format); DECLARE_uint64(query_memory_capacity_gb); DECLARE_double(driver_cpu_executor_hw_multiplier); DECLARE_string(memory_arbitrator_type); +DECLARE_bool(copy_results); namespace facebook::velox::tool::trace { diff --git a/velox/tool/trace/TraceReplayTaskRunner.cpp b/velox/tool/trace/TraceReplayTaskRunner.cpp new file mode 100644 index 000000000000..0a5aa32eb05d --- /dev/null +++ b/velox/tool/trace/TraceReplayTaskRunner.cpp @@ -0,0 +1,84 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/TraceReplayTaskRunner.h" + +namespace facebook::velox::tool::trace { + +std::pair, RowVectorPtr> TraceReplayTaskRunner::run( + bool copyResults) { + auto cursor = exec::test::TaskCursor::create(cursorParams_); + std::vector results; + auto* task = cursor->task().get(); + addSplits(task); + + while (cursor->moveNext()) { + results.push_back(cursor->current()); + } + + task->taskCompletionFuture().wait(); + + if (copyResults) { + return {cursor->task(), copy(results)}; + } + + return {cursor->task(), nullptr}; +} + +std::shared_ptr TraceReplayTaskRunner::copy( + const std::vector& results) { + auto totalRows = 0; + for (const auto& result : results) { + totalRows += result->size(); + } + auto copyResult = BaseVector::create( + results[0]->type(), totalRows, memory::traceMemoryPool()); + auto resultRowOffset = 0; + for (const auto& result : results) { + copyResult->copy(result.get(), resultRowOffset, 0, result->size()); + resultRowOffset += result->size(); + } + return copyResult; +} + +TraceReplayTaskRunner& TraceReplayTaskRunner::maxDrivers(int32_t maxDrivers) { + cursorParams_.maxDrivers = maxDrivers; + return *this; +} + +TraceReplayTaskRunner& TraceReplayTaskRunner::spillDirectory( + const std::string& dir) { + cursorParams_.spillDirectory = dir; + return *this; +} + +TraceReplayTaskRunner& TraceReplayTaskRunner::splits( + const core::PlanNodeId& planNodeId, + std::vector splits) { + splits_[planNodeId] = std::move(splits); + return *this; +} + +void TraceReplayTaskRunner::addSplits(exec::Task* task) { + for (auto& [nodeId, nodeSplits] : splits_) { + for (auto& split : nodeSplits) { + task->addSplit(nodeId, std::move(split)); + } + task->noMoreSplits(nodeId); + } +} + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayTaskRunner.h b/velox/tool/trace/TraceReplayTaskRunner.h new file mode 100644 index 000000000000..a00b2d2311ea --- /dev/null +++ b/velox/tool/trace/TraceReplayTaskRunner.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "velox/exec/Cursor.h" + +namespace facebook::velox::tool::trace { + +class TraceReplayTaskRunner { + public: + explicit TraceReplayTaskRunner( + core::PlanNodePtr plan, + std::shared_ptr queryCtx) { + cursorParams_.planNode = std::move(plan); + cursorParams_.queryCtx = std::move(queryCtx); + } + + /// Run the replaying task. Return the copied results if 'copyResults' is + /// true or return null. + std::pair, RowVectorPtr> run( + bool copyResults = false); + + /// Max number of drivers. Default is 1. + TraceReplayTaskRunner& maxDrivers(int32_t maxDrivers); + + /// Spilling directory, if not empty, then the task's spilling directory would + /// be built from it. + TraceReplayTaskRunner& spillDirectory(const std::string& dir); + + /// Splits of this task. + TraceReplayTaskRunner& splits( + const core::PlanNodeId& planNodeId, + std::vector splits); + + private: + void addSplits(exec::Task* task); + + static std::shared_ptr copy( + const std::vector& results); + + exec::test::CursorParameters cursorParams_; + std::unordered_map> splits_; + bool noMoreSplits_ = false; +}; +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/tests/HashJoinReplayerTest.cpp b/velox/tool/trace/tests/HashJoinReplayerTest.cpp index 19031f05408c..9f02ad67ff24 100644 --- a/velox/tool/trace/tests/HashJoinReplayerTest.cpp +++ b/velox/tool/trace/tests/HashJoinReplayerTest.cpp @@ -27,6 +27,7 @@ #include "velox/common/hyperloglog/SparseHll.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" #include "velox/exec/PlanNodeStats.h" #include "velox/exec/TableWriter.h" @@ -38,6 +39,7 @@ #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/tool/trace/HashJoinReplayer.h" +#include "velox/tool/trace/TraceReplayRunner.h" #include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox; @@ -325,6 +327,89 @@ TEST_F(HashJoinReplayerTest, partialDriverIds) { faultyFs->clearFileFaultInjections(); } +TEST_F(HashJoinReplayerTest, runner) { + const auto planWithSplits = createPlan( + tableDir_, + core::JoinType::kInner, + probeKeys_, + buildKeys_, + probeInput_, + buildInput_); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + std::shared_ptr task; + auto tracePlanWithSplits = createPlan( + tableDir_, + core::JoinType::kInner, + probeKeys_, + buildKeys_, + probeInput_, + buildInput_); + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_); + for (const auto& [planNodeId, nodeSplits] : tracePlanWithSplits.splits) { + traceBuilder.splits(planNodeId, nodeSplits); + } + auto traceResult = traceBuilder.copyResults(pool(), task); + + const auto taskTraceDir = + exec::trace::getTaskTraceDirectory(traceRoot, *task); + const auto probeOperatorTraceDir = exec::trace::getOpTraceDirectory( + taskTraceDir, + traceNodeId_, + /*pipelineId=*/0, + /*driverId=*/0); + const auto probeSummary = + exec::trace::OperatorTraceSummaryReader(probeOperatorTraceDir, pool()) + .read(); + ASSERT_EQ(probeSummary.opType, "HashProbe"); + ASSERT_GT(probeSummary.peakMemory, 0); + ASSERT_GT(probeSummary.inputRows, 0); + ASSERT_GT(probeSummary.inputBytes, 0); + ASSERT_EQ(probeSummary.rawInputRows, 0); + ASSERT_EQ(probeSummary.rawInputBytes, 0); + + const auto buildOperatorTraceDir = exec::trace::getOpTraceDirectory( + taskTraceDir, + traceNodeId_, + /*pipelineId=*/1, + /*driverId=*/0); + const auto buildSummary = + exec::trace::OperatorTraceSummaryReader(buildOperatorTraceDir, pool()) + .read(); + ASSERT_EQ(buildSummary.opType, "HashBuild"); + ASSERT_GT(buildSummary.peakMemory, 0); + ASSERT_GT(buildSummary.inputRows, 0); + // NOTE: the input bytes is 0 because of the lazy materialization. + ASSERT_EQ(buildSummary.inputBytes, 0); + ASSERT_EQ(buildSummary.rawInputRows, 0); + ASSERT_EQ(buildSummary.rawInputBytes, 0); + + FLAGS_root_dir = traceRoot; + FLAGS_query_id = task->queryCtx()->queryId(); + FLAGS_task_id = task->taskId(); + FLAGS_node_id = traceNodeId_; + FLAGS_summary = true; + { + TraceReplayRunner runner; + runner.init(); + runner.run(); + } + + FLAGS_task_id = task->taskId(); + FLAGS_driver_ids = ""; + FLAGS_summary = false; + { + TraceReplayRunner runner; + runner.init(); + runner.run(); + } +} + DEBUG_ONLY_TEST_F(HashJoinReplayerTest, hashBuildSpill) { const auto planWithSplits = createPlan( tableDir_, diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index 87a896bfb8a8..46e2ddea5b35 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -163,7 +163,7 @@ TEST_P(PartitionedOutputReplayerTest, defaultConsumer) { "", 0, executor_.get()) - .run()); + .run(false)); } TEST_P(PartitionedOutputReplayerTest, basic) { @@ -260,7 +260,7 @@ TEST_P(PartitionedOutputReplayerTest, basic) { [&](auto partition, auto page) { replayedPartitionedResults[partition].push_back(std::move(page)); }) - .run(); + .run(false); ASSERT_EQ(replayedPartitionedResults.size(), testParam.numPartitions); for (uint32_t partition = 0; partition < testParam.numPartitions;