diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestChannel.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestChannel.java index 430f18be..4dccc88e 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestChannel.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestChannel.java @@ -16,13 +16,17 @@ package com.alibaba.fluss.rpc.netty.server; +import com.alibaba.fluss.rpc.messages.FetchLogRequest; +import com.alibaba.fluss.rpc.protocol.ApiKeys; +import com.alibaba.fluss.utils.types.Tuple2; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.concurrent.ThreadSafe; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; /** A blocking queue channel that can receive requests and send responses. */ @@ -32,19 +36,21 @@ public final class RequestChannel { private final BlockingQueue requestQueue; + /** + * This queue is used to handle replica synchronization requests ({@link FetchLogRequest} from + * follower) between different machines in the Fluss cluster. The reason for using a separate + * queue is to prevent client read/write requests from affecting replica synchronization, which + * could otherwise impact the stability of the cluster. + */ + private final BlockingQueue replicaSyncQueue; + + private boolean pollFromReplicaSyncQueue; + public RequestChannel(int queueCapacity) { - this.requestQueue = - new PriorityBlockingQueue<>( - queueCapacity, - (req1, req2) -> { - // less value will be popped first - int res = Integer.compare(req2.getPriority(), req1.getPriority()); - // if priority is same, we want to keep FIFO - if (res == 0 && req1 != req2) { - res = (req1.getRequestId() < req2.getRequestId() ? -1 : 1); - } - return res; - }); + int eachQueueCapacity = queueCapacity / 2; + this.requestQueue = new ArrayBlockingQueue<>(eachQueueCapacity); + this.replicaSyncQueue = new ArrayBlockingQueue<>(eachQueueCapacity); + this.pollFromReplicaSyncQueue = true; } /** @@ -52,7 +58,12 @@ public RequestChannel(int queueCapacity) { * request. */ public void putRequest(RpcRequest request) throws Exception { - requestQueue.put(request); + if (request.getApiKey() == ApiKeys.FETCH_LOG.id + && ((FetchLogRequest) request.getMessage()).getFollowerServerId() >= 0) { + replicaSyncQueue.put(request); + } else { + requestQueue.put(request); + } } /** @@ -70,16 +81,36 @@ public void putShutdownRequest() throws Exception { * element is available. */ public RpcRequest pollRequest(long timeoutMs) { + long startTime = System.nanoTime(); + long remainingTime = timeoutMs; + Tuple2, BlockingQueue> queuePair = + pollFromReplicaSyncQueue + ? Tuple2.of(replicaSyncQueue, requestQueue) + : Tuple2.of(requestQueue, replicaSyncQueue); + pollFromReplicaSyncQueue = !pollFromReplicaSyncQueue; try { - return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS); + while (remainingTime > 0) { + RpcRequest request = queuePair.f0.poll(1, TimeUnit.MILLISECONDS); + if (request != null) { + return request; + } + + request = queuePair.f1.poll(1, TimeUnit.MILLISECONDS); + if (request != null) { + return request; + } + + remainingTime = timeoutMs - (System.nanoTime() - startTime); + } } catch (InterruptedException e) { LOG.warn("Interrupted while polling requests from channel queue.", e); return null; } + return null; } /** Get the number of requests in the queue. */ - int requestsCount() { - return requestQueue.size(); + public int requestsCount() { + return requestQueue.size() + replicaSyncQueue.size(); } } diff --git a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java index 1139be25..c68098e8 100644 --- a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java +++ b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java @@ -34,13 +34,13 @@ public class RequestChannelTest { @Test - void testRequestPriority() throws Exception { + void testDifferentRequestQueue() throws Exception { RequestChannel channel = new RequestChannel(100); - // 1. request with same priority score. Use FIFO. + // 1. push normal requests. Use FIFO. List rpcRequests = new ArrayList<>(); - // push rpc requests - for (int i = 0; i < 100; i++) { + // push rpc requests. For normal requests, currently, the queue size is 50, capacity/2. + for (int i = 0; i < 50; i++) { RpcRequest rpcRequest = new RpcRequest( ApiKeys.GET_TABLE.id, @@ -53,36 +53,49 @@ void testRequestPriority() throws Exception { channel.putRequest(rpcRequest); rpcRequests.add(rpcRequest); } + assertThat(channel.requestsCount()).isEqualTo(50); // pop rpc requests - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 50; i++) { RpcRequest gotRequest = channel.pollRequest(100); assertThat(gotRequest).isEqualTo(rpcRequests.get(i)); } - // 2. request with different priority score. Should be ordered by priority score. - RpcRequest rpcRequest1 = - new RpcRequest( - ApiKeys.GET_TABLE.id, - (short) 0, - 3, - null, - new GetTableRequest(), - new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)), - null); - RpcRequest rpcRequest2 = - new RpcRequest( - ApiKeys.FETCH_LOG.id, - (short) 0, - 100, - null, - new FetchLogRequest().setMaxBytes(100).setFollowerServerId(2), - new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)), - null); - channel.putRequest(rpcRequest1); - channel.putRequest(rpcRequest2); - RpcRequest rpcRequest = channel.pollRequest(100); - assertThat(rpcRequest).isEqualTo(rpcRequest2); - rpcRequest = channel.pollRequest(100); - assertThat(rpcRequest).isEqualTo(rpcRequest1); + rpcRequests.clear(); + // 2. push normal requests and replicaSync requests together. The same type of requests + // should be popped out in FIFO. Normal requests and replicaSync requests will be processed + // one by one. + for (int i = 0; i < 100; i++) { + RpcRequest rpcRequest; + if (i % 2 == 0) { + rpcRequest = + new RpcRequest( + ApiKeys.FETCH_LOG.id, + (short) 0, + 100, + null, + new FetchLogRequest().setMaxBytes(100).setFollowerServerId(2), + new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)), + null); + } else { + rpcRequest = + new RpcRequest( + ApiKeys.GET_TABLE.id, + (short) 0, + 3, + null, + new GetTableRequest(), + new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)), + null); + } + channel.putRequest(rpcRequest); + rpcRequests.add(rpcRequest); + } + + assertThat(channel.requestsCount()).isEqualTo(100); + // pop rpc requests + for (int i = 0; i < 100; i++) { + RpcRequest gotRequest = channel.pollRequest(100); + assertThat(gotRequest).isEqualTo(rpcRequests.get(i)); + } } }