[server] Fix config 'netty.server.max-queued-requests' not taking effect, which caused the server to accept an unlimited number of requests #276

Open. Wants to merge 1 commit into base: main.
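
For context on the root cause (not part of the PR itself): the old RequestChannel backed its request queue with java.util.concurrent.PriorityBlockingQueue, whose int constructor argument is only an initial capacity. The queue itself is unbounded, so the value configured through 'netty.server.max-queued-requests' never actually limited how many requests could pile up. The PR switches to ArrayBlockingQueue, which enforces its capacity. A minimal standalone sketch of the difference, assuming nothing beyond standard JDK behavior (class name and sizes are illustrative only):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class QueueBoundDemo {
    public static void main(String[] args) {
        // PriorityBlockingQueue: the constructor argument is only an initial capacity,
        // so offer() always succeeds and a configured limit is never enforced.
        PriorityBlockingQueue<Integer> unbounded = new PriorityBlockingQueue<>(4);
        for (int i = 0; i < 10; i++) {
            unbounded.offer(i);
        }
        System.out.println("PriorityBlockingQueue size: " + unbounded.size()); // prints 10

        // ArrayBlockingQueue: the capacity is a hard bound; offer() fails once the queue
        // is full, and put() blocks instead, which is what the fixed RequestChannel relies on.
        ArrayBlockingQueue<Integer> bounded = new ArrayBlockingQueue<>(4);
        int accepted = 0;
        for (int i = 0; i < 10; i++) {
            if (bounded.offer(i)) {
                accepted++;
            }
        }
        System.out.println("ArrayBlockingQueue accepted: " + accepted); // prints 4
    }
}
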
RequestChannel.java
@@ -16,13 +16,17 @@
 
 package com.alibaba.fluss.rpc.netty.server;
 
+import com.alibaba.fluss.rpc.messages.FetchLogRequest;
+import com.alibaba.fluss.rpc.protocol.ApiKeys;
+import com.alibaba.fluss.utils.types.Tuple2;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /** A blocking queue channel that can receive requests and send responses. */
@@ -32,27 +36,34 @@ public final class RequestChannel {
 
     private final BlockingQueue<RpcRequest> requestQueue;
 
+    /**
+     * This queue is used to handle replica synchronization requests ({@link FetchLogRequest} from
+     * a follower) between different machines in the Fluss cluster. The reason for using a
+     * separate queue is to prevent client read/write requests from affecting replica
+     * synchronization, which could otherwise impact the stability of the cluster.
+     */
+    private final BlockingQueue<RpcRequest> replicaSyncQueue;
+
+    private boolean pollFromReplicaSyncQueue;
+
     public RequestChannel(int queueCapacity) {
-        this.requestQueue =
-                new PriorityBlockingQueue<>(
-                        queueCapacity,
-                        (req1, req2) -> {
-                            // less value will be popped first
-                            int res = Integer.compare(req2.getPriority(), req1.getPriority());
-                            // if priority is same, we want to keep FIFO
-                            if (res == 0 && req1 != req2) {
-                                res = (req1.getRequestId() < req2.getRequestId() ? -1 : 1);
-                            }
-                            return res;
-                        });
+        int eachQueueCapacity = queueCapacity / 2;
+        this.requestQueue = new ArrayBlockingQueue<>(eachQueueCapacity);
+        this.replicaSyncQueue = new ArrayBlockingQueue<>(eachQueueCapacity);
+        this.pollFromReplicaSyncQueue = true;
     }

     /**
      * Send a request to be handled, potentially blocking until there is room in the queue for the
      * request.
      */
     public void putRequest(RpcRequest request) throws Exception {
-        requestQueue.put(request);
+        if (request.getApiKey() == ApiKeys.FETCH_LOG.id
+                && ((FetchLogRequest) request.getMessage()).getFollowerServerId() >= 0) {
+            replicaSyncQueue.put(request);
+        } else {
+            requestQueue.put(request);
+        }
     }
 
     /**
@@ -70,16 +81,36 @@ public void putShutdownRequest() throws Exception {
      * element is available.
      */
     public RpcRequest pollRequest(long timeoutMs) {
+        long startTime = System.nanoTime();
+        long remainingTime = timeoutMs;
+        Tuple2<BlockingQueue<RpcRequest>, BlockingQueue<RpcRequest>> queuePair =
+                pollFromReplicaSyncQueue
+                        ? Tuple2.of(replicaSyncQueue, requestQueue)
+                        : Tuple2.of(requestQueue, replicaSyncQueue);
+        pollFromReplicaSyncQueue = !pollFromReplicaSyncQueue;
         try {
-            return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
+            while (remainingTime > 0) {
+                RpcRequest request = queuePair.f0.poll(1, TimeUnit.MILLISECONDS);
+                if (request != null) {
+                    return request;
+                }
+
+                request = queuePair.f1.poll(1, TimeUnit.MILLISECONDS);
+                if (request != null) {
+                    return request;
+                }
+
+                remainingTime = timeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            }
         } catch (InterruptedException e) {
             LOG.warn("Interrupted while polling requests from channel queue.", e);
             return null;
         }
+        return null;
     }

     /** Get the number of requests in the queue. */
-    int requestsCount() {
-        return requestQueue.size();
+    public int requestsCount() {
+        return requestQueue.size() + replicaSyncQueue.size();
     }
 }
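
For readers skimming the diff, the change above boils down to two bounded queues that are polled alternately. A simplified, self-contained sketch of that pattern follows; it uses plain String requests instead of RpcRequest, and the class and method names are illustrative, not Fluss API:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: one bounded queue per traffic class, polled alternately so that
// neither client requests nor replica-sync requests can starve the other.
public class TwoQueueChannelSketch {

    private final BlockingQueue<String> normalQueue;
    private final BlockingQueue<String> replicaSyncQueue;
    private boolean pollReplicaSyncFirst = true;

    public TwoQueueChannelSketch(int totalCapacity) {
        // Split the configured capacity evenly, as RequestChannel does with queueCapacity / 2.
        this.normalQueue = new ArrayBlockingQueue<>(totalCapacity / 2);
        this.replicaSyncQueue = new ArrayBlockingQueue<>(totalCapacity / 2);
    }

    public void put(String request, boolean fromFollower) throws InterruptedException {
        // put() blocks when the target queue is full; this is what enforces the limit.
        (fromFollower ? replicaSyncQueue : normalQueue).put(request);
    }

    public String poll(long timeoutMs) throws InterruptedException {
        BlockingQueue<String> first = pollReplicaSyncFirst ? replicaSyncQueue : normalQueue;
        BlockingQueue<String> second = pollReplicaSyncFirst ? normalQueue : replicaSyncQueue;
        pollReplicaSyncFirst = !pollReplicaSyncFirst; // flip the preference on every call
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (System.nanoTime() < deadline) {
            String request = first.poll(1, TimeUnit.MILLISECONDS);
            if (request != null) {
                return request;
            }
            request = second.poll(1, TimeUnit.MILLISECONDS);
            if (request != null) {
                return request;
            }
        }
        return null;
    }
}

The design choice mirrored here is that blocking on a full ArrayBlockingQueue is what finally makes 'netty.server.max-queued-requests' effective, while flipping the poll order on every call keeps replica synchronization and client traffic from starving each other.
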
RequestChannelTest.java
@@ -34,13 +34,13 @@
 public class RequestChannelTest {
 
     @Test
-    void testRequestPriority() throws Exception {
+    void testDifferentRequestQueue() throws Exception {
         RequestChannel channel = new RequestChannel(100);
 
-        // 1. request with same priority score. Use FIFO.
+        // 1. push normal requests. Use FIFO.
         List<RpcRequest> rpcRequests = new ArrayList<>();
-        // push rpc requests
-        for (int i = 0; i < 100; i++) {
+        // push rpc requests. For normal requests, the queue capacity is 50 (queueCapacity / 2).
+        for (int i = 0; i < 50; i++) {
             RpcRequest rpcRequest =
                     new RpcRequest(
                             ApiKeys.GET_TABLE.id,
@@ -53,36 +53,49 @@ void testRequestPriority() throws Exception {
             channel.putRequest(rpcRequest);
             rpcRequests.add(rpcRequest);
         }
+        assertThat(channel.requestsCount()).isEqualTo(50);
         // pop rpc requests
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 50; i++) {
             RpcRequest gotRequest = channel.pollRequest(100);
             assertThat(gotRequest).isEqualTo(rpcRequests.get(i));
         }

-        // 2. request with different priority score. Should be ordered by priority score.
-        RpcRequest rpcRequest1 =
-                new RpcRequest(
-                        ApiKeys.GET_TABLE.id,
-                        (short) 0,
-                        3,
-                        null,
-                        new GetTableRequest(),
-                        new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)),
-                        null);
-        RpcRequest rpcRequest2 =
-                new RpcRequest(
-                        ApiKeys.FETCH_LOG.id,
-                        (short) 0,
-                        100,
-                        null,
-                        new FetchLogRequest().setMaxBytes(100).setFollowerServerId(2),
-                        new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)),
-                        null);
-        channel.putRequest(rpcRequest1);
-        channel.putRequest(rpcRequest2);
-        RpcRequest rpcRequest = channel.pollRequest(100);
-        assertThat(rpcRequest).isEqualTo(rpcRequest2);
-        rpcRequest = channel.pollRequest(100);
-        assertThat(rpcRequest).isEqualTo(rpcRequest1);
+        rpcRequests.clear();
+        // 2. push normal requests and replicaSync requests together. Requests of the same type
+        // should be popped in FIFO order, and normal and replicaSync requests are polled
+        // alternately, one by one.
+        for (int i = 0; i < 100; i++) {
+            RpcRequest rpcRequest;
+            if (i % 2 == 0) {
+                rpcRequest =
+                        new RpcRequest(
+                                ApiKeys.FETCH_LOG.id,
+                                (short) 0,
+                                100,
+                                null,
+                                new FetchLogRequest().setMaxBytes(100).setFollowerServerId(2),
+                                new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)),
+                                null);
+            } else {
+                rpcRequest =
+                        new RpcRequest(
+                                ApiKeys.GET_TABLE.id,
+                                (short) 0,
+                                3,
+                                null,
+                                new GetTableRequest(),
+                                new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)),
+                                null);
+            }
+            channel.putRequest(rpcRequest);
+            rpcRequests.add(rpcRequest);
+        }

+        assertThat(channel.requestsCount()).isEqualTo(100);
+        // pop rpc requests
+        for (int i = 0; i < 100; i++) {
+            RpcRequest gotRequest = channel.pollRequest(100);
+            assertThat(gotRequest).isEqualTo(rpcRequests.get(i));
+        }
     }
 }
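
One case the new test leaves implicit, sketched here as a hypothetical extra assertion (the negative followerServerId and the expected routing are assumptions based on the >= 0 check in putRequest, not something this PR verifies): a FETCH_LOG request coming from a client rather than a follower should land in the normal request queue.

// Hypothetical follow-up case, not part of this PR: a client fetch is assumed to carry a
// negative followerServerId and should therefore bypass the replica-sync queue.
RequestChannel channel = new RequestChannel(100);
RpcRequest clientFetch =
        new RpcRequest(
                ApiKeys.FETCH_LOG.id,
                (short) 0,
                3,
                null,
                new FetchLogRequest().setMaxBytes(100).setFollowerServerId(-1),
                new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)),
                null);
channel.putRequest(clientFetch);
assertThat(channel.requestsCount()).isEqualTo(1);
assertThat(channel.pollRequest(100)).isEqualTo(clientFetch);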