Skip to content

Commit

Permalink
refactor(parquet): Use velox parquet reader in StatisticsTest and Met…
Browse files Browse the repository at this point in the history
…adatatest
  • Loading branch information
jkhaliqi committed Nov 20, 2024
1 parent c13b8ed commit d5f7423
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 70 deletions.
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/Metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ ColumnChunkMetaDataPtr::getColumnStatistics(
thriftColumnChunkPtr(ptr_)->meta_data.statistics, *type, numRows);
};

// Returns a copy of the thrift ColumnMetaData stored in this column chunk.
// The VELOX_CHECK fails when hasStatistics() is false, so callers only
// receive metadata for chunks that report statistics.
thrift::ColumnMetaData ColumnChunkMetaDataPtr::getColumnMetadata() {
  VELOX_CHECK(hasStatistics());
  const auto columnChunk = thriftColumnChunkPtr(ptr_);
  return columnChunk->meta_data;
}

// Returns the data_page_offset field of this column chunk's thrift metadata.
int64_t ColumnChunkMetaDataPtr::dataPageOffset() const {
  const auto columnChunk = thriftColumnChunkPtr(ptr_);
  return columnChunk->meta_data.data_page_offset;
}
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/parquet/reader/Metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/dwio/common/Statistics.h"
#include "velox/dwio/common/compression/Compression.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {

Expand All @@ -42,6 +43,8 @@ class ColumnChunkMetaDataPtr {
const TypePtr type,
int64_t numRows);

thrift::ColumnMetaData getColumnMetadata();

/// Number of values.
int64_t numValues() const;

Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1166,4 +1166,8 @@ FileMetaDataPtr ParquetReader::fileMetaData() const {
return readerBase_->fileMetaData();
}

// Exposes the thrift FileMetaData by forwarding to the underlying ReaderBase.
const thrift::FileMetaData& ParquetReader::thriftFileMetaData() const {
  auto& base = *readerBase_;
  return base.thriftFileMetaData();
}

} // namespace facebook::velox::parquet
2 changes: 2 additions & 0 deletions velox/dwio/parquet/reader/ParquetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class ParquetReader : public dwio::common::Reader {

FileMetaDataPtr fileMetaData() const;

const thrift::FileMetaData& thriftFileMetaData() const;

private:
std::shared_ptr<ReaderBase> readerBase_;
};
Expand Down
19 changes: 19 additions & 0 deletions velox/dwio/parquet/tests/writer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,22 @@ target_link_libraries(
${TEST_LINK_LIBS}
GTest::gtest
fmt::fmt)

# Test executable built from the metadata and statistics test sources in this
# directory.
add_executable(velox_dwio_parquet_writer_test MetadataTest.cpp
StatisticsTest.cpp)

