From 3aed98514d5603a89070e3f149c93bb9c1291873 Mon Sep 17 00:00:00 2001 From: sl_li Date: Fri, 29 Nov 2024 17:47:28 +0800 Subject: [PATCH 1/2] avoid concurrent master reconnecting --- .../impl/AbstractRedisMasterReplication.java | 21 ++++++- .../DefaultRedisMasterReplicationTest.java | 55 ++++++++++++++++++- 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java index 1da0359b88..006fa35868 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java @@ -53,6 +53,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI; @@ -90,7 +91,7 @@ public abstract class AbstractRedisMasterReplication extends AbstractLifecycle i protected long connectedTime; - protected Channel masterChannel; + protected volatile Channel masterChannel; private NioEventLoopGroup nioEventLoopGroup; @@ -104,6 +105,8 @@ public abstract class AbstractRedisMasterReplication extends AbstractLifecycle i protected KeeperResourceManager resourceManager; + private AtomicInteger runningReconnect = new AtomicInteger(0); + public AbstractRedisMasterReplication(RedisKeeperServer redisKeeperServer, RedisMaster redisMaster, NioEventLoopGroup nioEventLoopGroup, ScheduledExecutorService scheduled, KeeperResourceManager resourceManager) { @@ -184,16 +187,22 @@ protected void scheduleReconnect(int timeMilli) { logger.info("[scheduleReconnect][do not connect, is stopped!!]{}", redisMaster.masterEndPoint()); return; } + if (!runningReconnect.compareAndSet(0, 1)) { + logger.info("[scheduleReconnect][multi reconnect, skip] {}", runningReconnect.get()); + return; + } scheduled.schedule(new AbstractExceptionLogTask() { @Override public void doRun() { try { + runningReconnect.set(0); connectWithMaster(); } catch (Throwable th) { logger.error("[scheduleReconnect]" + AbstractRedisMasterReplication.this, th); } } - }, timeMilli, TimeUnit.MILLISECONDS); + }, Math.max(10, timeMilli), TimeUnit.MILLISECONDS); + // at least wait 10ms so that runningReconnect checking can avoid concurrent reconnecting } protected abstract void doConnect(Bootstrap b); @@ -229,6 +238,13 @@ protected void onReceiveMessage(int messageLength) { @SuppressWarnings({"unchecked", "rawtypes"}) @Override public void masterConnected(Channel channel) { + logger.info("[masterConnected] {}", ChannelUtil.getDesc(channel)); + if (null != masterChannel) { + logger.info("[masterConnected][unexpected connected channel][{}][{}] reset channels", ChannelUtil.getDesc(masterChannel), ChannelUtil.getDesc(channel)); + masterChannel.close(); + channel.close(); + return; + } connectedTime = System.currentTimeMillis(); this.masterChannel = channel; @@ -320,6 +336,7 @@ public void operationComplete(ChannelFuture future) throws Exception { public void masterDisconnected(Channel channel) { logger.info("[masterDisconnected]{}", channel); + this.masterChannel = null; dumpFail(new PsyncMasterDisconnectedException(channel)); } diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisMasterReplicationTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisMasterReplicationTest.java index 693342d9b9..a8552caf7c 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisMasterReplicationTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisMasterReplicationTest.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import static com.ctrip.xpipe.redis.keeper.impl.AbstractRedisMasterReplication.KEY_MASTER_CONNECT_RETRY_DELAY_SECONDS; import static org.mockito.Mockito.*; @@ -73,7 +74,7 @@ public void beforeDefaultRedisMasterReplicationTest() throws Exception { MockitoAnnotations.initMocks(this); - nioEventLoopGroup = new NioEventLoopGroup(); + nioEventLoopGroup = new NioEventLoopGroup(1); AbstractRedisMasterReplication.DEFAULT_REPLICATION_TIMEOUT_MILLI = replTimeoutMilli; defaultRedisMasterReplication = new DefaultRedisMasterReplication(redisMaster, redisKeeperServer, nioEventLoopGroup, @@ -197,6 +198,58 @@ public void testReconnectAfterTryConnectThroughException() throws Exception { .count() == 2); } + @Test + public void testMultiReconnect() throws Exception { + AbstractRedisMasterReplication.DEFAULT_REPLICATION_TIMEOUT_MILLI = 10000; + Server server = startEmptyServer(); + when(redisMaster.masterEndPoint()).thenReturn(new DefaultEndPoint("127.0.0.1", server.getPort())); + + DefaultRedisMasterReplication mockReplication = new DefaultRedisMasterReplication(redisMaster, redisKeeperServer, nioEventLoopGroup, + scheduled, proxyResourceManager) { + @Override + protected void doInitialize() throws Exception { + } + + @Override + protected void doStart() throws Exception { + } + }; + mockReplication.setMasterConnectRetryDelaySeconds(0); + mockReplication.initialize(); + mockReplication.start(); + + mockReplication.scheduleReconnect(0); + mockReplication.scheduleReconnect(0); + Thread.sleep(1000); + Assert.assertEquals(1, server.getConnected()); + } + + @Test + public void testMultiConnect() throws Exception { + AbstractRedisMasterReplication.DEFAULT_REPLICATION_TIMEOUT_MILLI = 10000; + Server server = startEmptyServer(); + when(redisMaster.masterEndPoint()).thenReturn(new DefaultEndPoint("127.0.0.1", server.getPort())); + + DefaultRedisMasterReplication mockReplication = new DefaultRedisMasterReplication(redisMaster, redisKeeperServer, nioEventLoopGroup, + scheduled, proxyResourceManager) { + @Override + protected void doInitialize() throws Exception { + } + + @Override + protected void doStart() throws Exception { + } + }; + mockReplication.setMasterConnectRetryDelaySeconds(0); + mockReplication.initialize(); + mockReplication.start(); + + mockReplication.connectWithMaster(); + mockReplication.connectWithMaster(); + Thread.sleep(1000); + Assert.assertEquals(1, server.getConnected()); + } + @After public void afterDefaultRedisMasterReplicationTest() throws Exception { nioEventLoopGroup.shutdownGracefully(); From 5e9705b4d934b44bdb950aa80bc9876f9ab8db8a Mon Sep 17 00:00:00 2001 From: sl_li Date: Thu, 26 Dec 2024 18:57:19 +0800 Subject: [PATCH 2/2] avoid reset masterChannel for unknown channel, it may happen after multi channel connected at the same time --- .../keeper/impl/AbstractRedisMasterReplication.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java index 006fa35868..856f718f3a 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java @@ -335,8 +335,13 @@ public void operationComplete(ChannelFuture future) throws Exception { @Override public void masterDisconnected(Channel channel) { - logger.info("[masterDisconnected]{}", channel); - this.masterChannel = null; + if (channel.equals(this.masterChannel)) { + logger.info("[masterDisconnected]{}", channel); + this.masterChannel = null; + } else { + logger.info("[masterDisconnected][unexpected disconnected channel][{}][{}] ignore", + ChannelUtil.getDesc(masterChannel), ChannelUtil.getDesc(channel)); + } dumpFail(new PsyncMasterDisconnectedException(channel)); }