Skip to content

Commit

Permalink
Refactor TopNRowNumber::getOutputFromMemory
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-pandit committed Nov 6, 2024
1 parent 30fc13e commit 12b66d3
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 59 deletions.
82 changes: 35 additions & 47 deletions velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ void TopNRowNumber::noMoreInput() {
spillConfig_->readBufferSize, pool(), &spillStats_);
} else {
outputRows_.resize(outputBatchSize_);
// Set the first partition to output.
currentPartition_ = nextPartition();
}
}

Expand All @@ -313,14 +315,14 @@ void TopNRowNumber::updateEstimatedOutputRowSize() {

TopNRowNumber::TopRows* TopNRowNumber::nextPartition() {
if (!table_) {
if (!currentPartition_) {
currentPartition_ = 0;
if (!currentPartitionNumber_) {
currentPartitionNumber_ = 0;
return singlePartition_.get();
}
return nullptr;
}

if (!currentPartition_) {
if (!currentPartitionNumber_) {
numPartitions_ = table_->listAllRows(
&partitionIt_,
partitions_.size(),
Expand All @@ -331,38 +333,28 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() {
return nullptr;
}

currentPartition_ = 0;
currentPartitionNumber_ = 0;
} else {
++currentPartition_.value();
if (currentPartition_ >= numPartitions_) {
currentPartition_.reset();
++currentPartitionNumber_.value();
if (currentPartitionNumber_ >= numPartitions_) {
currentPartitionNumber_.reset();
return nextPartition();
}
}

return &currentPartition();
}

TopNRowNumber::TopRows& TopNRowNumber::currentPartition() {
VELOX_CHECK(currentPartition_.has_value());

if (!table_) {
return *singlePartition_;
}

return partitionAt(partitions_[currentPartition_.value()]);
return &partitionAt(partitions_[currentPartitionNumber_.value()]);
}

void TopNRowNumber::appendPartitionRows(
TopRows& partition,
vector_size_t start,
vector_size_t size,
vector_size_t numRows,
vector_size_t outputOffset,
FlatVector<int64_t>* rowNumbers) {
// Append 'size' partition rows in reverse order starting from 'start' row.
auto rowNumber = partition.rows.size() - start;
for (auto i = 0; i < size; ++i) {
const auto index = outputOffset + size - i - 1;
// The partition.rows priority queue pops rows in order of reverse
// row numbers.
auto rowNumber = partition.rows.size();
for (auto i = 0; i < numRows; ++i) {
auto index = outputOffset + i;
if (rowNumbers) {
rowNumbers->set(index, rowNumber--);
}
Expand All @@ -385,6 +377,9 @@ RowVectorPtr TopNRowNumber::getOutput() {

// We may have input accumulated in 'data_'.
if (data_->numRows() > 0) {
if (!currentPartition_) {
currentPartition_ = nextPartition();
}
return getOutputFromMemory();
}

Expand Down Expand Up @@ -425,37 +420,30 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() {
}

vector_size_t offset = 0;
if (remainingRowsInPartition_ > 0) {
auto& partition = currentPartition();
auto start = partition.rows.size() - remainingRowsInPartition_;
const auto numRows =
std::min<vector_size_t>(outputBatchSize_, remainingRowsInPartition_);
appendPartitionRows(partition, start, numRows, offset, rowNumbers);
offset += numRows;
remainingRowsInPartition_ -= numRows;
}

// Continue to output as many remaining partitions as possible.
while (offset < outputBatchSize_) {
auto* partition = nextPartition();
if (!partition) {
if (!currentPartition_) {
break;
}

auto numRows = partition->rows.size();
if (offset + numRows > outputBatchSize_) {
remainingRowsInPartition_ = offset + numRows - outputBatchSize_;

// Add a subset of partition rows.
numRows -= remainingRowsInPartition_;
appendPartitionRows(*partition, 0, numRows, offset, rowNumbers);
offset += numRows;
auto numOutputRowsLeft = outputBatchSize_ - offset;
if (currentPartition_->rows.size() > numOutputRowsLeft) {
// Only a partial partition can be output in this getOutput() call.
// Output as many rows as possible.
appendPartitionRows(
*currentPartition_, numOutputRowsLeft, offset, rowNumbers);
offset += numOutputRowsLeft;
break;
}

// Add all partition rows.
appendPartitionRows(*partition, 0, numRows, offset, rowNumbers);
offset += numRows;
remainingRowsInPartition_ = 0;
auto numPartitionRows = currentPartition_->rows.size();
appendPartitionRows(
*currentPartition_, numPartitionRows, offset, rowNumbers);
offset += numPartitionRows;

// Move to the next partition.
currentPartition_ = nextPartition();
}

if (offset == 0) {
Expand Down
28 changes: 16 additions & 12 deletions velox/exec/TopNRowNumber.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,11 @@ class TopNRowNumber : public Operator {
// partitions left.
TopRows* nextPartition();

// Returns partition that was partially added to the previous output batch.
TopRows& currentPartition();

// Appends partition rows to outputRows_ and optionally populates row
// numbers.
// Appends numRows of partition rows to outputRows_. Note : partition.rows
// tops rows in reverse row number order.
void appendPartitionRows(
TopRows& partition,
vector_size_t start,
vector_size_t size,
vector_size_t numRows,
vector_size_t outputOffset,
FlatVector<int64_t>* rowNumbers);

Expand Down Expand Up @@ -208,25 +204,33 @@ class TopNRowNumber : public Operator {

// Maximum number of rows in the output batch.
vector_size_t outputBatchSize_;
std::vector<char*> outputRows_;

// The below variables are used when outputting from memory.
// Vector of pointers to individual rows in the RowContainer for the current
// output block.
std::vector<char*> outputRows_;
// Number of partitions to fetch from a HashTable in a single listAllRows
// call.
static const size_t kPartitionBatchSize = 100;

BaseHashTable::RowsIterator partitionIt_;
std::vector<char*> partitions_{kPartitionBatchSize};
size_t numPartitions_{0};
std::optional<int32_t> currentPartition_;
vector_size_t remainingRowsInPartition_{0};

// THis is the index of the current partition within partitions_ which is
// obtained from the HashTable iterator.
std::optional<int32_t> currentPartitionNumber_;
// This is the currentPartition being output. It is possible that the
// partition is output across multiple output blocks.
TopNRowNumber::TopRows* currentPartition_{nullptr};

// The below variables are used when outputting from the spiller.
// Spiller for contents of the 'data_'.
std::unique_ptr<Spiller> spiller_;

// Used to sort-merge spilled data.
std::unique_ptr<TreeOfLosers<SpillMergeStream>> merge_;

// Row number for the first row in the next output batch.
// Row number for the first row in the next output batch from the spiller.
int32_t nextRowNumber_{0};
};
} // namespace facebook::velox::exec

0 comments on commit 12b66d3

Please sign in to comment.