# Register the executable with CTest; it runs with this source directory as
# its working directory.
add_test(
NAME velox_dwio_parquet_writer_test
COMMAND velox_dwio_parquet_writer_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

# Link against the Arrow-writer test support library, GoogleTest/GoogleMock,
# Arrow, the native Parquet reader and the temp-path helper.
# NOTE(review): library order may matter for static linking - keep as is
# unless verified.
target_link_libraries(
velox_dwio_parquet_writer_test
velox_dwio_arrow_parquet_writer_test_lib
GTest::gmock
GTest::gtest
GTest::gtest_main
arrow
arrow_testing
velox_dwio_native_parquet_reader
velox_temp_path)
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
#include "velox/dwio/parquet/writer/arrow/Statistics.h"
#include "velox/dwio/parquet/writer/arrow/ThriftInternal.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/tests/FileReader.h"
#include "velox/dwio/parquet/writer/arrow/tests/TestUtil.h"

#include "velox/dwio/parquet/reader/ParquetReader.h"
#include "velox/exec/tests/utils/TempFilePath.h"

#include <arrow/io/api.h>

namespace facebook::velox::parquet::arrow {
namespace metadata {

Expand Down Expand Up @@ -365,6 +369,7 @@ TEST(Metadata, TestKeyValueMetadata) {
}

TEST(Metadata, TestAddKeyValueMetadata) {
memory::MemoryManager::testingSetInstance({});
schema::NodeVector fields;
fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));
auto schema = std::static_pointer_cast<schema::GroupNode>(
Expand Down Expand Up @@ -394,21 +399,46 @@ TEST(Metadata, TestAddKeyValueMetadata) {
file_writer->AddKeyValueMetadata(kv_meta_ignored), ParquetException);

PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);

ASSERT_NE(nullptr, file_reader->metadata());
ASSERT_NE(nullptr, file_reader->metadata()->key_value_metadata());
auto read_kv_meta = file_reader->metadata()->key_value_metadata();
// Write the buffer to a temp file
auto tempFile = exec::test::TempFilePath::create();
auto file_path = tempFile->getPath();
auto local_write_file =
std::make_unique<LocalWriteFile>(file_path, false, false);
auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(buffer);
auto buffer_to_string = buffer_reader->buffer()->ToString();
local_write_file->append(buffer_to_string);
local_write_file->close();
std::shared_ptr<facebook::velox::memory::MemoryPool> rootPool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> leafPool_;
rootPool_ = memory::memoryManager()->addRootPool("MetadataTest");
leafPool_ = rootPool_->addLeafChild("MetadataTest");
dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto input = std::make_unique<dwio::common::BufferedInput>(
std::make_shared<LocalReadFile>(file_path), readerOptions.memoryPool());
auto reader =
std::make_unique<ParquetReader>(std::move(input), readerOptions);
auto thriftMeta = reader->thriftFileMetaData();
ASSERT_NE(0, thriftMeta.key_value_metadata.size());

// Verify keys that were added before file writer was closed are present.
for (int i = 1; i <= 3; ++i) {
auto index = std::to_string(i);
PARQUET_ASSIGN_OR_THROW(auto value, read_kv_meta->Get("test_key_" + index));
auto key = thriftMeta.key_value_metadata[i - 1].key;
auto value = thriftMeta.key_value_metadata[i - 1].value;

auto index = std::to_string(i + 1);
if (i == 3) {
index = std::to_string(1);
}

EXPECT_EQ("test_key_" + index, key);
EXPECT_EQ("test_value_" + index, value);
}
// Verify keys that were added after file writer was closed are not present.
EXPECT_FALSE(read_kv_meta->Contains("test_key_4"));
for (const auto& kv : thriftMeta.key_value_metadata) {
EXPECT_NE("test_key_4", kv.key);
EXPECT_NE("test_value_4", kv.value);
}
}

// TODO: disabled as they require Arrow parquet data dir.
Expand Down Expand Up @@ -467,6 +497,7 @@ TEST(Metadata, TestReadPageIndex) {
*/

TEST(Metadata, TestSortingColumns) {
memory::MemoryManager::testingSetInstance({});
schema::NodeVector fields;
fields.push_back(schema::Int32("sort_col", Repetition::REQUIRED));
fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));
Expand Down Expand Up @@ -498,15 +529,31 @@ TEST(Metadata, TestSortingColumns) {
file_writer->Close();

PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);

ASSERT_NE(nullptr, file_reader->metadata());
ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
auto row_group_reader = file_reader->RowGroup(0);
auto* row_group_read_metadata = row_group_reader->metadata();
ASSERT_NE(nullptr, row_group_read_metadata);
EXPECT_EQ(sorting_columns, row_group_read_metadata->sorting_columns());

auto tempFile = exec::test::TempFilePath::create();
auto file_path = tempFile->getPath();
auto local_write_file =
std::make_unique<LocalWriteFile>(file_path, false, false);
auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(buffer);
auto buffer_to_string = buffer_reader->buffer()->ToString();
local_write_file->append(buffer_to_string);
local_write_file->close();
std::shared_ptr<facebook::velox::memory::MemoryPool> rootPool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> leafPool_;
rootPool_ = memory::memoryManager()->addRootPool("MetadataTest");
leafPool_ = rootPool_->addLeafChild("MetadataTest");
dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto input = std::make_unique<dwio::common::BufferedInput>(
std::make_shared<LocalReadFile>(file_path), readerOptions.memoryPool());
auto reader =
std::make_unique<ParquetReader>(std::move(input), readerOptions);
ASSERT_EQ(1, reader->thriftFileMetaData().row_groups.size());
auto row_groups = reader->thriftFileMetaData().row_groups;
ASSERT_NE(true, row_groups.empty());
auto sc = row_groups[0].sorting_columns;
EXPECT_EQ(sorting_columns[0].column_idx, sc[0].column_idx);
EXPECT_EQ(sorting_columns[0].descending, sc[0].descending);
EXPECT_EQ(sorting_columns[0].nulls_first, sc[0].nulls_first);
}

