Skip to content

Commit

Permalink
Merge pull request #916 from ctripcorp/bugfix/keeper_fsync
Browse files Browse the repository at this point in the history
avoid concurrent master reconnecting
  • Loading branch information
LanternLee authored Dec 27, 2024
2 parents c5033b5 + 5e9705b commit b206cae
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI;
Expand Down Expand Up @@ -90,7 +91,7 @@ public abstract class AbstractRedisMasterReplication extends AbstractLifecycle i

protected long connectedTime;

protected Channel masterChannel;
protected volatile Channel masterChannel;

private NioEventLoopGroup nioEventLoopGroup;

Expand All @@ -104,6 +105,8 @@ public abstract class AbstractRedisMasterReplication extends AbstractLifecycle i

protected KeeperResourceManager resourceManager;

private AtomicInteger runningReconnect = new AtomicInteger(0);

public AbstractRedisMasterReplication(RedisKeeperServer redisKeeperServer, RedisMaster redisMaster, NioEventLoopGroup nioEventLoopGroup,
ScheduledExecutorService scheduled, KeeperResourceManager resourceManager) {

Expand Down Expand Up @@ -184,16 +187,22 @@ protected void scheduleReconnect(int timeMilli) {
logger.info("[scheduleReconnect][do not connect, is stopped!!]{}", redisMaster.masterEndPoint());
return;
}
if (!runningReconnect.compareAndSet(0, 1)) {
logger.info("[scheduleReconnect][multi reconnect, skip] {}", runningReconnect.get());
return;
}
scheduled.schedule(new AbstractExceptionLogTask() {
@Override
public void doRun() {
try {
runningReconnect.set(0);
connectWithMaster();
} catch (Throwable th) {
logger.error("[scheduleReconnect]" + AbstractRedisMasterReplication.this, th);
}
}
}, timeMilli, TimeUnit.MILLISECONDS);
}, Math.max(10, timeMilli), TimeUnit.MILLISECONDS);
// wait at least 10ms so that the runningReconnect check can reject concurrent reconnect requests
}

