Skip to content

Commit

Permalink
fix(interactive): Support Join of Path Expand in Compiler & Runtime (#3747)
Browse files Browse the repository at this point in the history

This PR mainly supports `join` of path expand, and the main features
implemented on the compiler side include:
1. Splitting path_expand and tagging the split left and right paths with
different aliases respectively.
2. After completing the path join, in order to ensure the consistency of
the data model with the original path, an `ARRAY_CONCAT` operation is
added to connect the left and right paths, and the path after `CONCAT`
is tagged with the original alias.
3. Add end-to-end tests (gremlin & cypher) for st-path to verify the
correctness of the compiler and runtime functionality implementations.
---------

Co-authored-by: bingqing.lbq <[email protected]>
Co-authored-by: Zhang Lei <[email protected]>
Co-authored-by: Longbin Lai <[email protected]>
  • Loading branch information
4 people authored May 10, 2024
1 parent ee675e1 commit 7fa1d09
Show file tree
Hide file tree
Showing 26 changed files with 1,255 additions and 71 deletions.
2 changes: 2 additions & 0 deletions flex/codegen/src/hqps/hqps_path_expand_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ std::string result_opt_pb_2_str(
return "gs::ResultOpt::EndV";
case physical::PathExpand::ResultOpt::PathExpand_ResultOpt_ALL_V:
return "gs::ResultOpt::AllV";
case physical::PathExpand::ResultOpt::PathExpand_ResultOpt_ALL_V_E:
return "gs::ResultOpt::AllVE";
default:
throw std::runtime_error("unknown result_opt_pb");
}
Expand Down
5 changes: 3 additions & 2 deletions flex/engines/hqps_db/core/params.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,9 @@ enum PathOpt {
};

// Specifies which elements of each traversed path a path-expand materializes
// as its result.
enum ResultOpt {
  EndV = 0,   // Only the end vertex of each path, e.g. [3],[4]
  AllV = 1,   // Every vertex on each path, e.g. [1,2,3],[1,2,4]
  AllVE = 2,  // Every vertex and edge on each path, e.g. [1,2,3,4]
};

enum Interval {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.rex.RexVariable;
import org.apache.calcite.sql.fun.SqlLibraryOperators;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.commons.lang3.ObjectUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -470,7 +471,9 @@ public RelNode visit(GraphExtendIntersect intersect) {
Map<DataKey, DataValue> edgeDetails = getGlogueEdgeDetails(glogueEdge);
this.details =
createSubDetails(
glogueEdge.getSrcPattern(), glogueEdge.getSrcToTargetOrderMapping());
glogueEdge.getSrcPattern(),
glogueEdge.getSrcToTargetOrderMapping(),
null);
ExtendStep extendStep = glogueEdge.getExtendStep();
List<ExtendEdge> extendEdges = extendStep.getExtendEdges();
RelNode child = visitChildren(intersect).getInput(0);
Expand Down Expand Up @@ -525,9 +528,15 @@ public RelNode visit(GraphJoinDecomposition decomposition) {
Map<DataKey, DataValue> parentVertexDetails =
getJointVertexDetails(jointVertices, buildOrderMap);
Map<DataKey, DataValue> probeDetails =
createSubDetails(decomposition.getProbePattern(), probeOrderMap);
createSubDetails(
decomposition.getProbePattern(),
probeOrderMap,
new ParentPattern(decomposition.getParentPatten(), 0));
Map<DataKey, DataValue> buildDetails =
createSubDetails(decomposition.getBuildPattern(), buildOrderMap);
createSubDetails(
decomposition.getBuildPattern(),
buildOrderMap,
new ParentPattern(decomposition.getParentPatten(), 1));
this.details = probeDetails;
RelNode newLeft = visitChild(decomposition, 0, decomposition.getLeft()).getInput(0);
this.details = buildDetails;
Expand All @@ -541,10 +550,84 @@ public RelNode visit(GraphJoinDecomposition decomposition) {
buildOrderMap,
decomposition.getBuildPattern());
// here we assume all inputs of the join come from different sources
return builder.push(newLeft)
.push(newRight)
.join(JoinRelType.INNER, joinCondition)
.build();
builder.push(newLeft).push(newRight).join(JoinRelType.INNER, joinCondition);
// Handling special cases in decomposition:
// When a join splitting is based on path expand, to ensure the consistency of the data
// model with the original semantics,
// we perform a concat operation on the left path and right path after the path join,
// that is, merging them back into the original, un-split path.
List<RexNode> concatExprs = Lists.newArrayList();
List<String> concatAliases = Lists.newArrayList();
jointVertices.forEach(
joint -> {
PatternVertex probeJointVertex =
decomposition
.getProbePattern()
.getVertexByOrder(joint.getLeftOrderId());
PatternVertex buildJointVertex =
decomposition
.getBuildPattern()
.getVertexByOrder(joint.getRightOrderId());
Set<PatternEdge> probeEdges =
decomposition.getProbePattern().getEdgesOf(probeJointVertex);
Set<PatternEdge> buildEdges =
decomposition.getBuildPattern().getEdgesOf(buildJointVertex);
if (probeEdges.size() == 1 && buildEdges.size() == 1) {
PatternEdge probeEdge = probeEdges.iterator().next();
DataValue probeValue =
probeDetails.get(
new EdgeDataKey(
decomposition
.getProbePattern()
.getVertexOrder(
probeEdge.getSrcVertex()),
decomposition
.getProbePattern()
.getVertexOrder(
probeEdge.getDstVertex()),
probeEdge.isBoth()
? PatternDirection.BOTH
: PatternDirection.OUT));
PatternEdge buildEdge = buildEdges.iterator().next();
DataValue buildValue =
buildDetails.get(
new EdgeDataKey(
decomposition
.getBuildPattern()
.getVertexOrder(
buildEdge.getSrcVertex()),
decomposition
.getBuildPattern()
.getVertexOrder(
buildEdge.getDstVertex()),
buildEdge.isBoth()
? PatternDirection.BOTH
: PatternDirection.OUT));
if (probeValue != null
&& probeValue.getParentAlias() != null
&& buildValue != null
&& buildValue.getParentAlias() != null
&& probeValue
.getParentAlias()
.equals(buildValue.getParentAlias())) {
String probeAlias = probeValue.getAlias();
String buildAlias = buildValue.getAlias();
concatExprs.add(
builder.call(
SqlLibraryOperators.ARRAY_CONCAT,
builder.variable(probeAlias),
builder.variable(buildAlias)));
concatAliases.add(probeValue.getParentAlias());
}
}
});
if (!concatExprs.isEmpty()) {
// TODO(yihe.zxl): there are additional optimization opportunities here by employing
// projects with append=false, the left path and right path prior to merging can be
// removed.
builder.project(concatExprs, concatAliases, true);
}
return builder.build();
}

private RexNode createJoinFilter(
Expand Down Expand Up @@ -909,8 +992,20 @@ private Map<DataKey, DataValue> getGlogueEdgeDetails(GlogueExtendIntersectEdge e
return edgeDetails;
}

// Helper pairing the parent (pre-split) pattern with the id of the split
// sub-pattern it was decomposed into: callers pass 0 for the probe/left side
// and 1 for the build/right side (see the calls in visit(GraphJoinDecomposition)).
private class ParentPattern {
// The original pattern before the join decomposition split it.
private final Pattern pattern;
// Which split this refers to: 0 = probe/left, 1 = build/right.
private final int subId;

public ParentPattern(Pattern pattern, int subId) {
this.pattern = pattern;
this.subId = subId;
}
}

private Map<DataKey, DataValue> createSubDetails(
Pattern subPattern, Map<Integer, Integer> orderMappings) {
Pattern subPattern,
Map<Integer, Integer> orderMappings,
@Nullable ParentPattern parentPattern) {
Map<DataKey, DataValue> newDetails = Maps.newHashMap();
subPattern
.getVertexSet()
Expand All @@ -933,19 +1028,58 @@ private Map<DataKey, DataValue> createSubDetails(
int newDstOrderId = subPattern.getVertexOrder(k.getDstVertex());
Integer oldSrcOrderId = orderMappings.get(newSrcOrderId);
Integer oldDstOrderId = orderMappings.get(newDstOrderId);
PatternDirection direction =
k.isBoth() ? PatternDirection.BOTH : PatternDirection.OUT;
EdgeDataKey oldKey = null;
boolean splitPathExpand = false;
if (oldSrcOrderId != null && oldDstOrderId != null) {
PatternDirection direction =
k.isBoth()
? PatternDirection.BOTH
: PatternDirection.OUT;
EdgeDataKey oldKey =
oldKey =
new EdgeDataKey(
oldSrcOrderId, oldDstOrderId, direction);
EdgeDataKey newKey =
new EdgeDataKey(
newSrcOrderId, newDstOrderId, direction);
} else if (parentPattern != null) {
// here we use a hack way to find the original edge key in the
// parent pattern for the split path expand,
// in <JoinDecompositionRule>, we guarantee the split edge id
// consistent with the parent's, here we just use the split edge
// id to find the original edge in the parent pattern.
Pattern pattern = parentPattern.pattern;
for (PatternEdge edge : pattern.getEdgeSet()) {
if (k.getId() == edge.getId()) {
oldKey =
new EdgeDataKey(
pattern.getVertexOrder(
edge.getSrcVertex()),
pattern.getVertexOrder(
edge.getDstVertex()),
direction);
splitPathExpand = true;
break;
}
}
}
if (oldKey != null) {
DataValue value = details.get(oldKey);
if (value != null) {
EdgeDataKey newKey =
new EdgeDataKey(
newSrcOrderId, newDstOrderId, direction);
if (splitPathExpand
&& !AliasInference.isDefaultAlias(
value.getAlias())) {
// assign a new alias tagged with subId for the split
// path expand, i.e. if the original path expand is
// '[a:KNOWS*6..7]',
// after splitting, we get left split path expand
// '[a$p_0:KNOWS*3..4]', and right split path expand
// '[a$p_1:KNOWS*3..4]',
value =
new DataValue(
value.getAlias()
+ "$p_"
+ parentPattern.subId,
value.getFilter(),
value.getAlias());
}
newDetails.put(newKey, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private List<GraphJoinDecomposition> initDecompositions(GraphPattern pattern) {
new GraphJoinDecomposition(
pattern.getCluster(),
pattern.getTraitSet(),
pattern.getPattern(),
probePattern,
buildPattern,
Lists.newArrayList(
Expand Down Expand Up @@ -268,6 +269,7 @@ private List<GraphJoinDecomposition> getDecompositions(
return new GraphJoinDecomposition(
parent.getCluster(),
parent.getTraitSet(),
parent.getParentPatten(),
probeClone,
buildClone,
newJointVertices,
Expand Down Expand Up @@ -372,22 +374,26 @@ private PatternEdge createNewEdge(
PatternVertex newSrc,
PatternVertex newDst,
PathExpandRange newRange) {
int randomId = UUID.randomUUID().hashCode();
// Here, by setting the ID of the split edge to the same as the original edge,
// the intention is to enable finding the details info of the previous edge.
// This way, the final <GraphLogicalPathExpand> operator can include alias and filter
// details.
int newEdgeId = oldEdge.getId();
ElementDetails newDetails =
new ElementDetails(oldEdge.getElementDetails().getSelectivity(), newRange);
return (oldEdge instanceof SinglePatternEdge)
? new SinglePatternEdge(
newSrc,
newDst,
oldEdge.getEdgeTypeIds().get(0),
randomId,
newEdgeId,
oldEdge.isBoth(),
newDetails)
: new FuzzyPatternEdge(
newSrc,
newDst,
oldEdge.getEdgeTypeIds(),
randomId,
newEdgeId,
oldEdge.isBoth(),
newDetails);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@
public class DataValue {
private final String alias;
private final @Nullable RexNode filter;
// indicate whether the value corresponds to a split path expand, the parentAlias here records
// the alias
// associated with the original path expand before the splitting.
private final @Nullable String parentAlias;

// Convenience constructor for values that do not originate from a split path
// expand: the parent alias is left unset (null).
public DataValue(String alias, RexNode filter) {
this(alias, filter, null);
}

/**
 * Creates a data value.
 *
 * @param alias alias of this value
 * @param filter optional filter condition attached to this value, may be null
 *     (the backing field is declared {@code @Nullable})
 * @param parentAlias alias of the original path expand before splitting, when
 *     this value corresponds to a split path expand; null otherwise
 */
public DataValue(String alias, @Nullable RexNode filter, @Nullable String parentAlias) {
this.alias = alias;
this.filter = filter;
this.parentAlias = parentAlias;
}

public String getAlias() {
Expand All @@ -36,6 +45,10 @@ public String getAlias() {
return filter;
}

// Returns the alias of the original (pre-split) path expand this value was
// derived from, or null when the value does not come from a split path expand.
public @Nullable String getParentAlias() {
return parentAlias;
}

@Override
public String toString() {
return "DataValue{" + "alias='" + alias + '\'' + ", filter=" + filter + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,23 @@
public class GraphJoinDecomposition extends BiRel {
private final List<JoinVertexPair> joinVertexPairs;
private final OrderMappings orderMappings;
private final Pattern parentPatten;
private final Pattern probePattern;
private final Pattern buildPattern;
private final JoinRelType joinType;

public GraphJoinDecomposition(
RelOptCluster cluster,
RelTraitSet traitSet,
Pattern parentPattern,
Pattern probePattern,
Pattern buildPattern,
List<JoinVertexPair> joinVertexPairs,
OrderMappings orderMappings) {
this(
cluster,
traitSet,
parentPattern,
probePattern,
buildPattern,
joinVertexPairs,
Expand All @@ -59,6 +62,7 @@ public GraphJoinDecomposition(
public GraphJoinDecomposition(
RelOptCluster cluster,
RelTraitSet traitSet,
Pattern parentPattern,
Pattern probePattern,
Pattern buildPattern,
List<JoinVertexPair> joinVertexPairs,
Expand All @@ -67,6 +71,7 @@ public GraphJoinDecomposition(
this(
cluster,
traitSet,
parentPattern,
new GraphPattern(cluster, traitSet, probePattern),
probePattern,
new GraphPattern(cluster, traitSet, buildPattern),
Expand All @@ -79,6 +84,7 @@ public GraphJoinDecomposition(
protected GraphJoinDecomposition(
RelOptCluster cluster,
RelTraitSet traitSet,
Pattern parentPattern,
RelNode left,
Pattern leftPattern,
RelNode right,
Expand All @@ -87,6 +93,7 @@ protected GraphJoinDecomposition(
OrderMappings orderMappings,
JoinRelType joinType) {
super(cluster, traitSet, left, right);
this.parentPatten = parentPattern;
this.joinVertexPairs = joinVertexPairs;
this.orderMappings = orderMappings;
this.probePattern = leftPattern;
Expand All @@ -102,6 +109,10 @@ public OrderMappings getOrderMappings() {
return orderMappings;
}

// Returns the original pattern before it was decomposed into the probe and
// build sub-patterns.
// NOTE(review): "Patten" is a typo for "Pattern", but the name is public API
// with existing call sites (e.g. in the join-decomposition rule), so it is
// kept as-is; renaming would need a coordinated change across callers.
public Pattern getParentPatten() {
return parentPatten;
}

// Returns the probe-side sub-pattern, i.e. the pattern backing the left input
// of the join produced by this decomposition.
public Pattern getProbePattern() {
return probePattern;
}
Expand Down Expand Up @@ -182,6 +193,7 @@ public GraphJoinDecomposition copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new GraphJoinDecomposition(
getCluster(),
traitSet,
parentPatten,
inputs.get(0),
this.probePattern,
inputs.get(1),
Expand Down
Loading

0 comments on commit 7fa1d09

Please sign in to comment.