Support different vector serialization format for streaming shuffle (#11445)

Summary:
Pull Request resolved: #11445

Add support for row-wise shuffle to improve both CPU and memory efficiency in workloads with a very large
number of shuffle columns. This PR includes:
(1) Make the vector serde format configurable through the plan node. Producer side: partition output.
Consumer side: exchange and merge exchange.
(2) All three shuffle operators now get the vector serde used for serialization/deserialization from
the one specified in the corresponding query plan node. A separate change in the Presto
coordinator will set the format based on the number of column streams in the shuffle data type, plus
a per-query session property to enable or disable it.
(3) Add two sets of APIs to VectorSerde and the corresponding serializer to estimate and serialize
vectors in two different row formats: compact row and unsafe row. Constructing the two row formats is
CPU intensive, so we construct them once per partition output batch. Also, the serialized size
estimates for the two row formats are accurate, unlike the columnar size estimate, which is approximate because the
actual memory allocations happen progressively during serialization through a memory arena.
Row-wise serialization allocates memory once, before the actual serialization. Given that, we directly
use the estimated serialized size for the
serialization buffer allocation. So we construct the row-wise vector once and estimate the serialized size once.
(4) Change exchange to accommodate the row deserializers, which don't support append deserialization (for now), by
merging the IOBufs from multiple serialized pages and deserializing them all together. We expect the row deserializer to consume
all the serialized vectors given its serialization format.
(5) Fix a bug in the compact row and unsafe row deserialization code paths: as the vector is extended, the pointers (string views) to the existing
buffers become invalid. The current processing is also very inefficient; optimize it with a unique_ptr to the string buffer and
avoid unnecessary data copies.
(6) Add an operator runtime stat that indicates the serialization format used for shuffle, to help with performance debugging.
(7) Fix a couple of issues in operator trace replay, such as making partition output close call the base operator
close to write the summary trace file.
(8) Some code and test refactoring and cleanup in the relevant code paths.
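
The "estimate once, allocate once" idea in item (3) can be illustrated with a minimal sketch. This is not the Velox implementation: `EncodedRow`, `exactSerializedSize`, `serializeRows`, and the length-prefixed layout are hypothetical stand-ins chosen for illustration. The point is only that when each row's encoded size is known exactly, the output buffer can be allocated a single time up front.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for a row already converted to a row-wise format.
struct EncodedRow {
  std::string bytes;
};

// Exact serialized size: a 4-byte length prefix plus the payload of each row.
size_t exactSerializedSize(const std::vector<EncodedRow>& rows) {
  size_t total = 0;
  for (const auto& row : rows) {
    total += sizeof(uint32_t) + row.bytes.size();
  }
  return total;
}

// Serializes all rows into a buffer that is allocated once, sized by the
// exact estimate, so no reallocation happens while rows are written.
std::vector<char> serializeRows(const std::vector<EncodedRow>& rows) {
  std::vector<char> out(exactSerializedSize(rows));
  char* cursor = out.data();
  for (const auto& row : rows) {
    const uint32_t size = static_cast<uint32_t>(row.bytes.size());
    std::memcpy(cursor, &size, sizeof(size));
    cursor += sizeof(size);
    std::memcpy(cursor, row.bytes.data(), row.bytes.size());
    cursor += row.bytes.size();
  }
  // The estimate was exact, so the cursor ends exactly at the buffer end.
  assert(cursor == out.data() + out.size());
  return out;
}
```

A columnar serializer with only an approximate estimate would instead have to grow its buffers as it goes, which is the progressive allocation pattern the summary contrasts against.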

In partition output replay with a trace collected from production, peak memory usage
dropped by 10x and replay execution time was cut in half.

A couple of follow-up optimizations:
(1) Support compression for unsafe row and compact row on flush
if needed.
(2) Support column-wise row size estimation, which might give a 2x improvement in estimation time based on
a draft implementation.
(3) Support row-wise append deserialization to avoid potentially small vectors on the consumer side.
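
Until append deserialization exists, item (4) of the summary merges the serialized pages and deserializes them in one pass. A minimal sketch of that shape follows, under loud assumptions: `Page`, `makePage`, `mergePages`, `deserializeAll`, and the length-prefixed record layout are all hypothetical, and the sketch copies bytes into one string for simplicity, whereas the commit describes merging IOBufs.

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical page: a run of [uint32 length][payload] records.
using Page = std::string;

// Test helper that builds a page from row payloads.
Page makePage(const std::vector<std::string>& rows) {
  Page page;
  for (const auto& row : rows) {
    const uint32_t size = static_cast<uint32_t>(row.size());
    page.append(reinterpret_cast<const char*>(&size), sizeof(size));
    page.append(row);
  }
  return page;
}

// Concatenates all pages into a single contiguous input.
std::string mergePages(const std::vector<Page>& pages) {
  size_t total = 0;
  for (const auto& page : pages) {
    total += page.size();
  }
  std::string merged;
  merged.reserve(total);
  for (const auto& page : pages) {
    merged.append(page);
  }
  return merged;
}

// Decodes every record in one pass, producing a single output batch
// instead of one small batch per page.
std::vector<std::string> deserializeAll(const std::string& input) {
  std::vector<std::string> rows;
  size_t offset = 0;
  while (offset + sizeof(uint32_t) <= input.size()) {
    uint32_t size = 0;
    std::memcpy(&size, input.data() + offset, sizeof(size));
    offset += sizeof(size);
    if (size > input.size() - offset) {
      break; // Truncated record: stop rather than read out of bounds.
    }
    rows.emplace_back(input.data() + offset, size);
    offset += size;
  }
  return rows;
}
```

Merging first means the consumer gets one batch holding the rows of all pages, which is the small-vector problem that follow-up (3) aims to solve more directly.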

Reviewed By: arhimondr, oerling

Differential Revision: D65258176
xiaoxmeng authored and facebook-github-bot committed Nov 7, 2024
1 parent 396b2bb commit 10180f5
Showing 64 changed files with 2,145 additions and 881 deletions.
3 changes: 2 additions & 1 deletion velox/benchmarks/basic/CMakeLists.txt
@@ -99,4 +99,5 @@ target_link_libraries(
${velox_benchmark_deps}
velox_vector_test_lib
velox_functions_spark
velox_functions_prestosql)
velox_functions_prestosql
velox_row_fast)
3 changes: 0 additions & 3 deletions velox/common/memory/MemoryPool.h
@@ -16,17 +16,14 @@

#pragma once

#include <array>
#include <atomic>
#include <memory>
#include <optional>
#include <queue>

#include <fmt/format.h>
#include "velox/common/base/BitUtil.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/Portability.h"
#include "velox/common/future/VeloxPromise.h"
#include "velox/common/memory/Allocation.h"
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/common/memory/MemoryArbitrator.h"
137 changes: 129 additions & 8 deletions velox/core/PlanNode.cpp
@@ -221,6 +221,10 @@ void addSortingKeys(
stream << sortingKeys[i]->name() << " " << sortingOrders[i].toString();
}
}

void addVectorSerdeKind(VectorSerde::Kind kind, std::stringstream& stream) {
stream << VectorSerde::kindName(kind);
}
} // namespace

bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
@@ -804,20 +808,23 @@ const std::vector<PlanNodePtr>& ExchangeNode::sources() const {
return kEmptySources;
}

void ExchangeNode::addDetails(std::stringstream& /* stream */) const {
// Nothing to add.
void ExchangeNode::addDetails(std::stringstream& stream) const {
addVectorSerdeKind(serdeKind_, stream);
}

folly::dynamic ExchangeNode::serialize() const {
auto obj = PlanNode::serialize();
obj["outputType"] = ExchangeNode::outputType()->serialize();
obj["serdeKind"] = VectorSerde::kindName(serdeKind_);
return obj;
}

// static
PlanNodePtr ExchangeNode::create(const folly::dynamic& obj, void* context) {
auto outputType = deserializeRowType(obj["outputType"]);
return std::make_shared<ExchangeNode>(deserializePlanNodeId(obj), outputType);
return std::make_shared<ExchangeNode>(
deserializePlanNodeId(obj),
deserializeRowType(obj["outputType"]),
VectorSerde::kindByName(obj["serdeKind"].asString()));
}

UnnestNode::UnnestNode(
@@ -1945,27 +1952,45 @@ PlanNodePtr TableWriteMergeNode::create(
id, outputType, aggregationNode, source);
}

MergeExchangeNode::MergeExchangeNode(
const PlanNodeId& id,
const RowTypePtr& type,
const std::vector<FieldAccessTypedExprPtr>& sortingKeys,
const std::vector<SortOrder>& sortingOrders,
VectorSerde::Kind serdeKind)
: ExchangeNode(id, type, serdeKind),
sortingKeys_(sortingKeys),
sortingOrders_(sortingOrders) {}

void MergeExchangeNode::addDetails(std::stringstream& stream) const {
addSortingKeys(sortingKeys_, sortingOrders_, stream);
stream << ", ";
addVectorSerdeKind(serdeKind(), stream);
}

folly::dynamic MergeExchangeNode::serialize() const {
auto obj = PlanNode::serialize();
obj["outputType"] = ExchangeNode::outputType()->serialize();
obj["sortingKeys"] = ISerializable::serialize(sortingKeys_);
obj["sortingOrders"] = serializeSortingOrders(sortingOrders_);
obj["serdeKind"] = VectorSerde::kindName(serdeKind());
return obj;
}

// static
PlanNodePtr MergeExchangeNode::create(
const folly::dynamic& obj,
void* context) {
auto outputType = deserializeRowType(obj["outputType"]);
auto sortingKeys = deserializeFields(obj["sortingKeys"], context);
auto sortingOrders = deserializeSortingOrders(obj["sortingOrders"]);
const auto outputType = deserializeRowType(obj["outputType"]);
const auto sortingKeys = deserializeFields(obj["sortingKeys"], context);
const auto sortingOrders = deserializeSortingOrders(obj["sortingOrders"]);
const auto serdeKind = VectorSerde::kindByName(obj["serdeKind"].asString());
return std::make_shared<MergeExchangeNode>(
deserializePlanNodeId(obj), outputType, sortingKeys, sortingOrders);
deserializePlanNodeId(obj),
outputType,
sortingKeys,
sortingOrders,
serdeKind);
}

void LocalPartitionNode::addDetails(std::stringstream& stream) const {
@@ -2024,6 +2049,97 @@ LocalPartitionNode::Type LocalPartitionNode::typeFromName(
return it->second;
}

PartitionedOutputNode::PartitionedOutputNode(
const PlanNodeId& id,
Kind kind,
const std::vector<TypedExprPtr>& keys,
int numPartitions,
bool replicateNullsAndAny,
PartitionFunctionSpecPtr partitionFunctionSpec,
RowTypePtr outputType,
VectorSerde::Kind serdeKind,
PlanNodePtr source)
: PlanNode(id),
kind_(kind),
sources_{{std::move(source)}},
keys_(keys),
numPartitions_(numPartitions),
replicateNullsAndAny_(replicateNullsAndAny),
partitionFunctionSpec_(std::move(partitionFunctionSpec)),
serdeKind_(serdeKind),
outputType_(std::move(outputType)) {
VELOX_USER_CHECK_GT(numPartitions_, 0);
if (numPartitions_ == 1) {
VELOX_USER_CHECK(
keys_.empty(),
"Non-empty partitioning keys require more than one partition");
}
if (!isPartitioned()) {
VELOX_USER_CHECK(
keys_.empty(),
"{} partitioning doesn't allow for partitioning keys",
kindString(kind_));
}
}

// static
std::shared_ptr<PartitionedOutputNode> PartitionedOutputNode::broadcast(
const PlanNodeId& id,
int numPartitions,
RowTypePtr outputType,
VectorSerde::Kind serdeKind,
PlanNodePtr source) {
std::vector<TypedExprPtr> noKeys;
return std::make_shared<PartitionedOutputNode>(
id,
Kind::kBroadcast,
noKeys,
numPartitions,
false,
std::make_shared<GatherPartitionFunctionSpec>(),
std::move(outputType),
serdeKind,
std::move(source));
}

// static
std::shared_ptr<PartitionedOutputNode> PartitionedOutputNode::arbitrary(
const PlanNodeId& id,
RowTypePtr outputType,
VectorSerde::Kind serdeKind,
PlanNodePtr source) {
std::vector<TypedExprPtr> noKeys;
return std::make_shared<PartitionedOutputNode>(
id,
Kind::kArbitrary,
noKeys,
1,
false,
std::make_shared<GatherPartitionFunctionSpec>(),
std::move(outputType),
serdeKind,
std::move(source));
}

// static
std::shared_ptr<PartitionedOutputNode> PartitionedOutputNode::single(
const PlanNodeId& id,
RowTypePtr outputType,
VectorSerde::Kind serdeKind,
PlanNodePtr source) {
std::vector<TypedExprPtr> noKeys;
return std::make_shared<PartitionedOutputNode>(
id,
Kind::kPartitioned,
noKeys,
1,
false,
std::make_shared<GatherPartitionFunctionSpec>(),
std::move(outputType),
serdeKind,
std::move(source));
}

void EnforceSingleRowNode::addDetails(std::stringstream& /* stream */) const {
// Nothing to add.
}
@@ -2091,6 +2207,9 @@ void PartitionedOutputNode::addDetails(std::stringstream& stream) const {
if (replicateNullsAndAny_) {
stream << " replicate nulls and any";
}

stream << " ";
addVectorSerdeKind(serdeKind_, stream);
}

folly::dynamic PartitionedOutputNode::serialize() const {
@@ -2100,6 +2219,7 @@ folly::dynamic PartitionedOutputNode::serialize() const {
obj["keys"] = ISerializable::serialize(keys_);
obj["replicateNullsAndAny"] = replicateNullsAndAny_;
obj["partitionFunctionSpec"] = partitionFunctionSpec_->serialize();
obj["serdeKind"] = VectorSerde::kindName(serdeKind_);
obj["outputType"] = outputType_->serialize();
return obj;
}
@@ -2117,6 +2237,7 @@ PlanNodePtr PartitionedOutputNode::create(
ISerializable::deserialize<PartitionFunctionSpec>(
obj["partitionFunctionSpec"], context),
deserializeRowType(obj["outputType"]),
VectorSerde::kindByName(obj["serdeKind"].asString()),
deserializeSingleSource(obj, context));
}
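
The serialize()/create() pairs in this diff store the serde kind as a string name (via `VectorSerde::kindName`) and read it back by name (via `VectorSerde::kindByName`). The sketch below shows one way such a round trip could look. It is an assumption-laden illustration, not the Velox code: the `SerdeKind` enum, its three values, and the exact name strings are hypothetical stand-ins.

```cpp
#include <initializer_list>
#include <stdexcept>
#include <string>
#include <string_view>

// Hypothetical stand-in for VectorSerde::Kind; values are assumed.
enum class SerdeKind { kPresto, kCompactRow, kUnsafeRow };

// Maps a kind to a stable, human-readable name suitable for a serialized plan.
std::string kindName(SerdeKind kind) {
  switch (kind) {
    case SerdeKind::kPresto:
      return "Presto";
    case SerdeKind::kCompactRow:
      return "CompactRow";
    case SerdeKind::kUnsafeRow:
      return "UnsafeRow";
  }
  throw std::invalid_argument("Unknown serde kind");
}

// Inverse of kindName. Throws on an unrecognized name instead of silently
// falling back to a default format.
SerdeKind kindByName(std::string_view name) {
  for (SerdeKind kind :
       {SerdeKind::kPresto, SerdeKind::kCompactRow, SerdeKind::kUnsafeRow}) {
    if (kindName(kind) == name) {
      return kind;
    }
  }
  throw std::invalid_argument(
      "Unknown serde kind name: " + std::string(name));
}
```

Serializing the name rather than the enum's integer value keeps a stored plan readable and avoids depending on the enum's numeric ordering.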
