From 12b66d3c7f7560aa00a88a34e035e63c302658eb Mon Sep 17 00:00:00 2001 From: aditi-pandit Date: Tue, 5 Nov 2024 20:57:18 +0530 Subject: [PATCH] Refactor TopNRowNumber::getOutputFromMemory --- velox/exec/TopNRowNumber.cpp | 82 +++++++++++++++--------------------- velox/exec/TopNRowNumber.h | 28 ++++++------ 2 files changed, 51 insertions(+), 59 deletions(-) diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 26ecbd1496312..fa2b36925d2a0 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -289,6 +289,8 @@ void TopNRowNumber::noMoreInput() { spillConfig_->readBufferSize, pool(), &spillStats_); } else { outputRows_.resize(outputBatchSize_); + // Set the first partition to output. + currentPartition_ = nextPartition(); } } @@ -313,14 +315,14 @@ void TopNRowNumber::updateEstimatedOutputRowSize() { TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { if (!table_) { - if (!currentPartition_) { - currentPartition_ = 0; + if (!currentPartitionNumber_) { + currentPartitionNumber_ = 0; return singlePartition_.get(); } return nullptr; } - if (!currentPartition_) { + if (!currentPartitionNumber_) { numPartitions_ = table_->listAllRows( &partitionIt_, partitions_.size(), @@ -331,38 +333,28 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { return nullptr; } - currentPartition_ = 0; + currentPartitionNumber_ = 0; } else { - ++currentPartition_.value(); - if (currentPartition_ >= numPartitions_) { - currentPartition_.reset(); + ++currentPartitionNumber_.value(); + if (currentPartitionNumber_ >= numPartitions_) { + currentPartitionNumber_.reset(); return nextPartition(); } } - return ¤tPartition(); -} - -TopNRowNumber::TopRows& TopNRowNumber::currentPartition() { - VELOX_CHECK(currentPartition_.has_value()); - - if (!table_) { - return *singlePartition_; - } - - return partitionAt(partitions_[currentPartition_.value()]); + return &partitionAt(partitions_[currentPartitionNumber_.value()]); } void TopNRowNumber::appendPartitionRows( TopRows& partition, - vector_size_t start, - vector_size_t size, + vector_size_t numRows, vector_size_t outputOffset, FlatVector* rowNumbers) { - // Append 'size' partition rows in reverse order starting from 'start' row. - auto rowNumber = partition.rows.size() - start; - for (auto i = 0; i < size; ++i) { - const auto index = outputOffset + size - i - 1; + // The partition.rows priority queue pops rows in order of reverse + // row numbers. + auto rowNumber = partition.rows.size(); + for (auto i = 0; i < numRows; ++i) { + auto index = outputOffset + i; if (rowNumbers) { rowNumbers->set(index, rowNumber--); } @@ -385,6 +377,9 @@ RowVectorPtr TopNRowNumber::getOutput() { // We may have input accumulated in 'data_'. if (data_->numRows() > 0) { + if (!currentPartition_) { + currentPartition_ = nextPartition(); + } return getOutputFromMemory(); } @@ -425,37 +420,30 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { } vector_size_t offset = 0; - if (remainingRowsInPartition_ > 0) { - auto& partition = currentPartition(); - auto start = partition.rows.size() - remainingRowsInPartition_; - const auto numRows = - std::min(outputBatchSize_, remainingRowsInPartition_); - appendPartitionRows(partition, start, numRows, offset, rowNumbers); - offset += numRows; - remainingRowsInPartition_ -= numRows; - } - + // Continue to output as many remaining partitions as possible. while (offset < outputBatchSize_) { - auto* partition = nextPartition(); - if (!partition) { + if (!currentPartition_) { break; } - auto numRows = partition->rows.size(); - if (offset + numRows > outputBatchSize_) { - remainingRowsInPartition_ = offset + numRows - outputBatchSize_; - - // Add a subset of partition rows. - numRows -= remainingRowsInPartition_; - appendPartitionRows(*partition, 0, numRows, offset, rowNumbers); - offset += numRows; + auto numOutputRowsLeft = outputBatchSize_ - offset; + if (currentPartition_->rows.size() > numOutputRowsLeft) { + // Only a partial partition can be output in this getOutput() call. + // Output as many rows as possible. + appendPartitionRows( + *currentPartition_, numOutputRowsLeft, offset, rowNumbers); + offset += numOutputRowsLeft; break; } // Add all partition rows. - appendPartitionRows(*partition, 0, numRows, offset, rowNumbers); - offset += numRows; - remainingRowsInPartition_ = 0; + auto numPartitionRows = currentPartition_->rows.size(); + appendPartitionRows( + *currentPartition_, numPartitionRows, offset, rowNumbers); + offset += numPartitionRows; + + // Move to the next partition. + currentPartition_ = nextPartition(); } if (offset == 0) { diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index 1136422a3afce..90e0602ca461d 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -99,15 +99,11 @@ class TopNRowNumber : public Operator { // partitions left. TopRows* nextPartition(); - // Returns partition that was partially added to the previous output batch. - TopRows& currentPartition(); - - // Appends partition rows to outputRows_ and optionally populates row - // numbers. + // Appends numRows of partition rows to outputRows_. Note : partition.rows + // tops rows in reverse row number order. void appendPartitionRows( TopRows& partition, - vector_size_t start, - vector_size_t size, + vector_size_t numRows, vector_size_t outputOffset, FlatVector* rowNumbers); @@ -208,8 +204,11 @@ class TopNRowNumber : public Operator { // Maximum number of rows in the output batch. vector_size_t outputBatchSize_; - std::vector outputRows_; + // The below variables are used when outputting from memory. + // Vector of pointers to individual rows in the RowContainer for the current + // output block. + std::vector outputRows_; // Number of partitions to fetch from a HashTable in a single listAllRows // call. static const size_t kPartitionBatchSize = 100; @@ -217,16 +216,21 @@ class TopNRowNumber : public Operator { BaseHashTable::RowsIterator partitionIt_; std::vector partitions_{kPartitionBatchSize}; size_t numPartitions_{0}; - std::optional currentPartition_; - vector_size_t remainingRowsInPartition_{0}; - + // THis is the index of the current partition within partitions_ which is + // obtained from the HashTable iterator. + std::optional currentPartitionNumber_; + // This is the currentPartition being output. It is possible that the + // partition is output across multiple output blocks. + TopNRowNumber::TopRows* currentPartition_{nullptr}; + + // The below variables are used when outputting from the spiller. // Spiller for contents of the 'data_'. std::unique_ptr spiller_; // Used to sort-merge spilled data. std::unique_ptr> merge_; - // Row number for the first row in the next output batch. + // Row number for the first row in the next output batch from the spiller. int32_t nextRowNumber_{0}; }; } // namespace facebook::velox::exec