TEST(ApplicationVersion, Basics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/ubsan.h"

#include "velox/dwio/parquet/reader/ParquetReader.h"
#include "velox/dwio/parquet/writer/arrow/ColumnWriter.h"
#include "velox/dwio/parquet/writer/arrow/FileWriter.h"
#include "velox/dwio/parquet/writer/arrow/Platform.h"
Expand All @@ -45,8 +46,8 @@
#include "velox/dwio/parquet/writer/arrow/ThriftInternal.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/tests/ColumnReader.h"
#include "velox/dwio/parquet/writer/arrow/tests/FileReader.h"
#include "velox/dwio/parquet/writer/arrow/tests/TestUtil.h"
#include "velox/exec/tests/utils/TempFilePath.h"

using arrow::default_memory_pool;
using arrow::MemoryPool;
Expand Down Expand Up @@ -451,6 +452,7 @@ class TestStatistics : public PrimitiveTypedTest<TestType> {
}

void TestFullRoundtrip(int64_t num_values, int64_t null_count) {
memory::MemoryManager::testingSetInstance({});
this->GenerateData(num_values);

// compute statistics for the whole batch
Expand Down Expand Up @@ -494,18 +496,39 @@ class TestStatistics : public PrimitiveTypedTest<TestType> {

ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);
auto rg_reader = file_reader->RowGroup(0);
auto column_chunk = rg_reader->metadata()->ColumnChunk(0);
if (!column_chunk->is_stats_set())
return;
std::shared_ptr<Statistics> stats = column_chunk->statistics();
// check values after serialization + deserialization
EXPECT_EQ(null_count, stats->null_count());
EXPECT_EQ(num_values - null_count, stats->num_values());
// Write the buffer to a tmp file
auto tempFile = exec::test::TempFilePath::create();
auto file_path = tempFile->getPath();
auto local_write_file =
std::make_unique<LocalWriteFile>(file_path, false, false);
auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(buffer);
auto buffer_to_string = buffer_reader->buffer()->ToString();
local_write_file->append(buffer_to_string);
local_write_file->close();
std::shared_ptr<facebook::velox::memory::MemoryPool> rootPool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> leafPool_;
rootPool_ = memory::memoryManager()->addRootPool("StatisticsTest");
leafPool_ = rootPool_->addLeafChild("StatisticsTest");
dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto input = std::make_unique<dwio::common::BufferedInput>(
std::make_shared<LocalReadFile>(file_path), readerOptions.memoryPool());
auto reader =
std::make_unique<ParquetReader>(std::move(input), readerOptions);
auto row_group = reader->fileMetaData().rowGroup(0);
auto col_chunk = row_group.columnChunk(0);
auto column_metadata = col_chunk.getColumnMetadata();
auto stats = column_metadata.statistics;
auto type =
column_metadata
.type; // each type needs mapping if needed std::make_shared<const
// ScalarType<TypeKind::INTEGER>>()
EXPECT_EQ(null_count, stats.null_count);
EXPECT_TRUE(expected_stats->HasMinMax());
EXPECT_EQ(expected_stats->EncodeMin(), stats->EncodeMin());
EXPECT_EQ(expected_stats->EncodeMax(), stats->EncodeMax());
EXPECT_EQ(expected_stats->EncodeMin(), stats.min_value);
EXPECT_EQ(expected_stats->EncodeMax(), stats.max_value);
auto col_stats =
col_chunk.getColumnStatistics(INTEGER(), row_group.numRows());
EXPECT_EQ(num_values - null_count, col_stats->getNumberOfValues());
}
};

