Skip to content

Commit

Permalink
refactor arrow reader to parquet reader in MetadataTest.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
jkhaliqi committed Nov 15, 2024
1 parent 7437093 commit 8aa9724
Show file tree
Hide file tree
Showing 9 changed files with 847 additions and 41 deletions.
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/Metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ ColumnChunkMetaDataPtr::getColumnStatistics(
thriftColumnChunkPtr(ptr_)->meta_data.statistics, *type, numRows);
};

thrift::ColumnMetaData ColumnChunkMetaDataPtr::getColumnMetadata() {
VELOX_CHECK(hasStatistics());
return thriftColumnChunkPtr(ptr_)->meta_data;
}

int64_t ColumnChunkMetaDataPtr::dataPageOffset() const {
return thriftColumnChunkPtr(ptr_)->meta_data.data_page_offset;
}
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/parquet/reader/Metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/dwio/common/Statistics.h"
#include "velox/dwio/common/compression/Compression.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {

Expand All @@ -42,6 +43,8 @@ class ColumnChunkMetaDataPtr {
const TypePtr type,
int64_t numRows);

thrift::ColumnMetaData getColumnMetadata();

/// Number of values.
int64_t numValues() const;

Expand Down
42 changes: 42 additions & 0 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <thrift/protocol/TCompactProtocol.h> //@manual

#include <arrow/io/api.h>
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"
#include "velox/dwio/parquet/reader/StructColumnReader.h"
#include "velox/dwio/parquet/thrift/ThriftTransport.h"
Expand Down Expand Up @@ -1166,4 +1167,45 @@ FileMetaDataPtr ParquetReader::fileMetaData() const {
return readerBase_->fileMetaData();
}

const thrift::FileMetaData& ParquetReader::thriftFileMetaData() const {
return readerBase_->thriftFileMetaData();
}

void WriteBufferToFile(
const std::shared_ptr<arrow::Buffer>& buffer,
const std::string& file_path) {
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
::arrow::Result<std::shared_ptr<::arrow::io::FileOutputStream>>
file_out_result = ::arrow::io::FileOutputStream::Open(file_path);
if (!file_out_result.ok()) {
std::cerr << "Error opening file for writing: "
<< file_out_result.status().ToString() << std::endl;
return;
}
std::shared_ptr<::arrow::io::FileOutputStream> file_out =
file_out_result.ValueOrDie();
const int64_t buffer_size = buffer->size();
int64_t bytes_written = 0;
while (bytes_written < buffer_size) {
const int64_t chunk_size = std::min(
static_cast<int64_t>(1024 * 1024), buffer_size - bytes_written);
std::shared_ptr<arrow::Buffer> chunk;
auto read_result = source->Read(chunk_size);
if (!read_result.ok()) {
std::cerr << "Error reading from buffer: "
<< read_result.status().ToString() << std::endl;
return;
}
chunk = read_result.ValueOrDie();
auto status = file_out->Write(chunk->data(), chunk->size());
if (!status.ok()) {
std::cerr << "Error writing to file: " << status.ToString() << std::endl;
return;
}
bytes_written += chunk->size();
}
std::cout << "Buffer successfully written to " << file_path
<< std::endl; // delete this...
}

} // namespace facebook::velox::parquet
7 changes: 7 additions & 0 deletions velox/dwio/parquet/reader/ParquetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <arrow/io/api.h>
#include "velox/dwio/common/Reader.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/dwio/parquet/reader/Metadata.h"
Expand Down Expand Up @@ -106,6 +107,8 @@ class ParquetReader : public dwio::common::Reader {

FileMetaDataPtr fileMetaData() const;

const thrift::FileMetaData& thriftFileMetaData() const;

private:
std::shared_ptr<ReaderBase> readerBase_;
};
Expand All @@ -125,4 +128,8 @@ void registerParquetReaderFactory();

void unregisterParquetReaderFactory();

void WriteBufferToFile(
const std::shared_ptr<arrow::Buffer>& buffer,
const std::string& file_path);

} // namespace facebook::velox::parquet
22 changes: 22 additions & 0 deletions velox/dwio/parquet/tests/writer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,25 @@ target_link_libraries(
${TEST_LINK_LIBS}
GTest::gtest
fmt::fmt)

add_executable(velox_dwio_parquet_writer_test
MetadataTest.cpp StatisticsTest.cpp FileSerializeTest.cpp)

add_test(
NAME velox_dwio_parquet_writer_test
COMMAND velox_dwio_parquet_writer_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

target_link_libraries(
velox_dwio_parquet_writer_test
velox_dwio_arrow_parquet_writer_test_lib
GTest::gmock
GTest::gtest
GTest::gtest_main
arrow
arrow_testing
velox_dwio_native_parquet_reader
velox_temp_path)

# target_include_directories( velox_dwio_parquet_writer_test PRIVATE
# ${CMAKE_SOURCE_DIR}/velox/dwio/parquet/reader)
Loading

0 comments on commit 8aa9724

Please sign in to comment.