feat(interactive): Add support for loading graph with odps table as data source (#3305)

- Implement `ODPSFragmentLoader` to support loading vertices/edges from an
odps table: `odps://project/table_name`
- As `ODPSFragmentLoader` and `CSVFragmentLoader` both read raw data into
`arrow::Table`, we extract the common part into an
`AbstractArrowFragmentLoader`, which provides the following interface.
```c++
class IRecordBatchSupplier {
 public:
  // Will be called until GetNextBatch() returns nullptr.
  virtual std::shared_ptr<arrow::RecordBatch> GetNextBatch() = 0;
};

class AbstractArrowFragmentLoader : public IFragmentLoader {
  void AddVerticesRecordBatch(
      label_t v_label_id, const std::vector<std::string>& input_paths,
      std::function<std::shared_ptr<IRecordBatchSupplier>(
          label_t, const std::string&, const LoadingConfig&)>
          supplier_creator);

  void AddEdgesRecordBatch(
      label_t src_label_id, label_t dst_label_id, label_t edge_label_id,
      const std::vector<std::string>& input_paths,
      std::function<std::shared_ptr<IRecordBatchSupplier>(
          label_t, label_t, label_t, const std::string&, const LoadingConfig&)>
          supplier_creator);
};
```

`ODPSFragmentLoader` and `CSVFragmentLoader` simply inherit from this
abstract class and call `AddVerticesRecordBatch` and
`AddEdgesRecordBatch` with a lambda function that specifies how to produce
`RecordBatch`es from each `input_path`. For `CSVFragmentLoader`, we
produce RecordBatches with arrow readers; for `ODPSFragmentLoader`, we
produce RecordBatches with ODPSReadClient.
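
The supplier pattern described above can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: `FakeRecordBatch` stands in for `arrow::RecordBatch` so the snippet compiles without Arrow, and `VectorSupplier`, `MakeDemoSupplier`, `SupplierCreator` and `LoadAll` are hypothetical names. The real creator callbacks also receive label ids and a `LoadingConfig`, which are omitted here.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for arrow::RecordBatch (assumption, keeps the sketch Arrow-free).
struct FakeRecordBatch {
  std::vector<std::string> rows;
};

// Same shape as the interface in the commit message, over the stand-in type.
class IRecordBatchSupplier {
 public:
  virtual ~IRecordBatchSupplier() = default;
  // Called repeatedly until it returns nullptr.
  virtual std::shared_ptr<FakeRecordBatch> GetNextBatch() = 0;
};

// Hypothetical supplier that hands out pre-built batches one at a time.
class VectorSupplier : public IRecordBatchSupplier {
 public:
  explicit VectorSupplier(std::vector<std::shared_ptr<FakeRecordBatch>> batches)
      : batches_(std::move(batches)) {}

  std::shared_ptr<FakeRecordBatch> GetNextBatch() override {
    if (next_ >= batches_.size()) return nullptr;  // exhausted
    return batches_[next_++];
  }

 private:
  std::vector<std::shared_ptr<FakeRecordBatch>> batches_;
  std::size_t next_ = 0;
};

// Simplified creator signature: one supplier per input path.
using SupplierCreator =
    std::function<std::shared_ptr<IRecordBatchSupplier>(const std::string&)>;

// Hypothetical creator: every path yields a 2-row batch, then a 1-row batch.
std::shared_ptr<IRecordBatchSupplier> MakeDemoSupplier(const std::string& path) {
  auto first = std::make_shared<FakeRecordBatch>();
  first->rows = {path + "#0", path + "#1"};
  auto second = std::make_shared<FakeRecordBatch>();
  second->rows = {path + "#2"};
  std::vector<std::shared_ptr<FakeRecordBatch>> batches{first, second};
  return std::make_shared<VectorSupplier>(std::move(batches));
}

// Hypothetical driver playing the role of AddVerticesRecordBatch: create one
// supplier per path and drain it. Returns the number of rows consumed.
std::size_t LoadAll(const std::vector<std::string>& input_paths,
                    const SupplierCreator& supplier_creator) {
  std::size_t total_rows = 0;
  for (const auto& path : input_paths) {
    auto supplier = supplier_creator(path);
    assert(supplier != nullptr);
    while (auto batch = supplier->GetNextBatch()) {
      total_rows += batch->rows.size();
    }
  }
  return total_rows;
}
```

A concrete loader would pass a lambda where `MakeDemoSupplier` is used here, for example `LoadAll(paths, [](const std::string& p) { return MakeDemoSupplier(p); })`. The point of the design is that the draining loop never needs to know whether batches come from a CSV reader or an ODPS client.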

- For a customized FragmentLoader, users can specify the path to the library via
`FLEX_OTHER_LOADERS` and call the `Register()` function when the customized
FragmentLoader class is initialized.
For example, the builtin `CSVFragmentLoader` is registered for `scheme=file`
and `format=csv`.
```c++
const bool CSVFragmentLoader::registered_ = LoaderFactory::Register(
    "file", "csv",
    static_cast<LoaderFactory::loader_initializer_t>(&CSVFragmentLoader::Make));
```
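
To illustrate how this kind of static self-registration can work, here is a self-contained sketch. It is an assumption-laden stand-in rather than the actual `LoaderFactory`: the simplified `loader_initializer_t` takes no arguments, and `Create`, `Name`, `MyFragmentLoader`, `myscheme` and `myformat` are hypothetical names introduced only for this example.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Minimal stand-in for the loader interface (Name() is hypothetical).
class IFragmentLoader {
 public:
  virtual ~IFragmentLoader() = default;
  virtual std::string Name() const = 0;
};

// Simplified factory keyed by (scheme, format). The real initializer
// signature is not shown in the commit message; this one is an assumption.
class LoaderFactory {
 public:
  using loader_initializer_t = std::shared_ptr<IFragmentLoader> (*)();

  static bool Register(const std::string& scheme, const std::string& format,
                       loader_initializer_t initializer) {
    assert(initializer != nullptr);
    Registry()[std::make_pair(scheme, format)] = initializer;
    return true;
  }

  // Hypothetical lookup: returns nullptr when nothing is registered.
  static std::shared_ptr<IFragmentLoader> Create(const std::string& scheme,
                                                 const std::string& format) {
    auto it = Registry().find(std::make_pair(scheme, format));
    if (it == Registry().end()) return nullptr;
    return it->second();
  }

 private:
  // Function-local static, so the map exists before any static
  // `registered_` member in another translation unit tries to use it.
  static std::map<std::pair<std::string, std::string>, loader_initializer_t>&
  Registry() {
    static std::map<std::pair<std::string, std::string>, loader_initializer_t>
        registry;
    return registry;
  }
};

// Hypothetical custom loader that registers itself during static
// initialization, for example when its shared library is loaded.
class MyFragmentLoader : public IFragmentLoader {
 public:
  static std::shared_ptr<IFragmentLoader> Make() {
    return std::make_shared<MyFragmentLoader>();
  }
  std::string Name() const override { return "MyFragmentLoader"; }

  static const bool registered_;
};

const bool MyFragmentLoader::registered_ = LoaderFactory::Register(
    "myscheme", "myformat", &MyFragmentLoader::Make);
```

Because registration happens as a side effect of initializing `registered_`, loading a library that contains such a class is enough to make the loader available under its scheme and format, without changing the factory itself.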

- ODPS-related code is placed at `flex/third_party/odps/include`. This
code will be open-sourced by the odps team but is not ready yet, so we copy it
here to make our `ODPSFragmentLoader` work. After the `odps-cpp-sdk` is
open-sourced, we will replace the dependency with a git submodule.


Fix #3396
zhanglei1949 authored Dec 5, 2023
1 parent 3c7ace2 commit ac31a91
Showing 29 changed files with 6,941 additions and 916 deletions.
8 changes: 7 additions & 1 deletion docs/flex/interactive/data_import.md
@@ -2,13 +2,18 @@

In our guide on [using custom graph data](./custom_graph_data.md), we introduced the basics of importing graph data using a simple YAML configuration. This section delves deeper, providing a thorough exploration of the extensive configuration options available for data import.

## Supported data source

Currently we only support import data to graph from local `csv` files or `odps` table. See configuration `loading_config.data_source.scheme`.

## Sample Configuration for the "Modern" Graph

To illustrate, let's examine the `examples/modern_import_full.yaml` file. This configuration is designed for importing the "modern" graph and showcases the full range of configuration possibilities. We'll dissect each configuration item in the sections that follow.

``` yaml
loading_config:
  data_source:
    scheme: file
    location: /home/modern_graph/
  format:
    metadata:
@@ -101,7 +106,8 @@ The table below offers a detailed breakdown of each configuration item. In this
| -------- | -------- | -------- |-------- |
| **loading_config** | N/A | Loading configurations | Yes |
| loading_config.data_source | N/A | Place that maintains the raw data | Yes |
|loading_config.data_source.location | N/A | Path to the data source in the container, which must be mapped from the host machine while intializing the service | Yes
| loading_config.data_source.location | N/A | Path to the data source in the container, which must be mapped from the host machine while intializing the service | Yes
| loading_config.scheme | file | The source of input data. Currently only `file` and `odps` are supported | No |
| loading_config.format | N/A | The format of the raw data in CSV | Yes |
| loading_config.format.metadata | N/A | Mainly for configuring the options for reading CSV | Yes |
| loading_config.format.metadata.delimiter | '\|' | Delimiter used to split a row of data | Yes |
5 changes: 5 additions & 0 deletions flex/CMakeLists.txt
@@ -14,6 +14,7 @@ option(BUILD_HQPS "Whether to build HighQPS Engine" ON)
option(BUILD_TEST "Whether to build test" ON)
option(BUILD_DOC "Whether to build doc" ON)
option(USE_MMAPALLOC "Whether to use mmap allocator." OFF)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON)

include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../)

Expand Down Expand Up @@ -104,6 +105,10 @@ if (NOT EXISTS ${CMAKE_SOURCE_DIR}/third_party/nlohmann-json/single_include/nloh
endif()
include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/third_party/nlohmann-json/single_include)

if (BUILD_ODPS_FRAGMENT_LOADER)
    include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/third_party/odps/include)
endif()

add_subdirectory(utils)
add_subdirectory(codegen)
add_subdirectory(storages)
2 changes: 2 additions & 0 deletions flex/engines/graph_db/grin/CMakeLists.txt
@@ -49,6 +49,8 @@ endif ()

# include nlohmann json
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../../../third_party/nlohmann-json/single_include)
# include odps cpp sdk
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../../../third_party/odps/include/)

find_package(yaml-cpp REQUIRED)
include_directories(SYSTEM ${yaml-cpp_INCLUDE_DIRS})
9 changes: 8 additions & 1 deletion flex/storages/rt_mutable_graph/CMakeLists.txt
@@ -3,7 +3,14 @@
find_package(yaml-cpp REQUIRED)
include_directories(SYSTEM ${yaml-cpp_INCLUDE_DIRS})

file(GLOB_RECURSE RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
if (BUILD_ODPS_FRAGMENT_LOADER)
    file(GLOB_RECURSE RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
else()
    # exclude odps_fragment_loader.cc
    message(STATUS "exclude odps_fragment_loader.cc")
    file(GLOB_RECURSE RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
    list(REMOVE_ITEM RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/loader/odps_fragment_loader.cc")
endif()
add_library(flex_rt_mutable_graph SHARED ${RT_MUTABLE_GRAPH_SRC_FILES})
target_link_libraries(flex_rt_mutable_graph ${LIBGRAPELITE_LIBRARIES} ${YAML_CPP_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
