diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 5218a9ff4db1e..a77a4c56bc5d2 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -142,7 +142,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string& filter); JoinFuzzer::PlanWithSplits makeMergeJoinPlan( core::JoinType joinType, @@ -150,7 +151,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string& filter); // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. // If withFilter is true, uses the equality filter between probeKeys and @@ -162,7 +164,7 @@ class JoinFuzzer { const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - bool withFilter = true); + const std::string& filter); // Makes the default query plan with table scan as inputs for both probe and // build sides. @@ -175,7 +177,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string& filter); JoinFuzzer::PlanWithSplits makeMergeJoinPlanWithTableScan( core::JoinType joinType, @@ -185,7 +188,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string& filter); // Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan // nodes. If withFilter is true, uses the equiality filter between probeKeys @@ -199,13 +203,14 @@ class JoinFuzzer { const std::vector& probeSplits, const std::vector& buildSplits, const std::vector& outputColumns, - bool withFilter = true); + const std::string& filter); void makeAlternativePlans( const core::PlanNodePtr& plan, const std::vector& probeInput, const std::vector& buildInput, - std::vector& plans); + std::vector& plans, + const std::string& filter); // Makes the query plan from 'planWithTableScan' with grouped execution mode. // Correspondingly, it replaces the table scan input splits with grouped ones. @@ -249,7 +254,8 @@ class JoinFuzzer { const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - std::vector& altPlans); + std::vector& altPlans, + const std::string& filter = ""); // Splits the input into groups by partitioning on the join keys. std::vector> splitInputByGroup( @@ -597,6 +603,12 @@ std::optional tryFlipJoinType(core::JoinType joinType) { // Returns a plan with flipped join sides of the input hash join node. If the // join type doesn't allow flipping, returns a nullptr. core::PlanNodePtr tryFlipJoinSides(const core::HashJoinNode& joinNode) { + // Null-aware right semi project join doesn't support filter. + if (joinNode.filter() && + joinNode.joinType() == core::JoinType::kLeftSemiProject && + joinNode.isNullAware()) { + return nullptr; + } auto flippedJoinType = tryFlipJoinType(joinNode.joinType()); if (!flippedJoinType.has_value()) { return nullptr; @@ -688,7 +700,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); auto plan = PlanBuilder(planNodeIdGenerator) @@ -697,7 +710,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( probeKeys, buildKeys, PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - /*filter=*/"", + filter, outputColumns, joinType, nullAware) @@ -714,7 +727,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; @@ -728,7 +742,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( .tableScan(buildType) .capturePlanNodeId(buildScanId) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType, nullAware) @@ -819,7 +833,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); return JoinFuzzer::PlanWithSplits{PlanBuilder(planNodeIdGenerator) .values(probeInput) @@ -831,7 +846,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( .values(buildInput) .orderBy(buildKeys, false) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType) .planNode()}; @@ -844,10 +859,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - bool withFilter) { + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); - const std::string filter = - withFilter ? makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .values(probeInput) @@ -863,7 +876,8 @@ void JoinFuzzer::makeAlternativePlans( const core::PlanNodePtr& plan, const std::vector& probeInput, const std::vector& buildInput, - std::vector& plans) { + std::vector& plans, + const std::string& filter) { auto joinNode = std::dynamic_pointer_cast(plan); VELOX_CHECK_NOT_NULL(joinNode); @@ -888,7 +902,7 @@ void JoinFuzzer::makeAlternativePlans( .localPartitionRoundRobin( makeSources(buildInput, planNodeIdGenerator)) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType, joinNode->isNullAware()) @@ -897,7 +911,13 @@ void JoinFuzzer::makeAlternativePlans( // Use OrderBy + MergeJoin if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlan( - joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns); + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput, + outputColumns, + filter); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits.plan, plans); @@ -905,8 +925,18 @@ void JoinFuzzer::makeAlternativePlans( // Use NestedLoopJoin. if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { + std::string joinCondition = filter.empty() + ? makeJoinFilter(probeKeys, buildKeys) + : fmt::format( + "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); auto planWithSplits = makeNestedLoopJoinPlan( - joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns); + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput, + outputColumns, + joinCondition); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits.plan, plans); @@ -957,7 +987,7 @@ RowVectorPtr JoinFuzzer::testCrossProduct( probeInput, buildInput, outputColumns, - /*withFilter*/ false); + /*filter=*/""); const auto expected = execute(plan, /*injectSpill=*/false); // If OOM injection is not enabled verify the results against Reference query @@ -992,7 +1022,7 @@ RowVectorPtr JoinFuzzer::testCrossProduct( probeScanSplits, buildScanSplits, outputColumns, - /*withFilter*/ false)); + /*filter=*/"")); } addFlippedJoinPlan(plan.plan, altPlans); @@ -1014,7 +1044,26 @@ void JoinFuzzer::verify(core::JoinType joinType) { const auto numKeys = nullAware ? 1 : randInt(1, 5); // Pick number and types of join keys. - const std::vector keyTypes = generateJoinKeyTypes(numKeys); + std::vector keyTypes = generateJoinKeyTypes(numKeys); + std::string filter; + // Add boolean/integer semi-filter 10% of the time. + if (vectorFuzzer_.coinToss(0.1)) { + // Null-aware joins allow only one join key. + if (nullAware) { + keyTypes.clear(); + } + if (vectorFuzzer_.coinToss(0.5)) { + keyTypes.push_back(BOOLEAN()); + filter = vectorFuzzer_.coinToss(0.5) + ? fmt::format("t{} = true", keyTypes.size() - 1) + : fmt::format("u{} = true", keyTypes.size() - 1); + } else { + keyTypes.push_back(INTEGER()); + filter = vectorFuzzer_.coinToss(0.5) + ? fmt::format("t{} % {} = 0", keyTypes.size() - 1, randInt(1, 9)) + : fmt::format("u{} % {} = 0", keyTypes.size() - 1, randInt(1, 9)); + } + } std::vector probeKeys = makeNames("t", keyTypes.size()); std::vector buildKeys = makeNames("u", keyTypes.size()); @@ -1094,7 +1143,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { buildKeys, probeInput, buildInput, - outputColumns); + outputColumns, + filter); const auto expected = execute(defaultPlan, /*injectSpill=*/false); @@ -1110,7 +1160,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { {expected}), "Velox and Reference results don't match"); - LOG(INFO) << "Result matches with referenc DB."; + LOG(INFO) << "Result matches with reference DB."; stats_.numVerified++; } } @@ -1123,11 +1173,13 @@ void JoinFuzzer::verify(core::JoinType joinType) { buildKeys, flatProbeInput, flatBuildInput, - outputColumns)); + outputColumns, + filter)); - makeAlternativePlans(defaultPlan.plan, probeInput, buildInput, altPlans); makeAlternativePlans( - defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans); + defaultPlan.plan, probeInput, buildInput, altPlans, filter); + makeAlternativePlans( + defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans, filter); addPlansWithTableScan( tableScanDir->getPath(), @@ -1138,7 +1190,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { flatProbeInput, flatBuildInput, outputColumns, - altPlans); + altPlans, + filter); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; @@ -1153,7 +1206,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { } if (FLAGS_enable_spill) { - // Spilling for right semi project doesn't work yet. + // Spilling for right semi project doesn't work yet.ƒverify() if (auto hashJoin = std::dynamic_pointer_cast( altPlans[i].plan)) { if (hashJoin->isRightSemiProjectJoin()) { @@ -1190,7 +1243,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; @@ -1208,7 +1262,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( .capturePlanNodeId(buildScanId) .orderBy(buildKeys, false) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType) .planNode(), @@ -1226,13 +1280,11 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( const std::vector& probeSplits, const std::vector& buildSplits, const std::vector& outputColumns, - bool withFilter) { + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - const std::string filter = - withFilter ? makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .tableScan(probeType) @@ -1260,7 +1312,8 @@ void JoinFuzzer::addPlansWithTableScan( const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - std::vector& altPlans) { + std::vector& altPlans, + const std::string& filter) { VELOX_CHECK(!tableDir.empty()); if (!isTableScanSupported(probeInput[0]->type()) || @@ -1286,7 +1339,8 @@ void JoinFuzzer::addPlansWithTableScan( buildKeys, probeScanSplits, buildScanSplits, - outputColumns); + outputColumns, + filter); plansWithTableScan.push_back(defaultPlan); auto joinNode = @@ -1336,7 +1390,8 @@ void JoinFuzzer::addPlansWithTableScan( buildKeys, probeScanSplits, buildScanSplits, - outputColumns); + outputColumns, + filter); altPlans.push_back(planWithSplits); addFlippedJoinPlan( @@ -1350,6 +1405,10 @@ void JoinFuzzer::addPlansWithTableScan( // Add ungrouped NestedLoopJoin with TableScan. if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { + std::string joinCondition = filter.empty() + ? makeJoinFilter(probeKeys, buildKeys) + : fmt::format( + "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); auto planWithSplits = makeNestedLoopJoinPlanWithTableScan( joinType, probeType, @@ -1358,7 +1417,8 @@ void JoinFuzzer::addPlansWithTableScan( buildKeys, probeScanSplits, buildScanSplits, - outputColumns); + outputColumns, + joinCondition); altPlans.push_back(planWithSplits); addFlippedJoinPlan(