[kv] Improve PrefixLookup implementation (#222)
In the previous commit, the bucket key was only allowed to be a prefix of the primary key, but it should be allowed to be any subset of the primary key (for example, with primary key [a, b, c], a bucket key of [a, c] or [b], not only a prefix such as [a] or [a, b]). In addition, this commit fixes various bugs around PrefixLookup.
wuchong committed Jan 3, 2025
1 parent 8b14329 commit 2c3fff4
Showing 36 changed files with 757 additions and 370 deletions.
----------------------------------------
@@ -25,17 +25,21 @@
@Internal
public abstract class AbstractLookup<T> {

private final TableBucket tableBucket;
private final byte[] key;

public AbstractLookup(byte[] key) {
public AbstractLookup(TableBucket tableBucket, byte[] key) {
this.tableBucket = tableBucket;
this.key = key;
}

public byte[] key() {
return key;
}

public abstract TableBucket tableBucket();
public TableBucket tableBucket() {
return tableBucket;
}

public abstract LookupType lookupType();

----------------------------------------
@@ -28,20 +28,13 @@
@Internal
public class Lookup extends AbstractLookup<byte[]> {

private final TableBucket tableBucket;
private final CompletableFuture<byte[]> future;

Lookup(TableBucket tableBucket, byte[] key) {
super(key);
this.tableBucket = tableBucket;
super(tableBucket, key);
this.future = new CompletableFuture<>();
}

@Override
public TableBucket tableBucket() {
return tableBucket;
}

@Override
public LookupType lookupType() {
return LookupType.LOOKUP;
----------------------------------------
@@ -84,7 +84,7 @@ public CompletableFuture<byte[]> lookup(TableBucket tableBucket, byte[] keyBytes

public CompletableFuture<List<byte[]>> prefixLookup(
long tableId, int bucketId, byte[] keyBytes) {
// TODO index lookup support partition table.
// TODO prefix lookup support partition table (#266)
PrefixLookup prefixLookup = new PrefixLookup(new TableBucket(tableId, bucketId), keyBytes);
lookupQueue.appendLookup(prefixLookup);
return prefixLookup.future();
----------------------------------------
@@ -29,23 +29,23 @@

/**
* A queue that buffers the pending lookup operations and provides a list of {@link Lookup} when
* call method {@link #drain(LookupType)}.
* call method {@link #drain()}.
*/
@ThreadSafe
@Internal
class LookupQueue {

private volatile boolean closed;
// Buffering both the Lookup and IndexLookup.
// buffering both the Lookup and PrefixLookup.
private final ArrayBlockingQueue<AbstractLookup<?>> lookupQueue;
private final int maxBatchSize;
private final long batchTimeoutMs;
private final long batchTimeoutNanos;

LookupQueue(Configuration conf) {
this.lookupQueue =
new ArrayBlockingQueue<>(conf.get(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE));
this.maxBatchSize = conf.get(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE);
this.batchTimeoutMs = conf.get(ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT).toMillis();
this.batchTimeoutNanos = conf.get(ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT).toNanos();
this.closed = false;
}

@@ -68,20 +68,23 @@ boolean hasUnDrained() {

/** Drain a batch of {@link Lookup}s from the lookup queue. */
List<AbstractLookup<?>> drain() throws Exception {
long start = System.currentTimeMillis();
final long startNanos = System.nanoTime();
List<AbstractLookup<?>> lookupOperations = new ArrayList<>(maxBatchSize);
int count = 0;
while (true) {
if (System.currentTimeMillis() - start > batchTimeoutMs) {
long waitNanos = batchTimeoutNanos - (System.nanoTime() - startNanos);
if (waitNanos <= 0) {
break;
}

AbstractLookup<?> lookup = lookupQueue.poll(300, TimeUnit.MILLISECONDS);
AbstractLookup<?> lookup = lookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS);
if (lookup == null) {
break;
}
count++;
lookupOperations.add(lookup);
count++;
int transferred = lookupQueue.drainTo(lookupOperations, maxBatchSize - count);
count += transferred;
if (count >= maxBatchSize) {
break;
}
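The hunk above replaces the old fixed 300 ms poll with a deadline-based wait: each poll blocks only for the time remaining in the batch window, so drain() returns as soon as the batch is full or the timeout elapses, whichever comes first. A self-contained sketch of that pattern in generic Java (illustrative only, not the project's LookupQueue):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Generic deadline-based batch drain, mirroring the pattern used by the new drain().
    final class DeadlineBatchDrainer<T> {
        private final ArrayBlockingQueue<T> queue = new ArrayBlockingQueue<>(1024);

        void put(T element) throws InterruptedException {
            queue.put(element);
        }

        List<T> drain(int maxBatchSize, long batchTimeoutNanos) throws InterruptedException {
            final long startNanos = System.nanoTime();
            List<T> batch = new ArrayList<>(maxBatchSize);
            int count = 0;
            while (count < maxBatchSize) {
                // Wait only for what is left of the batch window, never a fixed interval.
                long waitNanos = batchTimeoutNanos - (System.nanoTime() - startNanos);
                if (waitNanos <= 0) {
                    break; // deadline reached, return whatever has been collected
                }
                T first = queue.poll(waitNanos, TimeUnit.NANOSECONDS);
                if (first == null) {
                    break; // timed out while waiting for the next element
                }
                batch.add(first);
                count++;
                // Grab anything else already queued without blocking again.
                count += queue.drainTo(batch, maxBatchSize - count);
            }
            return batch;
        }
    }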
----------------------------------------
@@ -20,20 +20,24 @@
import com.alibaba.fluss.client.table.Table;
import com.alibaba.fluss.row.InternalRow;

import javax.annotation.Nullable;

import java.util.Objects;

/**
* The result of {@link Table#lookup(InternalRow)}.
*
* @since 0.1
*/
@PublicEvolving
public final class LookupResult {
private final InternalRow row;
private final @Nullable InternalRow row;

public LookupResult(InternalRow row) {
public LookupResult(@Nullable InternalRow row) {
this.row = row;
}

public InternalRow getRow() {
public @Nullable InternalRow getRow() {
return row;
}

@@ -45,17 +49,18 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
LookupResult that = (LookupResult) o;
return row.equals(that.row);

LookupResult lookupResult = (LookupResult) o;
return Objects.equals(row, lookupResult.row);
}

@Override
public int hashCode() {
return row.hashCode();
return Objects.hash(row);
}

@Override
public String toString() {
return "LookupResult{" + "row=" + row + '}';
return "LookupResult{row=" + row + '}';
}
}
----------------------------------------
@@ -110,36 +110,34 @@ private void sendLookups(List<AbstractLookup<?>> lookups) {
if (lookups.isEmpty()) {
return;
}
// group by <leader, lookup batches> to lookup batches
// group by <leader, lookup type> to lookup batches
Map<Tuple2<Integer, LookupType>, List<AbstractLookup<?>>> lookupBatches =
groupByLeaderAndType(lookups);
// now, send the batches
lookupBatches.forEach(
(destinationAndType, batch) ->
sendLookups(destinationAndType.f0, destinationAndType.f1, batch));
(destAndType, batch) -> sendLookups(destAndType.f0, destAndType.f1, batch));
}

private Map<Tuple2<Integer, LookupType>, List<AbstractLookup<?>>> groupByLeaderAndType(
List<AbstractLookup<?>> lookups) {
// <leader, LookupType> -> lookup batches
Map<Tuple2<Integer, LookupType>, List<AbstractLookup<?>>> lookupBatchesByLeader =
new HashMap<>();
for (AbstractLookup<?> abstractLookup : lookups) {
for (AbstractLookup<?> lookup : lookups) {
int leader;
// lookup the leader node
TableBucket tb = abstractLookup.tableBucket();
TableBucket tb = lookup.tableBucket();
try {
// TODO this can be a re-triable operation. We should retry here instead of
// throwing exception.
leader = metadataUpdater.leaderFor(tb);
} catch (Exception e) {
abstractLookup.future().completeExceptionally(e);
lookup.future().completeExceptionally(e);
continue;
}
lookupBatchesByLeader
.computeIfAbsent(
Tuple2.of(leader, abstractLookup.lookupType()), k -> new ArrayList<>())
.add(abstractLookup);
.computeIfAbsent(Tuple2.of(leader, lookup.lookupType()), k -> new ArrayList<>())
.add(lookup);
}
return lookupBatchesByLeader;
}
Expand All @@ -157,11 +155,10 @@ private void sendLookups(
}
}

private void sendLookupRequest(
TabletServerGateway gateway, List<AbstractLookup<?>> lookupBatches) {
private void sendLookupRequest(TabletServerGateway gateway, List<AbstractLookup<?>> lookups) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new HashMap<>();
for (AbstractLookup<?> abstractLookup : lookupBatches) {
for (AbstractLookup<?> abstractLookup : lookups) {
Lookup lookup = (Lookup) abstractLookup;
TableBucket tb = lookup.tableBucket();
long tableId = tb.getTableId();
@@ -181,10 +178,10 @@ }
}

private void sendPrefixLookupRequest(
TabletServerGateway gateway, List<AbstractLookup<?>> indexLookupBatches) {
TabletServerGateway gateway, List<AbstractLookup<?>> prefixLookups) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, PrefixLookupBatch>> lookupByTableId = new HashMap<>();
for (AbstractLookup<?> abstractLookup : indexLookupBatches) {
for (AbstractLookup<?> abstractLookup : prefixLookups) {
PrefixLookup prefixLookup = (PrefixLookup) abstractLookup;
TableBucket tb = prefixLookup.tableBucket();
long tableId = tb.getTableId();
@@ -195,12 +192,12 @@ }
}

lookupByTableId.forEach(
(tableId, indexLookupBatch) ->
(tableId, prefixLookupBatch) ->
sendPrefixLookupRequestAndHandleResponse(
gateway,
makePrefixLookupRequest(tableId, indexLookupBatch.values()),
makePrefixLookupRequest(tableId, prefixLookupBatch.values()),
tableId,
indexLookupBatch));
prefixLookupBatch));
}

private void sendLookupRequestAndHandleResponse(
@@ -307,7 +304,7 @@ private void handleLookupResponse(
private void handlePrefixLookupResponse(
long tableId,
PrefixLookupResponse prefixLookupResponse,
Map<TableBucket, PrefixLookupBatch> indexLookupsByBucket) {
Map<TableBucket, PrefixLookupBatch> prefixLookupsByBucket) {
for (PbPrefixLookupRespForBucket pbRespForBucket :
prefixLookupResponse.getBucketsRespsList()) {
TableBucket tableBucket =
@@ -318,7 +315,7 @@ private void handlePrefixLookupResponse(
: null,
pbRespForBucket.getBucketId());

PrefixLookupBatch prefixLookupBatch = indexLookupsByBucket.get(tableBucket);
PrefixLookupBatch prefixLookupBatch = prefixLookupsByBucket.get(tableBucket);
if (pbRespForBucket.hasErrorCode()) {
// TODO for re-triable error, we should retry here instead of throwing exception.
ApiError error = ApiError.fromErrorMessage(pbRespForBucket);
@@ -328,10 +325,10 @@
error.formatErrMsg());
prefixLookupBatch.completeExceptionally(error.exception());
} else {
List<List<byte[]>> result = new ArrayList<>();
List<List<byte[]>> result = new ArrayList<>(pbRespForBucket.getValueListsCount());
for (int i = 0; i < pbRespForBucket.getValueListsCount(); i++) {
PbValueList pbValueList = pbRespForBucket.getValueListAt(i);
List<byte[]> keyResult = new ArrayList<>();
List<byte[]> keyResult = new ArrayList<>(pbValueList.getValuesCount());
for (int j = 0; j < pbValueList.getValuesCount(); j++) {
keyResult.add(pbValueList.getValueAt(j));
}
----------------------------------------
@@ -28,20 +28,13 @@
*/
@Internal
public class PrefixLookup extends AbstractLookup<List<byte[]>> {
private final TableBucket tableBucket;
private final CompletableFuture<List<byte[]>> future;

public PrefixLookup(TableBucket tableBucket, byte[] prefixKey) {
super(prefixKey);
this.tableBucket = tableBucket;
PrefixLookup(TableBucket tableBucket, byte[] prefixKey) {
super(tableBucket, prefixKey);
this.future = new CompletableFuture<>();
}

@Override
public TableBucket tableBucket() {
return tableBucket;
}

@Override
public CompletableFuture<List<byte[]>> future() {
return future;
----------------------------------------
@@ -58,7 +58,7 @@ public void complete(List<List<byte[]>> values) {
new FlussRuntimeException(
String.format(
"The number of values return by prefix lookup request is not equal to the number of "
+ "index lookups send. Got %d values, but expected %d.",
+ "prefix lookups send. Got %d values, but expected %d.",
values.size(), prefixLookups.size())));
} else {
for (int i = 0; i < values.size(); i++) {
----------------------------------------
@@ -24,7 +24,7 @@
/**
* The result of {@link Table#prefixLookup(InternalRow)}}.
*
* @since 0.1
* @since 0.6
*/
public class PrefixLookupResult {
private final List<InternalRow> rowList;
----------------------------------------
@@ -187,14 +187,14 @@ public CompletableFuture<LookupResult> lookup(InternalRow key) {
}
// encoding the key row using a compacted way consisted with how the key is encoded when put
// a row
byte[] keyBytes = primaryKeyEncoder.encode(key);
byte[] lookupBucketKeyBytes = bucketKeyEncoder.encode(key);
byte[] pkBytes = primaryKeyEncoder.encode(key);
byte[] bkBytes = bucketKeyEncoder.encode(key);
Long partitionId = keyRowPartitionGetter == null ? null : getPartitionId(key);
int bucketId = getBucketId(lookupBucketKeyBytes, key);
int bucketId = getBucketId(bkBytes, key);
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
return lookupClientSupplier
.get()
.lookup(tableBucket, keyBytes)
.lookup(tableBucket, pkBytes)
.thenApply(
valueBytes -> {
InternalRow row =
@@ -206,14 +206,15 @@ }
}

@Override
public CompletableFuture<PrefixLookupResult> prefixLookup(InternalRow prefixKey) {
public CompletableFuture<PrefixLookupResult> prefixLookup(InternalRow bucketKey) {
if (!hasPrimaryKey) {
throw new FlussRuntimeException(
String.format("None-pk table %s don't support prefix lookup", tablePath));
}
// TODO: add checks the bucket key is prefix of primary key

byte[] prefixKeyBytes = bucketKeyEncoder.encode(prefixKey);
int bucketId = getBucketId(prefixKeyBytes, prefixKey);
byte[] prefixKeyBytes = bucketKeyEncoder.encode(bucketKey);
int bucketId = getBucketId(prefixKeyBytes, bucketKey);
return lookupClientSupplier
.get()
.prefixLookup(tableId, bucketId, prefixKeyBytes)
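In this file's diff, the key row is now encoded twice: the primary-key bytes (pkBytes) address the record in the bucket's KV store, while the bucket-key bytes (bkBytes, a subset of the primary-key columns after this change) only decide which bucket serves the request. A self-contained toy illustration of that routing idea (not Fluss code; the hashing is purely illustrative):

    import java.util.Arrays;
    import java.util.List;

    // Toy illustration: only the bucket-key subset of the primary-key columns
    // determines the target bucket; the full primary key still identifies the row.
    final class BucketRoutingToy {
        static int bucketFor(List<Object> bucketKeyColumns, int numBuckets) {
            return Math.floorMod(Arrays.hashCode(bucketKeyColumns.toArray()), numBuckets);
        }

        public static void main(String[] args) {
            // Primary key [a, b, c] = [1, "x", 7]; bucket key is the subset [a, c].
            List<Object> primaryKey = Arrays.asList(1, "x", 7);
            List<Object> bucketKey = Arrays.asList(primaryKey.get(0), primaryKey.get(2));
            System.out.println("lookup for pk " + primaryKey + " is routed to bucket "
                    + bucketFor(bucketKey, 16));
        }
    }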
----------------------------------------
@@ -78,10 +78,13 @@ public interface Table extends AutoCloseable {
* the prefix schema would also be [a, b]. This pattern can use PrefixLookup to lookup by prefix
* scan.
*
* @param prefixKey the given prefix key to do prefix lookup.
* <p>TODO: currently, the interface only support bucket key as the prefix key to lookup.
* Generalize the prefix lookup to support any prefix key including bucket key.
*
* @param bucketKey the given bucket key to do prefix lookup.
* @return the result of prefix lookup.
*/
CompletableFuture<PrefixLookupResult> prefixLookup(InternalRow prefixKey);
CompletableFuture<PrefixLookupResult> prefixLookup(InternalRow bucketKey);

/**
* Extracts limit number of rows from the given table bucket.
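Finally, a minimal usage sketch of the renamed API, assuming a table whose primary key is [a, b, c] and whose bucket key is [a, b]. Only Table#prefixLookup, PrefixLookupResult, and InternalRow appear in this diff; the import path of PrefixLookupResult and its getRowList() accessor are assumptions inferred from the rowList field shown above:

    import com.alibaba.fluss.client.lookup.PrefixLookupResult; // import path assumed
    import com.alibaba.fluss.client.table.Table;
    import com.alibaba.fluss.row.InternalRow;

    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    final class PrefixLookupUsage {
        // Returns all rows whose bucket-key columns equal the given values.
        // bucketKey carries only the bucket-key columns (e.g. [a, b]), not the full primary key.
        static List<InternalRow> lookupByBucketKey(Table table, InternalRow bucketKey) throws Exception {
            CompletableFuture<PrefixLookupResult> future = table.prefixLookup(bucketKey);
            PrefixLookupResult result = future.get();
            return result.getRowList(); // accessor assumed from the rowList field
        }
    }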