From eac837497bcfa6d99acd10a2af2e1184e7feed1e Mon Sep 17 00:00:00 2001 From: Ke Date: Fri, 27 Dec 2024 16:26:37 -0800 Subject: [PATCH] feat: Add TextWriter --- velox/connectors/hive/CMakeLists.txt | 1 + velox/connectors/hive/HiveConnectorUtil.cpp | 5 +- velox/dwio/CMakeLists.txt | 1 + velox/dwio/text/CMakeLists.txt | 23 ++ velox/dwio/text/RegisterTextWriter.cpp | 29 +++ velox/dwio/text/RegisterTextWriter.h | 25 +++ velox/dwio/text/tests/CMakeLists.txt | 26 +++ .../tests/writer/BufferedWriterSinkTest.cpp | 75 +++++++ velox/dwio/text/tests/writer/CMakeLists.txt | 32 +++ .../dwio/text/tests/writer/FileReaderUtil.cpp | 58 +++++ velox/dwio/text/tests/writer/FileReaderUtil.h | 25 +++ .../dwio/text/tests/writer/TextWriterTest.cpp | 172 +++++++++++++++ velox/dwio/text/writer/BufferedWriterSink.cpp | 85 +++++++ velox/dwio/text/writer/BufferedWriterSink.h | 55 +++++ velox/dwio/text/writer/CMakeLists.txt | 24 ++ velox/dwio/text/writer/TextWriter.cpp | 207 ++++++++++++++++++ velox/dwio/text/writer/TextWriter.h | 102 +++++++++ 17 files changed, 944 insertions(+), 1 deletion(-) create mode 100644 velox/dwio/text/CMakeLists.txt create mode 100644 velox/dwio/text/RegisterTextWriter.cpp create mode 100644 velox/dwio/text/RegisterTextWriter.h create mode 100644 velox/dwio/text/tests/CMakeLists.txt create mode 100644 velox/dwio/text/tests/writer/BufferedWriterSinkTest.cpp create mode 100644 velox/dwio/text/tests/writer/CMakeLists.txt create mode 100644 velox/dwio/text/tests/writer/FileReaderUtil.cpp create mode 100644 velox/dwio/text/tests/writer/FileReaderUtil.h create mode 100644 velox/dwio/text/tests/writer/TextWriterTest.cpp create mode 100644 velox/dwio/text/writer/BufferedWriterSink.cpp create mode 100644 velox/dwio/text/writer/BufferedWriterSink.h create mode 100644 velox/dwio/text/writer/CMakeLists.txt create mode 100644 velox/dwio/text/writer/TextWriter.cpp create mode 100644 velox/dwio/text/writer/TextWriter.h diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index 2e137dbd6c293..87234e4732597 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -44,6 +44,7 @@ velox_link_libraries( velox_dwio_orc_reader velox_dwio_parquet_reader velox_dwio_parquet_writer + velox_dwio_text_writer_register velox_file velox_hive_partition_function velox_type_tz diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index dcbe91146a697..60c200d1d7c52 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -1049,8 +1049,11 @@ void updateWriterOptionsFromHiveConfig( case dwio::common::FileFormat::NIMBLE: // No-op for now. break; + case dwio::common::FileFormat::TEXT: + // No-op for now. + break; default: - VELOX_UNSUPPORTED("{}", fileFormat); + VELOX_UNSUPPORTED("Unsupported file format: {}", fileFormat); } } diff --git a/velox/dwio/CMakeLists.txt b/velox/dwio/CMakeLists.txt index efcb3c06bebe3..d4a879e8f6803 100644 --- a/velox/dwio/CMakeLists.txt +++ b/velox/dwio/CMakeLists.txt @@ -35,3 +35,4 @@ add_subdirectory(catalog) add_subdirectory(dwrf) add_subdirectory(orc) add_subdirectory(parquet) +add_subdirectory(text) diff --git a/velox/dwio/text/CMakeLists.txt b/velox/dwio/text/CMakeLists.txt new file mode 100644 index 0000000000000..844a12ffd6012 --- /dev/null +++ b/velox/dwio/text/CMakeLists.txt @@ -0,0 +1,23 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) +endif() + +add_subdirectory(writer) + +velox_add_library(velox_dwio_text_writer_register RegisterTextWriter.cpp) + +velox_link_libraries(velox_dwio_text_writer_register velox_dwio_text_writer) diff --git a/velox/dwio/text/RegisterTextWriter.cpp b/velox/dwio/text/RegisterTextWriter.cpp new file mode 100644 index 0000000000000..6ecbfdd864cdd --- /dev/null +++ b/velox/dwio/text/RegisterTextWriter.cpp @@ -0,0 +1,29 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/text/writer/TextWriter.h" + +namespace facebook::velox::text { + +void registerTextWriterFactory() { + dwio::common::registerWriterFactory(std::make_shared()); +} + +void unregisterTextWriterFactory() { + dwio::common::unregisterWriterFactory(dwio::common::FileFormat::TEXT); +} + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/RegisterTextWriter.h b/velox/dwio/text/RegisterTextWriter.h new file mode 100644 index 0000000000000..08ac312d74704 --- /dev/null +++ b/velox/dwio/text/RegisterTextWriter.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::text { + +void registerTextWriterFactory(); + +void unregisterTextWriterFactory(); + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/tests/CMakeLists.txt b/velox/dwio/text/tests/CMakeLists.txt new file mode 100644 index 0000000000000..34a05424d3662 --- /dev/null +++ b/velox/dwio/text/tests/CMakeLists.txt @@ -0,0 +1,26 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set(TEST_LINK_LIBS + velox_dwio_common_test_utils + velox_vector_test_lib + velox_exec_test_lib + velox_temp_path + GTest::gtest + GTest::gtest_main + GTest::gmock + gflags::gflags + glog::glog) + +add_subdirectory(writer) diff --git a/velox/dwio/text/tests/writer/BufferedWriterSinkTest.cpp b/velox/dwio/text/tests/writer/BufferedWriterSinkTest.cpp new file mode 100644 index 0000000000000..c308108af5b73 --- /dev/null +++ b/velox/dwio/text/tests/writer/BufferedWriterSinkTest.cpp @@ -0,0 +1,75 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "velox/common/base/Fs.h" +#include "velox/common/file/FileSystems.h" +#include "velox/dwio/text/tests/writer/FileReaderUtil.h" +#include "velox/dwio/text/writer/TextWriter.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +namespace facebook::velox::text { + +class BufferedWriterSinkTest : public testing::Test, + public velox::test::VectorTestBase { + public: + void SetUp() override { + velox::filesystems::registerLocalFileSystem(); + dwio::common::LocalFileSink::registerFactory(); + rootPool_ = memory::memoryManager()->addRootPool("BufferedWriterSinkTest"); + leafPool_ = rootPool_->addLeafChild("BufferedWriterSinkTest"); + tempPath_ = exec::test::TempDirectoryPath::create(); + } + + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + std::shared_ptr rootPool_; + std::shared_ptr leafPool_; + std::shared_ptr tempPath_; +}; + +TEST_F(BufferedWriterSinkTest, write) { + auto filePath = fs::path( + fmt::format("{}/test_buffered_writer.txt", tempPath_->getPath())); + auto sink = std::make_unique( + filePath, dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto bufferedWriterSink = std::make_unique( + std::move(sink), rootPool_->addLeafChild("bufferedWriterSinkTest"), 15); + bufferedWriterSink->write("hello world", 10); + bufferedWriterSink->write("this is writer", 10); + bufferedWriterSink->close(); + std::string result = readFile(filePath); + EXPECT_EQ(result.size(), 20); +} + +TEST_F(BufferedWriterSinkTest, abort) { + auto filePath = + fs::path(fmt::format("{}/test_buffered_abort.txt", tempPath_->getPath())); + auto sink = std::make_unique( + filePath, dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto bufferedWriterSink = std::make_unique( + std::move(sink), rootPool_->addLeafChild("bufferedWriterSinkTest"), 15); + bufferedWriterSink->write("hello world", 10); + bufferedWriterSink->write("this is writer", 10); + bufferedWriterSink->abort(); + std::string result = readFile(filePath); + EXPECT_EQ(result.size(), 10); +} +} // namespace facebook::velox::text diff --git a/velox/dwio/text/tests/writer/CMakeLists.txt b/velox/dwio/text/tests/writer/CMakeLists.txt new file mode 100644 index 0000000000000..ec3b2127b132a --- /dev/null +++ b/velox/dwio/text/tests/writer/CMakeLists.txt @@ -0,0 +1,32 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_text_writer_test + TextWriterTest.cpp BufferedWriterSinkTest.cpp FileReaderUtil.cpp) + +add_test( + NAME velox_text_writer_test + COMMAND velox_text_writer_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) + +target_link_libraries( + velox_text_writer_test + velox_dwio_text_writer + velox_dwio_common_test_utils + velox_link_libs + Boost::regex + Folly::folly + ${TEST_LINK_LIBS} + GTest::gtest + fmt::fmt) diff --git a/velox/dwio/text/tests/writer/FileReaderUtil.cpp b/velox/dwio/text/tests/writer/FileReaderUtil.cpp new file mode 100644 index 0000000000000..db5a62f964984 --- /dev/null +++ b/velox/dwio/text/tests/writer/FileReaderUtil.cpp @@ -0,0 +1,58 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/text/tests/writer/FileReaderUtil.h" + +namespace facebook::velox::text { + +std::string readFile(const std::string& name) { + std::ifstream file(name); + std::string line; + + std::stringstream ss; + while (std::getline(file, line)) { + ss << line; + } + return ss.str(); +} + +std::vector> parseTextFile(const std::string& name) { + std::ifstream file(name); + std::string line; + std::vector> table; + + while (std::getline(file, line)) { + std::vector row = splitTextLine(line, TextFileTraits::kSOH); + table.push_back(row); + } + return table; +} + +std::vector splitTextLine(const std::string& str, char delimiter) { + std::vector result; + std::size_t start = 0; + std::size_t end = str.find(delimiter); + + while (end != std::string::npos) { + result.push_back(str.substr(start, end - start)); + start = end + 1; + end = str.find(delimiter, start); + } + + result.push_back(str.substr(start)); // Add the last part + return result; +} +} // namespace facebook::velox::text diff --git a/velox/dwio/text/tests/writer/FileReaderUtil.h b/velox/dwio/text/tests/writer/FileReaderUtil.h new file mode 100644 index 0000000000000..42113c1a4bcdf --- /dev/null +++ b/velox/dwio/text/tests/writer/FileReaderUtil.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/Fs.h" +#include "velox/common/file/FileSystems.h" +#include "velox/dwio/text/writer/TextWriter.h" + +namespace facebook::velox::text { +std::string readFile(const std::string& name); +std::vector> parseTextFile(const std::string& name); +std::vector splitTextLine(const std::string& str, char delimiter); +} // namespace facebook::velox::text diff --git a/velox/dwio/text/tests/writer/TextWriterTest.cpp b/velox/dwio/text/tests/writer/TextWriterTest.cpp new file mode 100644 index 0000000000000..93652560ce28e --- /dev/null +++ b/velox/dwio/text/tests/writer/TextWriterTest.cpp @@ -0,0 +1,172 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/text/writer/TextWriter.h" +#include +#include "velox/common/base/Fs.h" +#include "velox/common/file/FileSystems.h" +#include "velox/dwio/text/tests/writer/FileReaderUtil.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +namespace facebook::velox::text { +// TODO: add fuzzer test once text reader is move in OSS +class TextWriterTest : public testing::Test, + public velox::test::VectorTestBase { + public: + void SetUp() override { + velox::filesystems::registerLocalFileSystem(); + dwio::common::LocalFileSink::registerFactory(); + rootPool_ = memory::memoryManager()->addRootPool("TextWriterTests"); + leafPool_ = rootPool_->addLeafChild("TextWriterTests"); + tempPath_ = exec::test::TempDirectoryPath::create(); + } + + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + constexpr static float kInf = std::numeric_limits::infinity(); + constexpr static double kNaN = std::numeric_limits::quiet_NaN(); + std::shared_ptr rootPool_; + std::shared_ptr leafPool_; + std::shared_ptr tempPath_; +}; + +TEST_F(TextWriterTest, write) { + auto schema = + ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"}, + {BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + REAL(), + DOUBLE(), + TIMESTAMP(), + VARCHAR(), + VARBINARY()}); + auto data = makeRowVector( + {"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"}, + { + makeConstant(true, 3), + makeFlatVector({1, 2, 3}), + makeFlatVector({1, 2, 3}), // TODO null + makeFlatVector({1, 2, 3}), + makeFlatVector({1, 2, 3}), + makeFlatVector({1.1, kInf, 3.1}), + makeFlatVector({1.1, kNaN, 3.1}), + makeFlatVector( + 3, [](auto i) { return Timestamp(i, i * 1'000'000); }), + makeFlatVector({"hello", "world", "cpp"}, VARCHAR()), + makeFlatVector({"hello", "world", "cpp"}, VARBINARY()), + }); + + WriterOptions writerOptions; + writerOptions.memoryPool = rootPool_.get(); + auto filePath = + fs::path(fmt::format("{}/test_text_writer.txt", tempPath_->getPath())); + auto sink = std::make_unique( + filePath, dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto writer = std::make_unique( + schema, + std::move(sink), + std::make_shared(writerOptions)); + writer->write(data); + writer->close(); + + std::vector> result = parseTextFile(filePath); + EXPECT_EQ(result.size(), 3); + EXPECT_EQ(result[0].size(), 10); + // bool type + EXPECT_EQ(result[0][0], "true"); + EXPECT_EQ(result[1][0], "true"); + EXPECT_EQ(result[2][0], "true"); + + // tinyint + EXPECT_EQ(result[0][1], "1"); + EXPECT_EQ(result[1][1], "2"); + EXPECT_EQ(result[2][1], "3"); + + // smallint + EXPECT_EQ(result[0][2], "1"); + EXPECT_EQ(result[1][2], "2"); + EXPECT_EQ(result[2][2], "3"); + + // int + EXPECT_EQ(result[0][3], "1"); + EXPECT_EQ(result[1][3], "2"); + EXPECT_EQ(result[2][3], "3"); + + // bigint + EXPECT_EQ(result[0][4], "1"); + EXPECT_EQ(result[1][4], "2"); + EXPECT_EQ(result[2][4], "3"); + + // float + EXPECT_EQ(result[0][5], "1.100000"); + EXPECT_EQ(result[1][5], "Infinity"); + EXPECT_EQ(result[2][5], "3.100000"); + + // double + EXPECT_EQ(result[0][6], "1.100000"); + EXPECT_EQ(result[1][6], "NaN"); + EXPECT_EQ(result[2][6], "3.100000"); + + // timestamp + EXPECT_EQ(result[0][7], "1970-01-01 00:00:00.000"); + EXPECT_EQ(result[1][7], "1970-01-01 00:00:01.001"); + EXPECT_EQ(result[2][7], "1970-01-01 00:00:02.002"); + + // varchar + EXPECT_EQ(result[0][8], "hello"); + EXPECT_EQ(result[1][8], "world"); + EXPECT_EQ(result[2][8], "cpp"); + + // varbinary + EXPECT_EQ(result[0][9], "aGVsbG8="); + EXPECT_EQ(result[1][9], "d29ybGQ="); + EXPECT_EQ(result[2][9], "Y3Bw"); +} + +TEST_F(TextWriterTest, abort) { + auto schema = ROW({"c0", "c1"}, {BIGINT(), BOOLEAN()}); + auto data = makeRowVector( + {"c0", "c1"}, + { + makeFlatVector({1, 2, 3}), + makeConstant(true, 3), + }); + + WriterOptions writerOptions; + writerOptions.memoryPool = rootPool_.get(); + writerOptions.defaultFlushCount = 10; + auto filePath = fs::path( + fmt::format("{}/test_text_writer_abort.txt", tempPath_->getPath())); + auto sink = std::make_unique( + filePath, dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto writer = std::make_unique( + schema, + std::move(sink), + std::make_shared(writerOptions)); + writer->write(data); + writer->abort(); + + std::string result = readFile(filePath); + EXPECT_EQ(result.size(), 8); +} +} // namespace facebook::velox::text diff --git a/velox/dwio/text/writer/BufferedWriterSink.cpp b/velox/dwio/text/writer/BufferedWriterSink.cpp new file mode 100644 index 0000000000000..fb8e2ecd5fc99 --- /dev/null +++ b/velox/dwio/text/writer/BufferedWriterSink.cpp @@ -0,0 +1,85 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/text/writer/BufferedWriterSink.h" + +namespace facebook::velox::text { + +BufferedWriterSink::BufferedWriterSink( + std::unique_ptr sink, + std::shared_ptr pool, + uint64_t flushBufferSize) + : sink_(std::move(sink)), + pool_(std::move(pool)), + flushBufferSize_(flushBufferSize), + buf_ (std::make_unique>(*pool_)) { + reserveBuffer(); +} + +BufferedWriterSink::~BufferedWriterSink() { + VELOX_CHECK_EQ( + buf_->size(), + 0, + "Unexpected buffer data on BufferedWriterSink destruction"); +} + +void BufferedWriterSink::write(char value) { + write(&value, 1); +} + +void BufferedWriterSink::write(const char* data, uint64_t size) { + // TODO Add logic for when size is larger than flushCount_ + VELOX_CHECK_GE( + flushBufferSize_, + size, + "write data size exceeds flush buffer size limit"); + + if (buf_->size() + size > flushBufferSize_) { + flush(); + } + buf_->append(buf_->size(), data, size); +} + +void BufferedWriterSink::flush() { + if (buf_->size() == 0) { + return; + } + + sink_->write(std::move(*buf_)); + reserveBuffer(); +} + +void BufferedWriterSink::abort() { + // TODO Add a flag to indicate sink is aborted to + // prevent flush and write operations after aborted. + buf_->clear(); + sink_->close(); +} + +void BufferedWriterSink::close() { + flush(); + buf_->clear(); + sink_->close(); +} + +void BufferedWriterSink::reserveBuffer() { + VELOX_CHECK_NOT_NULL(buf_); + VELOX_CHECK_EQ(buf_->size(), 0); + VELOX_CHECK_EQ(buf_->capacity(), 0); + buf_->reserve(flushBufferSize_); +} + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/writer/BufferedWriterSink.h b/velox/dwio/text/writer/BufferedWriterSink.h new file mode 100644 index 0000000000000..a725806983a4e --- /dev/null +++ b/velox/dwio/text/writer/BufferedWriterSink.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/common/FileSink.h" + +namespace facebook::velox::text { + +/// Takes character(s) and writes into a 'sink'. +/// It buffers the characters(s) in memory before flushing to the sink. +/// The upper limit character count is specified by 'flushBufferSize'. +class BufferedWriterSink { + public: + BufferedWriterSink( + std::unique_ptr sink, + std::shared_ptr pool, + uint64_t flushBufferSize); + + ~BufferedWriterSink(); + + void write(char value); + void write(const char* data, uint64_t size); + void flush(); + /// Discard the data in buffer and close the buffer and fileSink. + void abort(); + /// Flush the data in buffer to fileSink and close the buffer and fileSink. + void close(); + + private: + void reserveBuffer(); + + const std::unique_ptr sink_; + const std::shared_ptr pool_; + // The buffer size limit and triggers flush if exceeds this limit. + const uint64_t flushBufferSize_; + const std::unique_ptr> buf_; + // TODO: add a flag to indicate sink is aborted to prevent flush and write + // operations after aborted +}; + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/writer/CMakeLists.txt b/velox/dwio/text/writer/CMakeLists.txt new file mode 100644 index 0000000000000..ebfb78c629055 --- /dev/null +++ b/velox/dwio/text/writer/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +velox_add_library(velox_dwio_text_writer TextWriter.cpp BufferedWriterSink.cpp) + +velox_link_libraries( + velox_dwio_text_writer + velox_dwio_arrow_parquet_writer_lib + velox_dwio_arrow_parquet_writer_util_lib + velox_dwio_common + velox_arrow_bridge + arrow + fmt::fmt) diff --git a/velox/dwio/text/writer/TextWriter.cpp b/velox/dwio/text/writer/TextWriter.cpp new file mode 100644 index 0000000000000..2b3c79ab9a90e --- /dev/null +++ b/velox/dwio/text/writer/TextWriter.cpp @@ -0,0 +1,207 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/text/writer/TextWriter.h" + +#include +#include "velox/common/base/Pointers.h" +#include "velox/common/encode/Base64.h" +#include "velox/exec/MemoryReclaimer.h" + +namespace facebook::velox::text { +template +std::optional toTextStr(T val) { + return std::optional(std::to_string(val)); +} + +template <> +std::optional toTextStr(bool val) { + return val ? std::optional("true") : std::optional("false"); +} + +template <> +std::optional toTextStr(float val) { + if (std::isnan(val)) { + return std::optional("NaN"); + } else if (std::isinf(val)) { + return std::optional("Infinity"); + } else { + return {std::to_string(val)}; + } +} + +template <> +std::optional toTextStr(double val) { + if (std::isnan(val)) { + return std::optional("NaN"); + } else if (std::isinf(val)) { + return std::optional("Infinity"); + } else { + return {std::to_string(val)}; + } +} + +template <> +std::optional toTextStr(Timestamp val) { + TimestampToStringOptions options; + options.dateTimeSeparator = ' '; + options.precision = TimestampPrecision::kMilliseconds; + return {val.toString(options)}; +} + +TextWriter::TextWriter( + RowTypePtr schema, + std::unique_ptr sink, + const std::shared_ptr& options) + : schema_(std::move(schema)), + bufferedWriterSink_(std::make_unique( + std::move(sink), + options->memoryPool->addLeafChild(fmt::format( + "{}.text_writer_node.{}", + options->memoryPool->name(), + folly::to(folly::Random::rand64()))), + options->defaultFlushCount)) {} + +void TextWriter::write(const VectorPtr& data) { + VELOX_CHECK_EQ( + data->encoding(), + VectorEncoding::Simple::ROW, + "Text writer expects row vector input"); + VELOX_CHECK( + data->type()->equivalent(*schema_), + "The file schema type should be equal with the input row vector type."); + const RowVector* dataRowVector = data->as(); + + std::vector> decodedColumnVectors; + const auto numColumns = dataRowVector->childrenSize(); + for (size_t column = 0; column < numColumns; ++column) { + auto decodedColumnVector = std::make_shared(DecodedVector( + *dataRowVector->childAt(column), + SelectivityVector(dataRowVector->size()))); + decodedColumnVectors.push_back(std::move(decodedColumnVector)); + } + + for (vector_size_t row = 0; row < data->size(); ++row) { + for (size_t column = 0; column < numColumns; ++column) { + if (column != 0) { + bufferedWriterSink_->write(TextFileTraits::kSOH); + } + writeCellValue( + decodedColumnVectors.at(column), schema_->childAt(column), row); + } + bufferedWriterSink_->write(TextFileTraits::kNewLine); + } +} + +void TextWriter::flush() { + bufferedWriterSink_->flush(); +} + +void TextWriter::close() { + bufferedWriterSink_->close(); +} + +void TextWriter::abort() { + bufferedWriterSink_->abort(); +} + +void TextWriter::writeCellValue( + const std::shared_ptr& decodedColumnVector, + const TypePtr& type, + vector_size_t row) { + std::optional dataStr; + std::optional dataSV; + + if (decodedColumnVector->isNullAt(row)) { + bufferedWriterSink_->write( + TextFileTraits::kNullData.data(), TextFileTraits::kNullData.length()); + return; + } + switch (type->kind()) { + case TypeKind::BOOLEAN: + dataStr = + toTextStr(folly::to(decodedColumnVector->valueAt(row))); + break; + case TypeKind::TINYINT: + dataStr = toTextStr(decodedColumnVector->valueAt(row)); + break; + case TypeKind::SMALLINT: + dataStr = toTextStr(decodedColumnVector->valueAt(row)); + break; + case TypeKind::INTEGER: + dataStr = toTextStr(decodedColumnVector->valueAt(row)); + break; + case TypeKind::BIGINT: + dataStr = toTextStr(decodedColumnVector->valueAt(row)); + break; + case TypeKind::REAL: + dataStr = toTextStr(decodedColumnVector->valueAt(row)); + break; + case TypeKind::DOUBLE: + dataStr = toTextStr(decodedColumnVector->valueAt(row)); + break; + case TypeKind::TIMESTAMP: + dataStr = toTextStr(decodedColumnVector->valueAt(row)); + break; + case TypeKind::VARCHAR: + dataSV = std::optional(decodedColumnVector->valueAt(row)); + break; + case TypeKind::VARBINARY: { + auto data = decodedColumnVector->valueAt(row); + dataStr = + std::optional(encoding::Base64::encode(data.data(), data.size())); + break; + } + // TODO Add support for complex types + case TypeKind::ARRAY: + [[fallthrough]]; + case TypeKind::MAP: + [[fallthrough]]; + case TypeKind::ROW: + [[fallthrough]]; + case TypeKind::UNKNOWN: + [[fallthrough]]; + default: + VELOX_NYI("{} is not supported yet in TextWriter", type->kind()); + } + + if (dataStr.has_value()) { + VELOX_CHECK(!dataSV.has_value()); + bufferedWriterSink_->write( + dataStr.value().data(), dataStr.value().length()); + return; + } + + VELOX_CHECK(dataSV.has_value()); + bufferedWriterSink_->write(dataSV.value().data(), dataSV.value().size()); +} + +std::unique_ptr TextWriterFactory::createWriter( + std::unique_ptr sink, + const std::shared_ptr& options) { + auto textOptions = std::dynamic_pointer_cast(options); + VELOX_CHECK_NOT_NULL( + textOptions, "Text writer factory expected a Text WriterOptions object."); + return std::make_unique( + asRowType(options->schema), std::move(sink), textOptions); +} + +std::unique_ptr +TextWriterFactory::createWriterOptions() { + return std::make_unique(); +} + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/writer/TextWriter.h b/velox/dwio/text/writer/TextWriter.h new file mode 100644 index 0000000000000..f355933ab3add --- /dev/null +++ b/velox/dwio/text/writer/TextWriter.h @@ -0,0 +1,102 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/compression/Compression.h" +#include "velox/common/config/Config.h" +#include "velox/dwio/common/DataBuffer.h" +#include "velox/dwio/common/FileSink.h" +#include "velox/dwio/common/FlushPolicy.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/common/Writer.h" +#include "velox/dwio/common/WriterFactory.h" +#include "velox/dwio/text/writer/BufferedWriterSink.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::text { + +struct WriterOptions : public dwio::common::WriterOptions { + int64_t defaultFlushCount = 10 << 10; +}; + +// TODO: move to a separate file to be shared with text reader once it is in oss +class TextFileTraits { + public: + //// The following constants defined the delimiters used by TextFile format. + /// Each row is separated by 'kNewLine'. + /// Each column is separated by 'kSOH' within each row. + + /// String for null data. + static inline const std::string kNullData = "\\N"; + + /// Delimiter between columns. + static const char kSOH = '\x01'; + + /// Delimiter between rows. + static const char kNewLine = '\n'; +}; + +/// Encodes Velox vectors in TextFormat and writes into a FileSink. +class TextWriter : public dwio::common::Writer { + public: + /// Constructs a writer with output to a 'sink'. + /// @param schema specifies the file's overall schema, and it is always + /// non-null. + /// @param sink output sink + /// @param options writer options + TextWriter( + RowTypePtr schema, + std::unique_ptr sink, + const std::shared_ptr& options); + + ~TextWriter() override = default; + + void write(const VectorPtr& data) override; + + void flush() override; + + bool finish() override { + close(); + return true; + } + + void close() override; + + void abort() override; + + private: + void writeCellValue( + const std::shared_ptr& decodedColumnVector, + const TypePtr& type, + vector_size_t row); + + const RowTypePtr schema_; + const std::unique_ptr bufferedWriterSink_; +}; + +class TextWriterFactory : public dwio::common::WriterFactory { + public: + TextWriterFactory() : WriterFactory(dwio::common::FileFormat::TEXT) {} + + std::unique_ptr createWriter( + std::unique_ptr sink, + const std::shared_ptr& options) override; + + std::unique_ptr createWriterOptions() override; +}; + +} // namespace facebook::velox::text