-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(iceberg): Add support for writing iceberg tables #10996
base: main
Are you sure you want to change the base?
Conversation
✅ Deploy Preview for meta-velox canceled.
|
dcb40ae
to
02e6d32
Compare
|
||
virtual ~IcebergInsertTableHandle() = default; | ||
|
||
std::shared_ptr<const VeloxIcebergSchema> schema() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies for the delay. I was occupied with other work, which limited my progress on this PR over the past couple of months.
The current changes do not utilize the schema and partitionSpec, but they will be necessary when adding support for partition transforms. At present, the changes only handle identity partitions for Iceberg tables, and the required partition and schema information is available in the table handle. However, since the table handle does not include details about partition transforms, we will need the Iceberg schema and partitionSpec to support them.
I will be looking into adding support for partition transforms next.
#include "velox/connectors/hive/iceberg/IcebergDataSink.h" | ||
|
||
#include <utility> | ||
#include "velox/common/base/Counters.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are lots of headers unused. Please remove.
02e6d32
to
9dde9fb
Compare
9dde9fb
to
ff22387
Compare
return; | ||
} | ||
|
||
dataChannels_ = getDataChannels(partitionChannels_, inputType_->size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to use insertTableHandle number of channels. partitionChannels_ come from it. I'm not sure inputType_ size is always the same as insertTableHandle's.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used inputType_ since we are already using it in HiveDataSink, I can verify how inputType_ is derived and whether it will always be the same as insertTableHandle columns.
const std::vector<column_index_t>& partitionChannels, | ||
const column_index_t childrenSize) const { | ||
// Create a vector of all possible channels | ||
std::vector<column_index_t> dataChannels(childrenSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to add dataChannels_ field to IcebergDataSink than allocating a vector every time appendData() is called.
@@ -808,7 +786,7 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const { | |||
return HiveWriterId{partitionId, bucketId}; | |||
} | |||
|
|||
void HiveDataSink::splitInputRowsAndEnsureWriters() { | |||
void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment out input (RowVectorPtr /* input */) {
insertTableHandle_); | ||
|
||
for (auto i = 0; i < partitionChannels_.size(); ++i) { | ||
auto type = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type not used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I pushed it by mistake in this commit, it was part of the changes for writing tables with partition transforms. I will remove it from this commit.
icebergInsertTableHandle->inputColumns()[partitionChannels_[i]] | ||
->dataType(); | ||
auto block = input->childAt(partitionChannels_[i]); | ||
partitionValues.insert(partitionValues.begin() + i, block->toString(row)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitionValues[i] = block->toString(row);
partitionValues.insert(partitionValues.begin() + i, block->toString(row)); | ||
} | ||
|
||
partitionData_.insert( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitionData_[index] = std::make_shared(partitionValues);
|
||
void IcebergDataSink::extendBuffersForPartitionedTables() { | ||
// Extends the buffer used for partition rows calculations. | ||
partitionSizes_.emplace_back(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace the 3 lines to call HiveDataSink::extendBuffersForPartitionedTables()
f3565a6
to
e3aa199
Compare
}; | ||
|
||
class PartitionData { | ||
private: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the private fields to the end.
|
||
#include "velox/common/base/Fs.h" | ||
#include "velox/connectors/hive/TableHandle.h" | ||
#include "velox/exec/OperatorUtils.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used
e3aa199
to
967c556
Compare
FWIW, there's a new project to add C++ support for iceberg: |
Introduce IcebergPageSink
967c556
to
a1ee3c5
Compare
Introduce IcebergPageSink