Skip to content

Commit

Permalink
feat(fuzzer): Update makeAlternatePlans to generate multi-join plans (f…
Browse files Browse the repository at this point in the history
…acebookincubator#11942)

Summary:

Change makeAlternatePlans functions to produce cascading multi-joins.

Differential Revision: D67607654
  • Loading branch information
Daniel Hunte authored and facebook-github-bot committed Jan 7, 2025
1 parent d1165a4 commit 4c03bd8
Showing 1 changed file with 88 additions and 40 deletions.
128 changes: 88 additions & 40 deletions velox/exec/fuzzer/JoinFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,14 @@ class JoinFuzzer {

void makeAlternativePlans(
const PlanWithSplits& plan,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
std::vector<JoinFuzzer::PlanWithSplits>& plans,
const std::string& filter);
const std::vector<core::JoinType> joinTypes,
const std::vector<bool> nullAwareList,
const std::vector<std::vector<std::string>>& probeKeysList,
const std::vector<std::vector<std::string>>& buildKeysList,
const std::vector<std::vector<RowVectorPtr>>& inputs,
const std::vector<std::vector<std::string>>& outputColumnsList,
const std::vector<std::string>& filterList,
std::vector<JoinFuzzer::PlanWithSplits>& plans);

// Runs one test iteration from query plans generations, executions and result
// verifications.
Expand Down Expand Up @@ -909,63 +913,90 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan(

void JoinFuzzer::makeAlternativePlans(
const PlanWithSplits& plan,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
std::vector<JoinFuzzer::PlanWithSplits>& plans,
const std::string& filter) {
const std::vector<core::JoinType> joinTypes,
const std::vector<bool> nullAwareList,
const std::vector<std::vector<std::string>>& probeKeysList,
const std::vector<std::vector<std::string>>& buildKeysList,
const std::vector<std::vector<RowVectorPtr>>& inputs,
const std::vector<std::vector<std::string>>& outputColumnsList,
const std::vector<std::string>& filterList,
std::vector<JoinFuzzer::PlanWithSplits>& plans) {
auto joinNode =
std::dynamic_pointer_cast<const core::HashJoinNode>(plan.plan);
VELOX_CHECK_NOT_NULL(joinNode);

// Flip join sides.
addFlippedJoinPlan<core::HashJoinNode>(plan, plans);

// Parallelize probe and build sides.
const auto probeKeys = fieldNames(joinNode->leftKeys());
const auto buildKeys = fieldNames(joinNode->rightKeys());
const auto outputColumns = joinNode->outputType()->names();
const auto joinType = joinNode->joinType();

// Add plan with local partition round robin for inputs.
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
plans.push_back(JoinFuzzer::PlanWithSplits{
PlanBuilder partitionPlan =
PlanBuilder(planNodeIdGenerator)
.localPartitionRoundRobin(
makeSources(probeInput, planNodeIdGenerator))
.localPartitionRoundRobin(makeSources(inputs[0], planNodeIdGenerator))
.hashJoin(
probeKeys,
buildKeys,
probeKeysList[0],
buildKeysList[0],
PlanBuilder(planNodeIdGenerator)
.localPartitionRoundRobin(
makeSources(buildInput, planNodeIdGenerator))
makeSources(inputs[1], planNodeIdGenerator))
.planNode(),
filter,
outputColumns,
joinType,
joinNode->isNullAware())
.planNode()});
filterList[0],
outputColumnsList[0],
joinTypes[0],
nullAwareList[0]);
for (int i = 1; i < inputs.size() - 1; i++) {
partitionPlan.hashJoin(
probeKeysList[i],
buildKeysList[i],
PlanBuilder(planNodeIdGenerator)
.localPartitionRoundRobin(
makeSources(inputs[i + 1], planNodeIdGenerator))
.planNode(),
filterList[i],
outputColumnsList[i],
joinTypes[i],
nullAwareList[i]);
}
plans.push_back(PlanWithSplits{partitionPlan.planNode()});

bool mergeJoinSupported = true;
bool NLJSupported = true;
for (const core::JoinType& joinType : joinTypes) {
if (!core::NestedLoopJoinNode::isSupported(joinType)) {
NLJSupported = false;
}
if (!core::MergeJoinNode::isSupported(joinType)) {
mergeJoinSupported = false;
}
}
// Use OrderBy + MergeJoin
if (core::MergeJoinNode::isSupported(joinNode->joinType())) {
if (mergeJoinSupported) {
auto planWithSplits = makeMergeJoinPlan(
{joinType},
{probeKeys},
{buildKeys},
{probeInput, buildInput},
{outputColumns},
{filter});
joinTypes,
probeKeysList,
buildKeysList,
inputs,
outputColumnsList,
filterList);
plans.push_back(planWithSplits);

addFlippedJoinPlan<core::MergeJoinNode>(planWithSplits, plans);
}

// Use NestedLoopJoin.
if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) {
std::string joinCondition = filter.empty()
? makeJoinFilter(probeKeys, buildKeys)
: fmt::format(
"{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter);
if (NLJSupported) {
std::vector<std::string> joinConditionList;
for (int i = 0; i < probeKeysList.size(); i++) {
joinConditionList.push_back(
filterList[0].empty()
? makeJoinFilter(probeKeysList[i], buildKeysList[i])
: fmt::format(
"{} AND {}",
makeJoinFilter(probeKeysList[i], buildKeysList[i]),
filterList[i]));
}
auto planWithSplits = makeNestedLoopJoinPlan(
{joinType}, {probeInput, buildInput}, {outputColumns}, {joinCondition});
joinTypes, inputs, outputColumnsList, joinConditionList);
plans.push_back(planWithSplits);

addFlippedJoinPlan<core::NestedLoopJoinNode>(planWithSplits, plans);
Expand Down Expand Up @@ -1185,9 +1216,26 @@ void JoinFuzzer::verify(core::JoinType joinType) {
{outputColumns},
{filter}));

makeAlternativePlans(defaultPlan, probeInput, buildInput, altPlans, filter);
makeAlternativePlans(
defaultPlan, flatProbeInput, flatBuildInput, altPlans, filter);
defaultPlan,
{joinType},
{nullAware},
{probeKeys},
{buildKeys},
{probeInput, buildInput},
{outputColumns},
{filter},
altPlans);
makeAlternativePlans(
defaultPlan,
{joinType},
{nullAware},
{probeKeys},
{buildKeys},
{flatProbeInput, flatBuildInput},
{outputColumns},
{filter},
altPlans);

addPlansWithTableScan(
tableScanDir->getPath(),
Expand Down

0 comments on commit 4c03bd8

Please sign in to comment.