Skip to content

Commit

Permalink
fix(interactive): Support Nested Map in Compiler & Runtime to Facilit…
Browse files Browse the repository at this point in the history
…ate Queries like `select('a', 'b').by(valueMap())` (#3839)

Co-authored-by: BingqingLyu <[email protected]>
Co-authored-by: xiaolei.zl <[email protected]>
Co-authored-by: Longbin Lai <[email protected]>
  • Loading branch information
4 people authored Jun 7, 2024
1 parent 8bd72c3 commit 5590f02
Show file tree
Hide file tree
Showing 14 changed files with 359 additions and 94 deletions.
14 changes: 10 additions & 4 deletions flex/codegen/src/hqps/hqps_project_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,16 @@ std::string project_key_values_to_string(
auto& key_value = mappings[i];
auto& key = key_value.key();
CHECK(key.item_case() == common::Value::kStr);
auto& value = key_value.value();
auto key_value_str = project_key_value_to_string(ctx, key.str(), value);
if (!key_value_str.empty()) {
key_value_strs.emplace_back(key_value_str);
if (key_value.has_val()) {
auto& value = key_value.val();
auto key_value_str = project_key_value_to_string(ctx, key.str(), value);
if (!key_value_str.empty()) {
key_value_strs.emplace_back(key_value_str);
}
} else if (key_value.has_nested()) {
LOG(FATAL) << "Nested key value not supported yet";
} else {
LOG(FATAL) << "Unknown key value type";
}
}

Expand Down
2 changes: 1 addition & 1 deletion flex/engines/hqps_db/core/operator/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void template_set_key_value(results::KeyValues* map,
for (auto& kv : key_value) {
auto cur_kv = map->add_key_values();
cur_kv->mutable_key()->set_str(kv.first);
auto value = cur_kv->mutable_value();
auto value = cur_kv->mutable_element();
set_any_to_element(kv.second, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,24 @@ private OuterExpression.Expression visitMapValueConstructor(RexCall call) {
key instanceof RexLiteral,
"key type of 'MAP_VALUE_CONSTRUCTOR' should be 'literal', but is "
+ key.getClass());
Preconditions.checkArgument(
value instanceof RexGraphVariable,
"value type of 'MAP_VALUE_CONSTRUCTOR' should be 'variable', but is "
+ value.getClass());
varMapBuilder.addKeyVals(
OuterExpression.ExprOpr valueOpr = value.accept(this).getOperators(0);
OuterExpression.VariableKeyValue.Builder keyValueBuilder =
OuterExpression.VariableKeyValue.newBuilder()
.setKey(key.accept(this).getOperators(0).getConst())
.setValue(value.accept(this).getOperators(0).getVar())
.build());
.setKey(key.accept(this).getOperators(0).getConst());
switch (valueOpr.getItemCase()) {
case VAR:
keyValueBuilder.setVal(valueOpr.getVar());
break;
case MAP:
keyValueBuilder.setNested(valueOpr.getMap());
break;
default:
throw new IllegalArgumentException(
"can not convert value ["
+ value
+ "] of 'MAP_VALUE_CONSTRUCTOR' to physical plan");
}
varMapBuilder.addKeyVals(keyValueBuilder);
}
return OuterExpression.Expression.newBuilder()
.addOperators(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alibaba.graphscope.common.result.Utils;
import com.alibaba.graphscope.gaia.proto.Common;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.graphscope.gremlin.exception.GremlinResultParserException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -119,6 +120,30 @@ protected AnyValue parseEntry(IrResult.Entry entry, @Nullable RelDataType dataTy
}
}

protected AnyValue parseElement(IrResult.KeyValues.KeyValue value, RelDataType type) {
switch (value.getValueCase()) {
case ELEMENT:
return parseElement(value.getElement(), type);
case NESTED:
if (type instanceof ArbitraryMapType) {
return parseKeyValues(
value.getNested(),
((ArbitraryMapType) type).getKeyTypes(),
((ArbitraryMapType) type).getValueTypes());
} else {
return parseKeyValues(
value.getNested(), type.getKeyType(), type.getValueType());
}
default:
throw new GremlinResultParserException(
"keyValue ["
+ value
+ "] has invalid value type ["
+ value.getValueCase()
+ "]");
}
}

protected AnyValue parseElement(IrResult.Element element, @Nullable RelDataType dataType) {
switch (element.getInnerCase()) {
case VERTEX:
Expand Down Expand Up @@ -195,9 +220,7 @@ protected AnyValue parseKeyValues(
.getKeyValuesList()
.forEach(
entry -> {
valueMap.put(
entry.getKey().getStr(),
parseElement(entry.getValue(), valueType));
valueMap.put(entry.getKey().getStr(), parseElement(entry, valueType));
});
return VirtualValues.fromMap(valueMap, valueMap.size(), 0);
}
Expand All @@ -216,8 +239,7 @@ protected AnyValue parseKeyValues(
Map<String, AnyValue> valueMap = Maps.newLinkedHashMap();
for (int i = 0; i < entries.size(); ++i) {
IrResult.KeyValues.KeyValue entry = entries.get(i);
valueMap.put(
entry.getKey().getStr(), parseElement(entry.getValue(), valueTypes.get(i)));
valueMap.put(entry.getKey().getStr(), parseElement(entry, valueTypes.get(i)));
}
return VirtualValues.fromMap(valueMap, valueMap.size(), 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,27 @@ public static Object parseEntry(IrResult.Entry entry) {
k -> {
valueMap.put(
ParserUtils.parseCommonValue(k.getKey()),
ParserUtils.parseElement(k.getValue()));
ParserUtils.parseElement(k));
});
return valueMap;
default:
throw new GremlinResultParserException("invalid " + entry.getInnerCase().name());
}
}

public static Object parseElement(IrResult.KeyValues.KeyValue value) {
switch (value.getValueCase()) {
case ELEMENT:
return parseElement(value.getElement());
case NESTED:
return parseEntry(IrResult.Entry.newBuilder().setMap(value.getNested()).build());
default:
throw new GremlinResultParserException(
"keyValue ["
+ value
+ "] has invalid value type ["
+ value.getValueCase()
+ "]");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private Map<Object, Object> parseKeyValues(
.getKeyValuesList()
.forEach(
entry -> {
Object value = parseElement(entry.getValue(), valueType);
Object value = parseElement(entry, valueType);
if (value != null) {
valueMap.put(parseMapKey(entry.getKey().getStr()), value);
}
Expand All @@ -183,7 +183,7 @@ private Map<Object, Object> parseKeyValues(
Map<Object, Object> valueMap = Maps.newLinkedHashMap();
for (int i = 0; i < entries.size(); ++i) {
IrResult.KeyValues.KeyValue entry = entries.get(i);
Object value = parseElement(entry.getValue(), valueTypes.get(i));
Object value = parseElement(entry, valueTypes.get(i));
if (value != null) {
valueMap.put(parseMapKey(entry.getKey().getStr()), value);
}
Expand Down Expand Up @@ -219,6 +219,30 @@ private Object parseElement(IrResult.Element element, RelDataType type) {
}
}

private Object parseElement(IrResult.KeyValues.KeyValue value, RelDataType type) {
switch (value.getValueCase()) {
case ELEMENT:
return parseElement(value.getElement(), type);
case NESTED:
if (type instanceof ArbitraryMapType) {
return parseKeyValues(
value.getNested(),
((ArbitraryMapType) type).getKeyTypes(),
((ArbitraryMapType) type).getValueTypes());
} else {
return parseKeyValues(
value.getNested(), type.getKeyType(), type.getValueType());
}
default:
throw new GremlinResultParserException(
"keyValue ["
+ value
+ "] has invalid value type ["
+ value.getValueCase()
+ "]");
}
}

private Vertex parseVertex(IrResult.Vertex vertex, RelDataType type) {
return new DetachedVertex(
vertex.getId(),
Expand Down
9 changes: 8 additions & 1 deletion interactive_engine/executor/ir/core/src/plan/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,14 @@ fn preprocess_expression(
common_pb::expr_opr::Item::Map(key_values) => {
for key_val in &mut key_values.key_vals {
if let Some(value) = key_val.value.as_mut() {
preprocess_var(value, meta, plan_meta, false)?;
match value {
common_pb::variable_key_value::Value::Val(val) => {
preprocess_var(val, meta, plan_meta, false)?;
}
common_pb::variable_key_value::Value::Nested(_) => {
return Err(IrError::Unsupported("nested value in Map".to_string()));
}
}
}
}
count = 0;
Expand Down
46 changes: 27 additions & 19 deletions interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,32 @@ impl TryFrom<common_pb::Variable> for Operand {
}
}

impl TryFrom<common_pb::VariableKeyValues> for Operand {
type Error = ParsePbError;

fn try_from(key_vals: common_pb::VariableKeyValues) -> Result<Self, Self::Error> {
let mut vec = Vec::with_capacity(key_vals.key_vals.len());
for key_val in key_vals.key_vals {
let (_key, _value) = (key_val.key, key_val.value);
let key = if let Some(key) = _key {
Object::try_from(key)?
} else {
return Err(ParsePbError::from("empty key provided in Map"));
};
let value = if let Some(value) = _value {
match value {
common_pb::variable_key_value::Value::Val(val) => Operand::try_from(val)?,
common_pb::variable_key_value::Value::Nested(nested) => Operand::try_from(nested)?,
}
} else {
return Err(ParsePbError::from("empty value provided in Map"));
};
vec.push((key, value));
}
Ok(Self::Map(vec))
}
}

impl TryFrom<common_pb::ExprOpr> for Operand {
type Error = ParsePbError;

Expand All @@ -605,25 +631,7 @@ impl TryFrom<common_pb::ExprOpr> for Operand {
}
Ok(Self::VarMap(vec))
}

Map(key_vals) => {
let mut vec = Vec::with_capacity(key_vals.key_vals.len());
for key_val in key_vals.key_vals {
let (_key, _value) = (key_val.key, key_val.value);
let key = if let Some(key) = _key {
Object::try_from(key)?
} else {
return Err(ParsePbError::from("empty key provided in Map"));
};
let value = if let Some(value) = _value {
Operand::try_from(value)?
} else {
return Err(ParsePbError::from("empty value provided in Map"));
};
vec.push((key, value));
}
Ok(Self::Map(vec))
}
Map(key_vals) => Operand::try_from(key_vals),
_ => Err(ParsePbError::ParseError("invalid operators for an Operand".to_string())),
}
} else {
Expand Down
11 changes: 9 additions & 2 deletions interactive_engine/executor/ir/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,16 @@ message VariableKeys {

message VariableKeyValue {
common.Value key = 1;
Variable value = 2;
oneof value {
Variable val = 2;
VariableKeyValues nested = 3;
}
}

// A nested kv projection, which is used to project a nested structure of key-value pairs
// e.g., to project [value(a.name), value(a.age)], or project [key_value('name', a.name), key_value('age', a.age)],
// or more complex nested projection,
// e.g., to project [key_value('tagA', nested{[key_value('name', a.name), key_value('age', a.age)]}), key_value('tagB', nested{key_value(b.name), key_value(b.age)})]
message VariableKeyValues {
repeated VariableKeyValue key_vals = 1;
}
Expand Down Expand Up @@ -203,7 +210,7 @@ message ExprOpr {
DynamicParam param = 9;
Case case = 10;
Extract extract = 11;
// TODO: the new definition for var_map, that allows user-given key name. Will remove the old var_map soon.
// TODO: the new definition for var_map, that allows user-given key name, and nested maps. Will remove the old var_map finally.
VariableKeyValues map = 13;
TimeInterval time_interval = 14;
DateTimeMinus date_time_minus = 15;
Expand Down
5 changes: 4 additions & 1 deletion interactive_engine/executor/ir/proto/results.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ message Collection {
message KeyValues {
message KeyValue {
common.Value key = 1;
Element value = 2;
oneof value {
Element element = 2;
KeyValues nested = 3;
}
}
repeated KeyValue key_values = 1;
}
Expand Down
26 changes: 15 additions & 11 deletions interactive_engine/executor/ir/runtime/src/process/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,17 +597,21 @@ impl TryFrom<result_pb::Entry> for DynEntry {
let mut map = BTreeMap::new();
for key_val in kv.key_values {
let key = key_val.key.unwrap();
let val_inner = key_val.value.unwrap().inner.unwrap();
// currently, convert kv into Object::KV
let key_obj: Object = Object::try_from(key)?;
let val_obj: Object = match val_inner {
result_pb::element::Inner::Object(obj) => Object::try_from(obj)?,
_ => Err(ParsePbError::Unsupported(format!(
"unsupported kvs value inner {:?}",
val_inner,
)))?,
};
map.insert(key_obj, val_obj);
let value_inner = key_val.value.unwrap();
match value_inner {
result_pb::key_values::key_value::Value::Element(val) => {
let val_inner = val.inner.unwrap();
let val_obj: Object = match val_inner {
result_pb::element::Inner::Object(obj) => Object::try_from(obj)?,
_ => Err(ParsePbError::Unsupported(format!(
"unsupported kvs value inner {:?}",
val_inner,
)))?,
};
map.insert(Object::try_from(key)?, val_obj);
}
result_pb::key_values::key_value::Value::Nested(_) => todo!(),
}
}
Ok(DynEntry::new(Object::KV(map)))
}
Expand Down
Loading

0 comments on commit 5590f02

Please sign in to comment.