Skip to content

Commit

Permalink
fix(parquet): Support converted type in timestamp reader (#11964)
Browse files Browse the repository at this point in the history
Summary:
Gluten observed a UT failure at the code location below. The cause is that the
timestamp schema element lacks a logical type and is composed of type, name,
and converted_type. This PR allows parsing converted type from schema elements
and uses it in the timestamp reader to determine the timestamp precision if
logical type is not supplied.

https://github.com/facebookincubator/velox/blob/87c558e094043c5451659c59d38ffc1bd6de781a/velox/dwio/parquet/reader/TimestampColumnReader.h#L99

Pull Request resolved: #11964

Reviewed By: xiaoxmeng

Differential Revision: D67718622

Pulled By: pedroerp

fbshipit-source-id: 948e597447e43444abb2f5f56f17a44fbd0a16ec
  • Loading branch information
rui-mo authored and facebook-github-bot committed Dec 30, 2024
1 parent 09cbb2e commit 7cbfce0
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 39 deletions.
18 changes: 18 additions & 0 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(name),
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat + 1,
maxDefine,
isOptional,
Expand Down Expand Up @@ -438,6 +439,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(name),
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine,
isOptional,
Expand Down Expand Up @@ -468,6 +470,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(name),
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat + 1,
maxDefine,
isOptional,
Expand Down Expand Up @@ -504,6 +507,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(name),
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine,
isOptional,
Expand Down Expand Up @@ -534,6 +538,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
"dummy",
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine,
isOptional,
Expand All @@ -547,6 +552,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(name),
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine,
isOptional,
Expand All @@ -571,6 +577,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(name),
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine,
isOptional,
Expand All @@ -595,6 +602,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
"dummy",
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine,
isOptional,
Expand All @@ -608,6 +616,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(name),
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine,
isOptional,
Expand All @@ -625,6 +634,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(name),
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine,
isOptional,
Expand All @@ -644,6 +654,12 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
schemaElement.__isset.logicalType
? std::optional<thrift::LogicalType>(schemaElement.logicalType)
: std::nullopt;
const std::optional<thrift::ConvertedType::type> convertedType =
schemaElement.__isset.converted_type
? std::optional<thrift::ConvertedType::type>(
schemaElement.converted_type)
: std::nullopt;

auto leafTypePtr = std::make_unique<ParquetTypeWithId>(
veloxType,
std::move(children),
Expand All @@ -653,6 +669,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
name,
schemaElement.type,
logicalType_,
convertedType,
maxRepeat,
maxDefine,
isOptional,
Expand All @@ -676,6 +693,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(name),
std::nullopt,
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine - 1,
isOptional,
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/parquet/reader/ParquetTypeWithId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ParquetTypeWithId::moveChildren() const&& {
auto name = parquetChild->name_;
auto parquetType = parquetChild->parquetType_;
auto logicalType = parquetChild->logicalType_;
auto convertedType = parquetChild->convertedType_;
auto maxRepeat = parquetChild->maxRepeat_;
auto maxDefine = parquetChild->maxDefine_;
auto isOptional = parquetChild->isOptional_;
Expand All @@ -61,6 +62,7 @@ ParquetTypeWithId::moveChildren() const&& {
std::move(name),
parquetType,
std::move(logicalType),
std::move(convertedType),
maxRepeat,
maxDefine,
isOptional,
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/parquet/reader/ParquetTypeWithId.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ParquetTypeWithId : public dwio::common::TypeWithId {
std::string name,
std::optional<thrift::Type::type> parquetType,
std::optional<thrift::LogicalType> logicalType,
std::optional<thrift::ConvertedType::type> convertedType,
uint32_t maxRepeat,
uint32_t maxDefine,
bool isOptional,
Expand All @@ -54,6 +55,7 @@ class ParquetTypeWithId : public dwio::common::TypeWithId {
name_(name),
parquetType_(parquetType),
logicalType_(std::move(logicalType)),
convertedType_(convertedType),
maxRepeat_(maxRepeat),
maxDefine_(maxDefine),
isOptional_(isOptional),
Expand Down Expand Up @@ -84,6 +86,7 @@ class ParquetTypeWithId : public dwio::common::TypeWithId {
const std::string name_;
const std::optional<thrift::Type::type> parquetType_;
const std::optional<thrift::LogicalType> logicalType_;
const std::optional<thrift::ConvertedType::type> convertedType_;
const uint32_t maxRepeat_;
const uint32_t maxDefine_;
const bool isOptional_;
Expand Down
86 changes: 47 additions & 39 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@
namespace facebook::velox::parquet {
namespace {

Timestamp toInt64Timestamp(int64_t value, const thrift::TimeUnit& unit) {
if (unit.__isset.MILLIS) {
return Timestamp::fromMillis(value);
Timestamp toInt64Timestamp(int64_t value, TimestampPrecision filePrecision) {
switch (filePrecision) {
case TimestampPrecision::kMilliseconds:
return Timestamp::fromMillis(value);
case TimestampPrecision::kMicroseconds:
return Timestamp::fromMicros(value);
case TimestampPrecision::kNanoseconds:
return Timestamp::fromNanos(value);
default:
VELOX_UNREACHABLE();
}
if (unit.__isset.MICROS) {
return Timestamp::fromMicros(value);
}
if (unit.__isset.NANOS) {
return Timestamp::fromNanos(value);
}
VELOX_UNREACHABLE();
}

Timestamp toInt96Timestamp(const int128_t& value) {
Expand All @@ -58,14 +58,14 @@ class ParquetTimestampRange final : public common::TimestampRange {
const Timestamp& lower,
const Timestamp& upper,
bool nullAllowed,
const thrift::TimeUnit& timestampUnit)
TimestampPrecision filePrecision)
: TimestampRange(lower, upper, nullAllowed),
timestampUnit_(timestampUnit) {}
filePrecision_(filePrecision) {}

bool testInt128(const int128_t& value) const final {
Timestamp ts;
if constexpr (std::is_same_v<T, int64_t>) {
ts = toInt64Timestamp(value, timestampUnit_);
ts = toInt64Timestamp(value, filePrecision_);
} else if constexpr (std::is_same_v<T, int128_t>) {
ts = toInt96Timestamp(value);
}
Expand All @@ -74,7 +74,7 @@ class ParquetTimestampRange final : public common::TimestampRange {

private:
// Only used when T is int64_t.
const thrift::TimeUnit timestampUnit_;
const TimestampPrecision filePrecision_;
};

} // namespace
Expand All @@ -91,26 +91,33 @@ class TimestampColumnReader : public IntegerColumnReader {
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec),
timestampPrecision_(params.timestampPrecision()) {
requestedPrecision_(params.timestampPrecision()) {
if constexpr (std::is_same_v<T, int64_t>) {
const auto logicalType =
std::static_pointer_cast<const ParquetTypeWithId>(fileType_)
->logicalType_;
VELOX_CHECK(logicalType);
VELOX_CHECK(logicalType->__isset.TIMESTAMP);
timestampUnit_ = logicalType->TIMESTAMP.unit;

if (timestampUnit_.__isset.MILLIS) {
needsConversion_ =
timestampPrecision_ != TimestampPrecision::kMilliseconds;
} else if (timestampUnit_.__isset.MICROS) {
needsConversion_ =
timestampPrecision_ != TimestampPrecision::kMicroseconds;
} else if (timestampUnit_.__isset.NANOS) {
needsConversion_ =
timestampPrecision_ != TimestampPrecision::kNanoseconds;
const auto typeWithId =
std::static_pointer_cast<const ParquetTypeWithId>(fileType_);
if (auto logicalType = typeWithId->logicalType_) {
VELOX_CHECK(logicalType->__isset.TIMESTAMP);
const auto unit = logicalType->TIMESTAMP.unit;
if (unit.__isset.MILLIS) {
filePrecision_ = TimestampPrecision::kMilliseconds;
} else if (unit.__isset.MICROS) {
filePrecision_ = TimestampPrecision::kMicroseconds;
} else if (unit.__isset.NANOS) {
filePrecision_ = TimestampPrecision::kNanoseconds;
} else {
VELOX_UNREACHABLE();
}
} else if (auto convertedType = typeWithId->convertedType_) {
if (convertedType == thrift::ConvertedType::type::TIMESTAMP_MILLIS) {
filePrecision_ = TimestampPrecision::kMilliseconds;
} else if (
convertedType == thrift::ConvertedType::type::TIMESTAMP_MICROS) {
filePrecision_ = TimestampPrecision::kMicroseconds;
} else {
VELOX_UNREACHABLE();
}
} else {
VELOX_UNREACHABLE();
VELOX_NYI("Logical type and converted type are not provided.");
}
}
}
Expand All @@ -136,13 +143,13 @@ class TimestampColumnReader : public IntegerColumnReader {

const int128_t encoded = reinterpret_cast<int128_t&>(rawValues[i]);
if constexpr (std::is_same_v<T, int64_t>) {
rawValues[i] = toInt64Timestamp(encoded, timestampUnit_);
rawValues[i] = toInt64Timestamp(encoded, filePrecision_);
if (needsConversion_) {
rawValues[i] = rawValues[i].toPrecision(timestampPrecision_);
rawValues[i] = rawValues[i].toPrecision(requestedPrecision_);
}
} else if constexpr (std::is_same_v<T, int128_t>) {
rawValues[i] =
toInt96Timestamp(encoded).toPrecision(timestampPrecision_);
toInt96Timestamp(encoded).toPrecision(requestedPrecision_);
}
}
}
Expand All @@ -158,7 +165,7 @@ class TimestampColumnReader : public IntegerColumnReader {
ExtractValues extractValues) {
if (auto* range = dynamic_cast<common::TimestampRange*>(filter)) {
ParquetTimestampRange<T> newRange{
range->lower(), range->upper(), range->nullAllowed(), timestampUnit_};
range->lower(), range->upper(), range->nullAllowed(), filePrecision_};
this->readWithVisitor(
rows,
dwio::common::ColumnVisitor<
Expand Down Expand Up @@ -191,10 +198,11 @@ class TimestampColumnReader : public IntegerColumnReader {
private:
// The requested precision can be specified from HiveConfig to read timestamp
// from Parquet.
const TimestampPrecision timestampPrecision_;
const TimestampPrecision requestedPrecision_;

// The precision of int64_t timestamp in Parquet. Only set when T is int64_t.
TimestampPrecision filePrecision_;

// Only set when T is int64_t.
thrift::TimeUnit timestampUnit_;
// Whether Int64 Timestamp needs to be converted to the requested precision.
bool needsConversion_ = false;
};
Expand Down
Binary file not shown.
31 changes: 31 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,37 @@ TEST_F(ParquetTableScanTest, timestampInt96Plain) {
testTimestampRead(options);
}

TEST_F(ParquetTableScanTest, timestampConvertedType) {
auto stringToTimestamp = [](std::string_view view) {
return util::fromTimestampString(
view.data(), view.size(), util::TimestampParseMode::kPrestoCast)
.thenOrThrow(folly::identity, [&](const Status& status) {
VELOX_USER_FAIL("{}", status.message());
});
};
std::vector<std::string_view> expected = {
"1970-01-01 00:00:00.010",
"1970-01-01 00:00:00.010",
"1970-01-01 00:00:00.010",
};
std::vector<Timestamp> values;
values.reserve(expected.size());
for (auto view : expected) {
values.emplace_back(stringToTimestamp(view));
}

const auto vector = makeRowVector(
{"time"},
{
makeFlatVector<Timestamp>(values),
});
const auto schema = asRowType(vector->type());
const auto path = getExampleFilePath("tmmillis_i64.parquet");
loadData(path, schema, vector);

assertSelectWithFilter({"time"}, {}, "", "SELECT time from tmp");
}

TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) {
// Write timestamp data into parquet.
constexpr int kSize = 10;
Expand Down

0 comments on commit 7cbfce0

Please sign in to comment.