protected abstract void doConnect(Bootstrap b);
Expand Down Expand Up @@ -229,6 +238,13 @@ protected void onReceiveMessage(int messageLength) {
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public void masterConnected(Channel channel) {
logger.info("[masterConnected] {}", ChannelUtil.getDesc(channel));
if (null != masterChannel) {
logger.info("[masterConnected][unexpected connected channel][{}][{}] reset channels", ChannelUtil.getDesc(masterChannel), ChannelUtil.getDesc(channel));
masterChannel.close();
channel.close();
return;
}

connectedTime = System.currentTimeMillis();
this.masterChannel = channel;
Expand Down Expand Up @@ -319,7 +335,13 @@ public void operationComplete(ChannelFuture future) throws Exception {
/**
 * Handles the notification that a channel to the master has been disconnected.
 *
 * <p>If {@code channel} equals the currently tracked {@code masterChannel}, the
 * reference is cleared; otherwise the event is only logged as an unexpected
 * disconnect. In both cases {@code dumpFail} is then invoked with a
 * {@link PsyncMasterDisconnectedException} wrapping {@code channel}.
 *
 * @param channel the channel that was disconnected; dereferenced without a null check
 */
@Override
public void masterDisconnected(Channel channel) {

// NOTE(review): this unconditional log is identical to the one inside the matching
// branch below, so a matching channel is logged twice as written. This listing appears
// to come from a diff view without +/- markers - presumably this is the pre-change
// line; confirm against the actual file.
logger.info("[masterDisconnected]{}", channel);
if (channel.equals(this.masterChannel)) {
logger.info("[masterDisconnected]{}", channel);
// Forget the tracked channel now that it has been disconnected.
this.masterChannel = null;
} else {
// A channel other than the tracked one was disconnected: leave masterChannel untouched.
logger.info("[masterDisconnected][unexpected disconnected channel][{}][{}] ignore",
ChannelUtil.getDesc(masterChannel), ChannelUtil.getDesc(channel));
}
// NOTE(review): dumpFail runs for the unexpected-channel case as well, despite the
// "ignore" wording in the log message above - confirm this is intended.
dumpFail(new PsyncMasterDisconnectedException(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import static com.ctrip.xpipe.redis.keeper.impl.AbstractRedisMasterReplication.KEY_MASTER_CONNECT_RETRY_DELAY_SECONDS;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -73,7 +74,7 @@ public void beforeDefaultRedisMasterReplicationTest() throws Exception {

MockitoAnnotations.initMocks(this);

nioEventLoopGroup = new NioEventLoopGroup();
nioEventLoopGroup = new NioEventLoopGroup(1);

AbstractRedisMasterReplication.DEFAULT_REPLICATION_TIMEOUT_MILLI = replTimeoutMilli;
defaultRedisMasterReplication = new DefaultRedisMasterReplication(redisMaster, redisKeeperServer, nioEventLoopGroup,
Expand Down Expand Up @@ -197,6 +198,58 @@ public void testReconnectAfterTryConnectThroughException() throws Exception {
.count() == 2);
}

/**
 * Checks that two back-to-back {@code scheduleReconnect(0)} calls produce exactly one
 * connection on the test server.
 */
@Test
public void testMultiReconnect() throws Exception {
// Static field: this assignment affects every replication instance created afterwards.
AbstractRedisMasterReplication.DEFAULT_REPLICATION_TIMEOUT_MILLI = 10000;
Server server = startEmptyServer();
// Point the mocked master at the local test server.
when(redisMaster.masterEndPoint()).thenReturn(new DefaultEndPoint("127.0.0.1", server.getPort()));

// Lifecycle hooks are overridden with empty bodies, presumably so that initialize()/start()
// open no connection themselves and only the explicit calls below do - TODO confirm.
DefaultRedisMasterReplication mockReplication = new DefaultRedisMasterReplication(redisMaster, redisKeeperServer, nioEventLoopGroup,
scheduled, proxyResourceManager) {
@Override
protected void doInitialize() throws Exception {
}

@Override
protected void doStart() throws Exception {
}
};
mockReplication.setMasterConnectRetryDelaySeconds(0);
mockReplication.initialize();
mockReplication.start();

// Request a reconnect twice in a row; the assertion below expects a single connection.
mockReplication.scheduleReconnect(0);
mockReplication.scheduleReconnect(0);
// Fixed wait for the scheduled work to run before inspecting the server.
Thread.sleep(1000);
// presumably getConnected() is the number of connections the server has seen - verify against Server
Assert.assertEquals(1, server.getConnected());
}

/**
 * Checks that two back-to-back {@code connectWithMaster()} calls produce exactly one
 * connection on the test server.
 */
@Test
public void testMultiConnect() throws Exception {
// Static field: this assignment affects every replication instance created afterwards.
AbstractRedisMasterReplication.DEFAULT_REPLICATION_TIMEOUT_MILLI = 10000;
Server server = startEmptyServer();
// Point the mocked master at the local test server.
when(redisMaster.masterEndPoint()).thenReturn(new DefaultEndPoint("127.0.0.1", server.getPort()));

// Lifecycle hooks are overridden with empty bodies, presumably so that initialize()/start()
// open no connection themselves and only the explicit calls below do - TODO confirm.
DefaultRedisMasterReplication mockReplication = new DefaultRedisMasterReplication(redisMaster, redisKeeperServer, nioEventLoopGroup,
scheduled, proxyResourceManager) {
@Override
protected void doInitialize() throws Exception {
}

@Override
protected void doStart() throws Exception {
}
};
mockReplication.setMasterConnectRetryDelaySeconds(0);
mockReplication.initialize();
mockReplication.start();

// Call connectWithMaster() directly twice, bypassing scheduleReconnect.
// NOTE(review): the mechanism that suppresses the second connection is not visible in this
// test - confirm in connectWithMaster().
mockReplication.connectWithMaster();
mockReplication.connectWithMaster();
// Fixed wait for the connection attempts to settle before inspecting the server.
Thread.sleep(1000);
// presumably getConnected() is the number of connections the server has seen - verify against Server
Assert.assertEquals(1, server.getConnected());
}

@After
public void afterDefaultRedisMasterReplicationTest() throws Exception {
nioEventLoopGroup.shutdownGracefully();
Expand Down

0 comments on commit b206cae

Please sign in to comment.