Expand Down Expand Up @@ -1014,20 +1037,33 @@ class TestStatisticsSortOrder : public ::testing::Test {

// Finishes the Parquet sink, writes the resulting buffer to a temporary file,
// re-opens that file and compares the min/max statistics of every column
// chunk in row group 0 with the expected values in stats_.
// NOTE(review): this listing is a rendered diff without +/- markers (hunk
// header: -1014,20 +1037,33). The statements that use ParquetFileReader,
// FileMetaData, RowGroupMetaData and ColumnChunkMetaData look like the
// removed pre-change lines, interleaved with their ParquetReader-based
// replacements - confirm against the actual file before editing.
void VerifyParquetStats() {
ASSERT_OK_AND_ASSIGN(auto pbuffer, parquet_sink_->Finish());

// Persist the in-memory Parquet bytes to a temp file so they can be read
// back through a LocalReadFile.
auto tempFile = exec::test::TempFilePath::create();
auto file_path = tempFile->getPath();
auto local_write_file =
std::make_unique<LocalWriteFile>(file_path, false, false);
// NOTE(review): the BufferReader is only used to get back to the buffer;
// presumably pbuffer->ToString() would be equivalent - verify.
auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(pbuffer);
auto buffer_to_string = buffer_reader->buffer()->ToString();
local_write_file->append(buffer_to_string);
local_write_file->close();
// Set up the memory manager and a root/leaf pool pair for the reader.
memory::MemoryManager::testingSetInstance({});
std::shared_ptr<facebook::velox::memory::MemoryPool> rootPool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> leafPool_;
rootPool_ = memory::memoryManager()->addRootPool("StatisticsTest");
leafPool_ = rootPool_->addLeafChild("StatisticsTest");
dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto input = std::make_unique<dwio::common::BufferedInput>(
std::make_shared<LocalReadFile>(file_path), readerOptions.memoryPool());
// Create a ParquetReader instance
std::unique_ptr<ParquetFileReader> parquet_reader = ParquetFileReader::Open(
std::make_shared<::arrow::io::BufferReader>(pbuffer));

auto reader =
std::make_unique<ParquetReader>(std::move(input), readerOptions);
// Get the File MetaData
std::shared_ptr<FileMetaData> file_metadata = parquet_reader->metadata();
std::shared_ptr<RowGroupMetaData> rg_metadata = file_metadata->RowGroup(0);
auto row_group = reader->fileMetaData().rowGroup(0);
// One iteration per schema field; field i is checked against column chunk i.
for (int i = 0; i < static_cast<int>(fields_.size()); i++) {
ARROW_SCOPED_TRACE("Statistics for field #", i);
std::shared_ptr<ColumnChunkMetaData> cc_metadata =
rg_metadata->ColumnChunk(i);
EXPECT_EQ(stats_[i].min(), cc_metadata->statistics()->EncodeMin());
EXPECT_EQ(stats_[i].max(), cc_metadata->statistics()->EncodeMax());
// Compare the expected min/max with the thrift statistics fields returned
// by getColumnMetadata().
auto col_chunk = row_group.columnChunk(i);
auto cc_metadata = col_chunk.getColumnMetadata();
EXPECT_EQ(stats_[i].min(), cc_metadata.statistics.min_value);
EXPECT_EQ(stats_[i].max(), cc_metadata.statistics.max_value);
}
}

Expand Down Expand Up @@ -1264,34 +1300,6 @@ TEST(TestByteArrayStatisticsFromArrow, LargeStringType) {
TestByteArrayStatisticsFromArrow<::arrow::LargeStringType>();
}

// Ensure UNKNOWN sort order is handled properly
using TestStatisticsSortOrderFLBA = TestStatisticsSortOrder<FLBAType>;

// Writes a single REQUIRED FIXED_LEN_BYTE_ARRAY column with
// ConvertedType::INTERVAL, reads the file metadata back with
// ParquetFileReader and asserts that column chunk 0 of row group 0 has no
// statistics set.
// NOTE(review): the enclosing hunk header (-1264,34 +1300,6) suggests this
// test is part of the lines removed by the commit - confirm against the
// actual file.
TEST_F(TestStatisticsSortOrderFLBA, UnknownSortOrder) {
this->fields_.push_back(schema::PrimitiveNode::Make(
"Column 0",
Repetition::REQUIRED,
Type::FIXED_LEN_BYTE_ARRAY,
ConvertedType::INTERVAL,
FLBA_LENGTH));
this->SetUpSchema();
this->WriteParquet();

ASSERT_OK_AND_ASSIGN(auto pbuffer, parquet_sink_->Finish());

// Create a ParquetReader instance
std::unique_ptr<ParquetFileReader> parquet_reader = ParquetFileReader::Open(
std::make_shared<::arrow::io::BufferReader>(pbuffer));
// Get the File MetaData
std::shared_ptr<FileMetaData> file_metadata = parquet_reader->metadata();
std::shared_ptr<RowGroupMetaData> rg_metadata = file_metadata->RowGroup(0);
std::shared_ptr<ColumnChunkMetaData> cc_metadata =
rg_metadata->ColumnChunk(0);

// stats should not be set for UNKNOWN sort order
ASSERT_FALSE(cc_metadata->is_stats_set());
}

template <
typename Stats,
typename Array,
Expand Down
2 changes: 0 additions & 2 deletions velox/dwio/parquet/writer/arrow/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ add_executable(
FileDeserializeTest.cpp
FileSerializeTest.cpp
LevelConversionTest.cpp
MetadataTest.cpp
PageIndexTest.cpp
PropertiesTest.cpp
SchemaTest.cpp
StatisticsTest.cpp
TypesTest.cpp)

add_test(velox_dwio_arrow_parquet_writer_test
Expand Down

0 comments on commit d5f7423

Please sign in to comment.