From 5bbda38e8c25ff62fa66cecc06d887e7fdd54f25 Mon Sep 17 00:00:00 2001 From: lishanglin Date: Thu, 9 Nov 2023 14:30:38 +0800 Subject: [PATCH] support exchange region between clusters (#740) * metaserver observer multi keeper-leader zk path * migrate keeper from clusterId-shardId to replId * fix keeper repl timeout found late * Metaserver support shards exchange between clusters * fix asymmetric cluster delay check down --------- Co-authored-by: lishanglin --- .../com/ctrip/xpipe/utils/log/MDCUtil.java | 24 ++++- .../xpipe-keeper/config/log4j2.xml | 4 +- .../src/main/config/log4j2-uat.xml | 4 +- .../src/main/config/log4j2.xml | 4 +- .../actions/delay/MetricDelayListener.java | 2 + .../impl/ProxyConnectedChecker.java | 1 - .../redis/core/entity/KeeperInstanceMeta.java | 7 +- .../redis/core/entity/KeeperTransMeta.java | 24 ++++- .../container/KeeperContainerService.java | 4 +- .../xpipe/redis/core/meta/MetaZkConfig.java | 9 ++ .../comparator/ClusterMetaComparator.java | 41 +++++++-- .../meta/comparator/DcMetaComparator.java | 43 ++++++++- .../ctrip/xpipe/redis/core/store/ReplId.java | 48 ++++++++++ .../core/store/ReplicationStoreManager.java | 4 +- .../AbstractIntegratedTest.java | 12 ++- .../console/cmd/ServerStartCmd.java | 3 +- .../integratedtest/keeper/KeeperPsync2.java | 2 +- .../integratedtest/keeper/KeeperSingleDc.java | 2 +- .../keeper/KeeperSingleDcWaitForOffset.java | 2 +- .../keeper/XRedisPartialTest.java | 2 +- .../keeper/manul/BadKeeper.java | 2 +- .../keeper/manul/GivenDataKeeper.java | 2 +- .../KeeperCloseConnectionAfterPsync.java | 2 +- .../src/test/resources/log4j2.xml | 4 +- .../xpipe/redis/keeper/RedisKeeperServer.java | 9 +- .../xsync/DefaultXsyncReplication.java | 3 - .../container/KeeperContainerController.java | 24 ++++- .../container/KeeperContainerService.java | 49 ++++------ .../keeper/AbstractSyncCommandHandler.java | 2 +- .../impl/AbstractRedisMasterReplication.java | 4 +- .../keeper/impl/DefaultRedisKeeperServer.java | 59 ++++++------ .../redis/keeper/impl/DefaultRedisSlave.java | 21 +---- .../redis/keeper/impl/XsyncRedisSlave.java | 19 ++++ .../monitor/impl/DefaultKeeperMonitor.java | 2 +- .../monitor/impl/DefaultKeeperStats.java | 14 +-- .../keeper/netty/NettyMasterHandler.java | 6 +- .../redis/keeper/netty/NettySlaveHandler.java | 4 +- .../store/DefaultReplicationStoreManager.java | 91 ++++++++++++++----- .../util/KeeperReplIdAwareThreadFactory.java | 46 ++++++++++ .../redis/keeper/AbstractFakeRedisTest.java | 4 +- .../AbstractRedisKeeperContextTest.java | 22 +++-- .../redis/keeper/AbstractRedisKeeperTest.java | 26 +++--- .../container/KeeperContainerServiceTest.java | 9 +- .../handler/keeper/PsyncHandlerTest.java | 2 +- .../hetero/manual/KeeperServerTest.java | 2 +- .../impl/DefaultRedisKeeperServerTest.java | 8 +- .../impl/GtidRedisKeeperServerTest.java | 2 +- .../fakeredis/FakeRedisExceptionTest.java | 4 +- .../impl/fakeredis/PsyncForKeeperTest.java | 5 +- .../DefaultReplicationStoreManagerTest.java | 19 +--- .../src/test/resources/log4j2.xml | 4 +- .../manager/DefaultApplierManager.java | 6 ++ .../DefaultKeeperContainerService.java | 10 ++ .../elect/DefaultKeeperElectorManager.java | 8 ++ .../keeper/manager/AbstractKeeperCommand.java | 3 + .../keeper/manager/AddKeeperCommand.java | 22 +++++ .../keeper/manager/DefaultKeeperManager.java | 8 +- .../keeper/manager/DeleteKeeperCommand.java | 3 + .../redis/meta/server/meta/CurrentMeta.java | 9 +- .../server/meta/impl/DefaultDcMetaCache.java | 1 + .../meta/server/AbstractMetaServerTest.java | 28 +++++- .../manager/DefaultApplierManagerTest.java | 6 +- .../AbstractKeeperElectorManagerTest.java | 6 ++ .../MultiPathKeeperElectorManagerTest.java | 35 +++++++ .../keeper/manager/AddKeeperCommandTest.java | 26 +++++- .../manager/DefaultKeeperManagerTest.java | 34 ++++++- .../meta/server/meta/CurrentMetaTest.java | 32 +++++++ .../impl/DefaultCurrentMetaManagerTest.java | 1 + .../src/test/resources/meta-test.xml | 12 +++ .../src/test/resources/log4j2.xml | 4 +- 70 files changed, 721 insertions(+), 245 deletions(-) create mode 100644 redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ReplId.java create mode 100644 redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/util/KeeperReplIdAwareThreadFactory.java diff --git a/core/src/main/java/com/ctrip/xpipe/utils/log/MDCUtil.java b/core/src/main/java/com/ctrip/xpipe/utils/log/MDCUtil.java index bcf7aa16b4..09a72b60ec 100644 --- a/core/src/main/java/com/ctrip/xpipe/utils/log/MDCUtil.java +++ b/core/src/main/java/com/ctrip/xpipe/utils/log/MDCUtil.java @@ -11,16 +11,23 @@ */ public class MDCUtil { - protected static String MDC_KEY = "xpipe.cluster.shard"; + protected static String MDC_KEY_CLUSTER_SHARD = "xpipe.cluster.shard"; + + protected static String MDC_KEY_KEEPER_REPL = "xpipe.keeper.repl"; public static void setClusterShard(String cluster, String shard) { - MDC.put(MDC_KEY, StringUtil.makeSimpleName(cluster, shard)); + MDC.put(MDC_KEY_CLUSTER_SHARD, StringUtil.makeSimpleName(cluster, shard)); + } + + public static void setKeeperRepl(String replId) { + + MDC.put(MDC_KEY_KEEPER_REPL, StringUtil.makeSimpleName(replId, null)); } @VisibleForTesting protected static String getClusterShard() { - return MDC.get(MDC_KEY); + return MDC.get(MDC_KEY_CLUSTER_SHARD); } @@ -35,4 +42,15 @@ public void run() { }; } + public static Runnable decorateKeeperReplMDC(Runnable r, String replId) { + return new Runnable() { + + @Override + public void run() { + MDCUtil.setKeeperRepl(replId); + r.run(); + } + }; + } + } diff --git a/redis/dockerPackage/xpipe-keeper/config/log4j2.xml b/redis/dockerPackage/xpipe-keeper/config/log4j2.xml index aa6793ddea..964bcb57b6 100644 --- a/redis/dockerPackage/xpipe-keeper/config/log4j2.xml +++ b/redis/dockerPackage/xpipe-keeper/config/log4j2.xml @@ -8,12 +8,12 @@ - + - + diff --git a/redis/package/redis-keeper-package/src/main/config/log4j2-uat.xml b/redis/package/redis-keeper-package/src/main/config/log4j2-uat.xml index 195bd83671..d820dceb45 100644 --- a/redis/package/redis-keeper-package/src/main/config/log4j2-uat.xml +++ b/redis/package/redis-keeper-package/src/main/config/log4j2-uat.xml @@ -8,12 +8,12 @@ - + - + diff --git a/redis/package/redis-keeper-package/src/main/config/log4j2.xml b/redis/package/redis-keeper-package/src/main/config/log4j2.xml index 800f0ec496..f25cb1c42c 100644 --- a/redis/package/redis-keeper-package/src/main/config/log4j2.xml +++ b/redis/package/redis-keeper-package/src/main/config/log4j2.xml @@ -8,12 +8,12 @@ - + - + diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/delay/MetricDelayListener.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/delay/MetricDelayListener.java index 850cfc2c7f..13cec7c7d0 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/delay/MetricDelayListener.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/delay/MetricDelayListener.java @@ -44,6 +44,8 @@ private MetricData getPoint(DelayActionContext context) { data.addTag("crossRegion", String.valueOf(info.isCrossRegion())); if (context instanceof HeteroDelayActionContext) { data.addTag("srcShardId", String.valueOf(((HeteroDelayActionContext) context).getShardDbId())); + } else { + data.addTag("srcShardId", "-"); } return data; } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java index 1bcdae3e0c..394184c1ab 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java @@ -13,7 +13,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.net.InetSocketAddress; import java.net.Socket; diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/entity/KeeperInstanceMeta.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/entity/KeeperInstanceMeta.java index 6f544347f3..a0186d46be 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/entity/KeeperInstanceMeta.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/entity/KeeperInstanceMeta.java @@ -1,7 +1,6 @@ package com.ctrip.xpipe.redis.core.entity; -import com.ctrip.xpipe.redis.core.store.ClusterId; -import com.ctrip.xpipe.redis.core.store.ShardId; +import com.ctrip.xpipe.redis.core.store.ReplId; /** * @author wenchao.meng @@ -15,8 +14,8 @@ public KeeperInstanceMeta(){ } - public KeeperInstanceMeta(ClusterId clusterId, ShardId shardId, KeeperMeta keeperMeta) { - super(clusterId.id(), shardId.id(), keeperMeta); + public KeeperInstanceMeta(ReplId replId, KeeperMeta keeperMeta) { + super(replId.id(), keeperMeta); } } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/entity/KeeperTransMeta.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/entity/KeeperTransMeta.java index 2159814188..623d88d669 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/entity/KeeperTransMeta.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/entity/KeeperTransMeta.java @@ -13,6 +13,8 @@ public class KeeperTransMeta { private Long clusterDbId; private Long shardDbId; + + private Long replId; private KeeperMeta keeperMeta; @@ -20,8 +22,17 @@ public class KeeperTransMeta { public KeeperTransMeta() {} public KeeperTransMeta(Long clusterDbId, Long shardDbId, KeeperMeta keeperMeta) { + this(clusterDbId, shardDbId, null, keeperMeta); + } + + public KeeperTransMeta(Long replId, KeeperMeta keeperMeta) { + this(null, null, replId, keeperMeta); + } + + public KeeperTransMeta(Long clusterDbId, Long shardDbId, Long replId, KeeperMeta keeperMeta) { this.clusterDbId = clusterDbId; this.shardDbId = shardDbId; + this.replId = replId; this.keeperMeta = keeperMeta; } @@ -49,6 +60,14 @@ public void setShardDbId(Long shardDbId) { this.shardDbId = shardDbId; } + public Long getReplId() { + return replId; + } + + public void setReplId(Long replId) { + this.replId = replId; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -56,16 +75,17 @@ public boolean equals(Object o) { KeeperTransMeta that = (KeeperTransMeta) o; return Objects.equals(clusterDbId, that.clusterDbId) && Objects.equals(shardDbId, that.shardDbId) && + Objects.equals(replId, ((KeeperTransMeta) o).replId) && Objects.equals(keeperMeta, that.keeperMeta); } @Override public int hashCode() { - return Objects.hash(clusterDbId, shardDbId, keeperMeta); + return Objects.hash(clusterDbId, shardDbId, replId, keeperMeta); } @Override public String toString() { - return String.format("[%d,%d-%s:%d]", clusterDbId, shardDbId, keeperMeta.getIp(), keeperMeta.getPort()); + return String.format("[%d,%d-%d-%s:%d]", clusterDbId, shardDbId, replId, keeperMeta.getIp(), keeperMeta.getPort()); } } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/keeper/container/KeeperContainerService.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/keeper/container/KeeperContainerService.java index 6911cf9914..915d595c05 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/keeper/container/KeeperContainerService.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/keeper/container/KeeperContainerService.java @@ -11,7 +11,9 @@ * Aug 2, 2016 */ public interface KeeperContainerService { - + + KeeperInstanceMeta infoPort(int port); + void addKeeper(KeeperTransMeta keeperTransMeta); void addOrStartKeeper(KeeperTransMeta keeperTransMeta); diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/MetaZkConfig.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/MetaZkConfig.java index dee666aade..cfa2fd6e0e 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/MetaZkConfig.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/MetaZkConfig.java @@ -5,6 +5,7 @@ import com.ctrip.xpipe.redis.core.entity.ApplierMeta; import com.ctrip.xpipe.redis.core.entity.KeeperMeta; import com.ctrip.xpipe.redis.core.store.ClusterId; +import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.core.store.ShardId; /** @@ -59,6 +60,14 @@ public static String getApplierLeaderLatchPath(String clusterId, String shardId) return path; } + public static String getKeeperLeaderLatchPath(long replId){ + return String.format("%s/repl_%d", getZkLeaderLatchRootPath(), replId); + } + + public static String getKeeperLeaderLatchPath(ReplId replId){ + return String.format("%s/%s", getZkLeaderLatchRootPath(), replId.toString()); + } + public static String getKeeperLeaderLatchPath(long clusterDbId, long shardDbId) { return String.format("%s/cluster_%d/shard_%d", getZkLeaderLatchRootPath(), clusterDbId, shardDbId); } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/comparator/ClusterMetaComparator.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/comparator/ClusterMetaComparator.java index c13af6fd34..82640d44c4 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/comparator/ClusterMetaComparator.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/comparator/ClusterMetaComparator.java @@ -4,7 +4,9 @@ import com.ctrip.xpipe.redis.core.entity.ShardMeta; import org.unidal.tuple.Triple; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * @author wenchao.meng @@ -17,28 +19,51 @@ public class ClusterMetaComparator extends AbstractMetaComparator{ private ClusterMeta future; + private Map currentGlobalShards; + private Map futureGlobalShards; + public ClusterMetaComparator(ClusterMeta current, ClusterMeta future) { this.current = current; this.future = future; } + public void setShardMigrateSupport(Map currentGlobalShards, Map futureGlobalShards) { + this.currentGlobalShards = currentGlobalShards; + this.futureGlobalShards = futureGlobalShards; + } + + private Map extractShardsInCluster(ClusterMeta clusterMeta) { + return clusterMeta.getAllShards().values().stream() + .collect(Collectors.toMap(ShardMeta::getDbId, shardMeta -> shardMeta)); + } + @Override public void compare() { configChanged = checkShallowChange(current, future); - Triple, Set, Set> result = getDiff(current.getAllShards().keySet(), future.getAllShards().keySet()); + Map currentShards = extractShardsInCluster(current); + Map futureShards = extractShardsInCluster(future); + Triple, Set, Set> result = getDiff(currentShards.keySet(), futureShards.keySet()); - for(String shardId : result.getFirst()){ - added.add(future.findFromAllShards(shardId)); + for(Long shardId : result.getFirst()){ + // do redundant addAndStart keeper/applier job for shards migrated in + added.add(futureShards.get(shardId)); } - for(String shardId : result.getLast()){ - removed.add(current.findFromAllShards(shardId)); + for(Long shardId : result.getLast()){ + if (null != futureGlobalShards && futureGlobalShards.containsKey(shardId)) { + // for shard migrated out, only release shard manage job but not keeper + ShardMeta currentMeta = currentShards.get(shardId); + ShardMetaComparator comparator = new ShardMetaComparator(currentMeta, null); + modified.add(comparator); + } else { + removed.add(currentShards.get(shardId)); + } } - for(String shardId : result.getMiddle()){ - ShardMeta currentMeta = current.findFromAllShards(shardId); - ShardMeta futureMeta = future.findFromAllShards(shardId); + for(Long shardId : result.getMiddle()){ + ShardMeta currentMeta = currentShards.get(shardId); + ShardMeta futureMeta = futureShards.get(shardId); if(!reflectionEquals(currentMeta, futureMeta)){ ShardMetaComparator comparator = new ShardMetaComparator(currentMeta, futureMeta); comparator.compare(); diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/comparator/DcMetaComparator.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/comparator/DcMetaComparator.java index 59e187f80a..df95f4a6bf 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/comparator/DcMetaComparator.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/comparator/DcMetaComparator.java @@ -2,10 +2,13 @@ import com.ctrip.xpipe.redis.core.entity.ClusterMeta; import com.ctrip.xpipe.redis.core.entity.DcMeta; +import com.ctrip.xpipe.redis.core.entity.ShardMeta; +import com.google.common.collect.Maps; import org.unidal.tuple.Triple; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -16,6 +19,10 @@ public class DcMetaComparator extends AbstractMetaComparator{ private DcMeta current, future; + + private Map currentShards; + private Map futureShards; + private AtomicBoolean shardMigrateSupport = new AtomicBoolean(false); public static DcMetaComparator buildComparator(DcMeta current, DcMeta future){ @@ -52,11 +59,36 @@ public DcMetaComparator(DcMeta current, DcMeta future) { this.future = future; } - public void compare(){ - Map currentClustersMap = current.getClusters().values().stream() - .collect(Collectors.toMap(ClusterMeta::getDbId, clusterMeta -> clusterMeta)); - Map futureClustersMap = future.getClusters().values().stream() + public void setShardMigrateSupport() { + if (shardMigrateSupport.compareAndSet(false, true)) { + this.currentShards = extractShardsInDc(current); + this.futureShards = extractShardsInDc(future); + } + } + + public boolean supportShardMigrate() { + return shardMigrateSupport.get(); + } + + private Map extractShardsInDc(DcMeta dcMeta) { + Map shards = Maps.newHashMap(); + dcMeta.getClusters().values().forEach(cluster -> { + cluster.getShards().values().forEach(shard -> { + shards.put(shard.getDbId(), shard); + }); + }); + + return shards; + } + + private Map extractClustersInDc(DcMeta dcMeta) { + return dcMeta.getClusters().values().stream() .collect(Collectors.toMap(ClusterMeta::getDbId, clusterMeta -> clusterMeta)); + } + + public void compare(){ + Map currentClustersMap = extractClustersInDc(current); + Map futureClustersMap = extractClustersInDc(future); Triple, Set, Set> result = getDiff(currentClustersMap.keySet(), futureClustersMap.keySet()); Set addedClusterDbIds = result.getFirst(); @@ -76,6 +108,9 @@ public void compare(){ ClusterMeta futureMeta = futureClustersMap.get(clusterDbId); if(!reflectionEquals(currentMeta, futureMeta)) { ClusterMetaComparator clusterMetaComparator = new ClusterMetaComparator(currentMeta, futureMeta); + if (supportShardMigrate()) { + clusterMetaComparator.setShardMigrateSupport(currentShards, futureShards); + } clusterMetaComparator.compare(); modified.add(clusterMetaComparator); } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ReplId.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ReplId.java new file mode 100644 index 0000000000..59304a46af --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ReplId.java @@ -0,0 +1,48 @@ +package com.ctrip.xpipe.redis.core.store; + +import java.util.Objects; + +/** + * @author lishanglin + * date 2023/10/31 + */ +public class ReplId { + + private final Long id; + + private String mark = "repl_"; + + public static ReplId from(Long id) { + return new ReplId(id); + } + + public ReplId(Long id) { + this.id = id; + } + + public ReplId(String mark, Long id) { + this.id = id; + this.mark = mark; + } + + public Long id() { + return id; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReplId replId = (ReplId) o; + return Objects.equals(id, replId.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + public String toString() { + return mark + id; + } +} diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ReplicationStoreManager.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ReplicationStoreManager.java index e30edc3551..83bd9ab8df 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ReplicationStoreManager.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ReplicationStoreManager.java @@ -29,8 +29,6 @@ public interface ReplicationStoreManager extends Destroyable, Observable, Lifec */ ReplicationStore getCurrent() throws IOException; + ReplId getReplId(); - ClusterId getClusterId(); - - ShardId getShardId(); } diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java index 8e57670450..5054eb8db9 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java @@ -59,7 +59,7 @@ public abstract class AbstractIntegratedTest extends AbstractRedisTest { private String clusterId = "cluster1", shardId = "shard1"; - private Long clusterDbId = 1L, shardDbId = 1L; + private Long clusterDbId = 1L, shardDbId = 1L, replId = 1L; private int defaultTestMessageCount = 5000; @@ -183,14 +183,16 @@ protected RedisKeeperServer createGtidRedisKeeperServer(KeeperMeta keeperMeta, F KeepersMonitorManager keeperMonitorManager, RedisOpParser redisOpParser) { - return new DefaultRedisKeeperServer(keeperMeta, keeperConfig, baseDir, + Long replId = keeperMeta.parent().getDbId(); + return new DefaultRedisKeeperServer(replId, keeperMeta, keeperConfig, baseDir, leaderElectorManager, keeperMonitorManager, resourceManager, redisOpParser); } protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeperMeta, File baseDir, KeeperConfig keeperConfig, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keeperMonitorManager) { - return new DefaultRedisKeeperServer(keeperMeta, keeperConfig, baseDir, + Long replId = keeperMeta.parent().getDbId(); + return new DefaultRedisKeeperServer(replId, keeperMeta, keeperConfig, baseDir, leaderElectorManager, keeperMonitorManager, resourceManager); } @@ -322,6 +324,10 @@ public Long getShardDbId() { return shardDbId; } + public Long getReplId() { + return replId; + } + protected void sendMesssageToMasterAndTest(RedisMeta redisMaster, List slaves){ sendMesssageToMasterAndTest(defaultTestMessageCount, redisMaster, slaves); } diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/cmd/ServerStartCmd.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/cmd/ServerStartCmd.java index ad88aa0cde..fb35ac0949 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/cmd/ServerStartCmd.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/cmd/ServerStartCmd.java @@ -49,8 +49,7 @@ protected void doExecute() throws Exception { for (URL url: urls) { if(Pattern.matches(".*/redis-proxy-client/target/classes/", url.toString())) { sb.append(url); - sb.append("../redis-proxy-client-1.2.8.jar:"); - + sb.append("../redis-proxy-client-1.2.9.jar:"); } else { sb.append(url.getPath()); sb.append(":"); diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperPsync2.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperPsync2.java index 883e5b9a4a..b6e88535f6 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperPsync2.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperPsync2.java @@ -89,7 +89,7 @@ private void assertSyncCount(List redisKeeperServers, int exp partialError += redisKeeperServer.getKeeperMonitor().getKeeperStats().getPartialSyncErrorCount(); } Assert.assertEquals(0, partialError); - Assert.assertEquals(expectedSyncCnt, full); + Assert.assertEquals(redisKeeperServers.size() - 1, full); } private void initKeepers() throws Exception { diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperSingleDc.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperSingleDc.java index 0eecf17cfe..379e759d44 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperSingleDc.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperSingleDc.java @@ -31,7 +31,7 @@ public void testSignleKeeperSync() throws IOException{ public void testMakeBackupActive() throws Exception{ RedisKeeperServer redisKeeperServer = getRedisKeeperServer(backupKeeper); - Assert.assertEquals(PARTIAL_STATE.FULL, redisKeeperServer.getRedisMaster().partialState()); + Assert.assertEquals(PARTIAL_STATE.PARTIAL, redisKeeperServer.getRedisMaster().partialState()); logger.info(remarkableMessage("make keeper active{}"), backupKeeper); setKeeperState(backupKeeper, KeeperState.ACTIVE, redisMaster.getIp(), redisMaster.getPort()); diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperSingleDcWaitForOffset.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperSingleDcWaitForOffset.java index 96cf0bca14..3fa1305171 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperSingleDcWaitForOffset.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/KeeperSingleDcWaitForOffset.java @@ -20,7 +20,7 @@ public class KeeperSingleDcWaitForOffset extends AbstractKeeperIntegratedSingleD public void testMakeBackupActive() throws Exception { RedisKeeperServer redisKeeperServer = getRedisKeeperServer(backupKeeper); - Assert.assertEquals(PARTIAL_STATE.FULL, redisKeeperServer.getRedisMaster().partialState()); + Assert.assertEquals(PARTIAL_STATE.PARTIAL, redisKeeperServer.getRedisMaster().partialState()); logger.info(remarkableMessage("make keeper active to wrong addr{}"), backupKeeper); diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/XRedisPartialTest.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/XRedisPartialTest.java index e0034a107f..91bb123817 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/XRedisPartialTest.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/XRedisPartialTest.java @@ -89,7 +89,7 @@ private int getBackLogActiveCount(RedisMeta slaveMeta) throws Exception { @Override protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeperMeta, File baseDir, KeeperConfig keeperConfig, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keeperMonitorManager) { - return new DefaultRedisKeeperServer(keeperMeta, keeperConfig, baseDir, leaderElectorManager, + return new DefaultRedisKeeperServer(keeperMeta.parent().getDbId(), keeperMeta, keeperConfig, baseDir, leaderElectorManager, keeperMonitorManager, resourceManager) { private int count = 0; diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/BadKeeper.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/BadKeeper.java index f5bbb96da1..7d04c4a5f3 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/BadKeeper.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/BadKeeper.java @@ -41,7 +41,7 @@ public void startTest() throws IOException{ protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeperMeta, File baseDir, KeeperConfig keeperConfig, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keeperMonitorManager) { - return new DefaultRedisKeeperServer(keeperMeta, keeperConfig, baseDir, leaderElectorManager, + return new DefaultRedisKeeperServer(keeperMeta.parent().getDbId(), keeperMeta, keeperConfig, baseDir, leaderElectorManager, keeperMonitorManager, resourceManager){ @Override public void endWriteRdb() { diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/GivenDataKeeper.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/GivenDataKeeper.java index 4592bd46e0..91b051e377 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/GivenDataKeeper.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/GivenDataKeeper.java @@ -50,7 +50,7 @@ protected String getXpipeMetaConfigFile() { protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeperMeta, File baseDir, KeeperConfig keeperConfig, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keeperMonitorManager) { - return new DefaultRedisKeeperServer(keeperMeta, keeperConfig, baseDir, leaderElectorManager, + return new DefaultRedisKeeperServer(keeperMeta.parent().getDbId(), keeperMeta, keeperConfig, baseDir, leaderElectorManager, keeperMonitorManager, resourceManager) { @Override public void endWriteRdb() { diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/KeeperCloseConnectionAfterPsync.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/KeeperCloseConnectionAfterPsync.java index 00487d11e3..e8138eaa54 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/KeeperCloseConnectionAfterPsync.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/keeper/manul/KeeperCloseConnectionAfterPsync.java @@ -34,7 +34,7 @@ public void testRedis() throws Exception{ @Override protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeperMeta, File baseDir, KeeperConfig keeperConfig, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keeperMonitorManager) { - return new DefaultRedisKeeperServer(keeperMeta, keeperConfig, baseDir, leaderElectorManager, + return new DefaultRedisKeeperServer(keeperMeta.parent().getDbId(), keeperMeta, keeperConfig, baseDir, leaderElectorManager, keeperMonitorManager, resourceManager){ @Override diff --git a/redis/redis-integration-test/src/test/resources/log4j2.xml b/redis/redis-integration-test/src/test/resources/log4j2.xml index 939bba89e0..126352b66d 100644 --- a/redis/redis-integration-test/src/test/resources/log4j2.xml +++ b/redis/redis-integration-test/src/test/resources/log4j2.xml @@ -3,12 +3,12 @@ - + - + diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java index 22cd5e5a80..4b5ae93571 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java @@ -6,6 +6,7 @@ import com.ctrip.xpipe.redis.core.entity.KeeperMeta; import com.ctrip.xpipe.redis.core.protocal.PsyncObserver; import com.ctrip.xpipe.redis.core.store.ClusterId; +import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.core.store.ReplicationStore; import com.ctrip.xpipe.redis.core.store.ShardId; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; @@ -41,11 +42,9 @@ public interface RedisKeeperServer extends RedisServer, PsyncObserver, Destroyab Set slaves(); ReplicationStore getReplicationStore(); - - ClusterId getClusterId(); - - ShardId getShardId(); - + + ReplId getReplId(); + boolean compareAndDo(RedisKeeperServerState expected, Runnable action); void setRedisKeeperServerState(RedisKeeperServerState redisKeeperServerState); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultXsyncReplication.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultXsyncReplication.java index 3ec92621d5..a5721480c5 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultXsyncReplication.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultXsyncReplication.java @@ -10,9 +10,6 @@ import com.ctrip.xpipe.redis.core.protocal.cmd.DefaultXsync; import com.ctrip.xpipe.redis.keeper.applier.InstanceDependency; import com.ctrip.xpipe.utils.CloseState; -import io.netty.bootstrap.Bootstrap; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerController.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerController.java index cba334eedb..8a61c3a6df 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerController.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerController.java @@ -2,8 +2,7 @@ import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; -import com.ctrip.xpipe.redis.core.store.ClusterId; -import com.ctrip.xpipe.redis.core.store.ShardId; +import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.ratelimit.CompositeLeakyBucket; import com.ctrip.xpipe.spring.AbstractController; @@ -13,6 +12,7 @@ import org.springframework.web.bind.annotation.*; import java.util.List; +import java.util.Optional; /** * @author Jason Song(song_s@ctrip.com) @@ -53,25 +53,39 @@ public KeeperInstanceMeta apply(RedisKeeperServer server) { return keepers; } + @GetMapping(value = "/port/{port}") + public KeeperInstanceMeta infoPort(@PathVariable int port) { + logger.info("[infoPort] {}", port); + Optional optional = keeperContainerService.list().stream() + .filter(redisKeeperServer -> redisKeeperServer.getListeningPort() == port) + .findFirst(); + + if (optional.isPresent()) { + return optional.get().getKeeperInstanceMeta(); + } else { + return new KeeperInstanceMeta(); + } + } + @RequestMapping(value = "/clusters/" + CLUSTER_NAME_PATH_VARIABLE + "/shards/" + SHARD_NAME_PATH_VARIABLE, method = RequestMethod.DELETE) public void remove(@PathVariable String clusterName, @PathVariable String shardName, @RequestBody KeeperTransMeta keeperTransMeta) { logger.info("[remove]{},{},{}", clusterName, shardName, keeperTransMeta); - keeperContainerService.remove(ClusterId.from(keeperTransMeta.getClusterDbId()), ShardId.from(keeperTransMeta.getShardDbId())); + keeperContainerService.remove(ReplId.from(keeperTransMeta.getReplId())); } @RequestMapping(value = "/clusters/" + CLUSTER_NAME_PATH_VARIABLE + "/shards/" + SHARD_NAME_PATH_VARIABLE + "/start", method = RequestMethod.PUT) public void start(@PathVariable String clusterName, @PathVariable String shardName, @RequestBody KeeperTransMeta keeperTransMeta) { logger.info("[start]{},{},{}", clusterName, shardName, keeperTransMeta); - keeperContainerService.start(ClusterId.from(keeperTransMeta.getClusterDbId()), ShardId.from(keeperTransMeta.getShardDbId())); + keeperContainerService.start(ReplId.from(keeperTransMeta.getReplId())); } @RequestMapping(value = "/clusters/" + CLUSTER_NAME_PATH_VARIABLE + "/shards/" + SHARD_NAME_PATH_VARIABLE + "/stop", method = RequestMethod.PUT) public void stop(@PathVariable String clusterName, @PathVariable String shardName, @RequestBody KeeperTransMeta keeperTransMeta) { logger.info("[stop]{},{},{}", clusterName, shardName, keeperTransMeta); - keeperContainerService.stop(ClusterId.from(keeperTransMeta.getClusterDbId()), ShardId.from(keeperTransMeta.getShardDbId())); + keeperContainerService.stop(ReplId.from(keeperTransMeta.getReplId())); } @RequestMapping(value = "/leakybucket", method = RequestMethod.GET) diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java index 8116c51975..4826985795 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java @@ -6,8 +6,7 @@ import com.ctrip.xpipe.redis.core.entity.*; import com.ctrip.xpipe.redis.core.keeper.container.KeeperContainerErrorCode; import com.ctrip.xpipe.redis.core.redis.operation.parser.GeneralRedisOpParser; -import com.ctrip.xpipe.redis.core.store.ClusterId; -import com.ctrip.xpipe.redis.core.store.ShardId; +import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperContainerConfig; @@ -95,7 +94,7 @@ public RedisKeeperServer addOrStart(KeeperTransMeta keeperTransMeta) { return add(keeperTransMeta); } - start(ClusterId.from(keeperTransMeta.getClusterDbId()), ShardId.from(keeperTransMeta.getShardDbId())); + start(ReplId.from(keeperTransMeta.getReplId())); return keeperServer; } @@ -104,23 +103,21 @@ public List list() { return Lists.newArrayList(redisKeeperServers.values()); } - public void start(ClusterId clusterId, ShardId shardId) { - String keeperServerKey = assembleKeeperServerKey(clusterId, shardId); + public void start(ReplId replId) { + String keeperServerKey = replId.toString(); RedisKeeperServer keeperServer = redisKeeperServers.get(keeperServerKey); if (keeperServer == null) { throw new RedisKeeperRuntimeException( new ErrorMessage<>(KeeperContainerErrorCode.KEEPER_NOT_EXIST, - String.format("Start keeper for cluster %s shard %s failed since keeper doesn't exist", - clusterId, shardId)), null); + String.format("Start keeper for %s failed since keeper doesn't exist", replId)), null); } if (keeperServer.getLifecycleState().isStarted()) { throw new RedisKeeperRuntimeException( new ErrorMessage<>(KeeperContainerErrorCode.KEEPER_ALREADY_STARTED, - String.format("Keeper for cluster %s shard %s already started", - clusterId, shardId)), null); + String.format("Keeper for %s already started", replId)), null); } try { @@ -128,28 +125,25 @@ public void start(ClusterId clusterId, ShardId shardId) { } catch (Throwable ex) { throw new RedisKeeperRuntimeException( new ErrorMessage<>(KeeperContainerErrorCode.INTERNAL_EXCEPTION, - String.format("Start keeper failed for cluster %s shard %s", - clusterId, shardId)), ex); + String.format("Start keeper failed for %s", replId)), ex); } } - public void stop(ClusterId clusterId, ShardId shardId) { - String keeperServerKey = assembleKeeperServerKey(clusterId, shardId); + public void stop(ReplId replId) { + String keeperServerKey = replId.toString(); RedisKeeperServer keeperServer = redisKeeperServers.get(keeperServerKey); if (keeperServer == null) { throw new RedisKeeperRuntimeException( new ErrorMessage<>(KeeperContainerErrorCode.KEEPER_NOT_EXIST, - String.format("Stop keeper for cluster %s shard %s failed since keeper doesn't exist", - clusterId, shardId)), null); + String.format("Stop keeper for %s failed since keeper doesn't exist", replId)), null); } if (keeperServer.getLifecycleState().isStopped()) { throw new RedisKeeperRuntimeException( new ErrorMessage<>(KeeperContainerErrorCode.KEEPER_ALREADY_STOPPED, - String.format("Keeper for cluster %s shard %s already stopped", - clusterId, shardId)), null); + String.format("Keeper for %s already stopped", replId)), null); } try { @@ -157,13 +151,12 @@ public void stop(ClusterId clusterId, ShardId shardId) { } catch (Throwable ex) { throw new RedisKeeperRuntimeException( new ErrorMessage<>(KeeperContainerErrorCode.INTERNAL_EXCEPTION, - String.format("Stop keeper failed for cluster %s shard %s", - clusterId, shardId)), ex); + String.format("Stop keeper failed for %s", replId)), ex); } } - public void remove(ClusterId clusterId, ShardId shardId) { - String keeperServerKey = assembleKeeperServerKey(clusterId, shardId); + public void remove(ReplId replId) { + String keeperServerKey = replId.toString(); RedisKeeperServer keeperServer = redisKeeperServers.get(keeperServerKey); @@ -183,8 +176,7 @@ public void remove(ClusterId clusterId, ShardId shardId) { } catch (Throwable ex) { throw new RedisKeeperRuntimeException( new ErrorMessage<>(KeeperContainerErrorCode.INTERNAL_EXCEPTION, - String.format("Remove keeper failed for cluster %s shard %s", - clusterId, shardId)), ex); + String.format("Remove keeper failed for %s", replId)), ex); } } @@ -198,7 +190,7 @@ private void removeKeeperCache(String keeperServerKey) { private RedisKeeperServer doAdd(KeeperTransMeta keeperTransMeta, KeeperMeta keeperMeta) throws Exception { File baseDir = getReplicationStoreDir(keeperMeta); - return createRedisKeeperServer(keeperMeta, baseDir); + return createRedisKeeperServer(keeperTransMeta.getReplId(), keeperMeta, baseDir); } private void enrichKeeperMetaFromKeeperTransMeta(KeeperMeta keeperMeta, KeeperTransMeta keeperTransMeta) { @@ -210,10 +202,10 @@ private void enrichKeeperMetaFromKeeperTransMeta(KeeperMeta keeperMeta, KeeperTr keeperMeta.setParent(shardMeta); } - private RedisKeeperServer createRedisKeeperServer(KeeperMeta keeper, + private RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper, File baseDir) throws Exception { - RedisKeeperServer redisKeeperServer = new DefaultRedisKeeperServer(keeper, keeperConfig, + RedisKeeperServer redisKeeperServer = new DefaultRedisKeeperServer(replId, keeper, keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, redisOpParser); register(redisKeeperServer); @@ -235,10 +227,7 @@ private File getReplicationStoreDir(KeeperMeta keeperMeta) { } private String assembleKeeperServerKey(KeeperTransMeta keeperTransMeta) { - return assembleKeeperServerKey(ClusterId.from(keeperTransMeta.getClusterDbId()), ShardId.from(keeperTransMeta.getShardDbId())); + return ReplId.from(keeperTransMeta.getReplId()).toString(); } - private String assembleKeeperServerKey(ClusterId clusterId, ShardId shardId) { - return String.format("%s-%s", clusterId, shardId); - } } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/AbstractSyncCommandHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/AbstractSyncCommandHandler.java index 3d72da2975..f04c3922cc 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/AbstractSyncCommandHandler.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/AbstractSyncCommandHandler.java @@ -80,7 +80,7 @@ protected void doFullSync(RedisSlave redisSlave) { RedisKeeperServer redisKeeperServer = (RedisKeeperServer)redisSlave.getRedisServer(); //alert full sync - String alert = String.format("FULL(M)<-%s[%s,%s]", redisSlave.metaInfo(), redisKeeperServer.getClusterId(), redisKeeperServer.getShardId()); + String alert = String.format("FULL(M)<-%s[%s]", redisSlave.metaInfo(), redisKeeperServer.getReplId()); EventMonitor.DEFAULT.logAlertEvent(alert); redisKeeperServer.fullSyncToSlave(redisSlave); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java index 1b236e78ab..356df0368a 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java @@ -291,7 +291,7 @@ protected void doRun() throws Exception { channel.close(); } } - }, replTimeoutMilli, replTimeoutMilli, TimeUnit.MILLISECONDS); + }, replTimeoutMilli, 10000, TimeUnit.MILLISECONDS); channel.closeFuture().addListener(new ChannelFutureListener() { @@ -339,7 +339,7 @@ protected void sendReplicationCommand() throws CommandExecutionException { if(canSendPsync()) { executeCommand(psyncCommand()); } else { - EventMonitor.DEFAULT.logAlertEvent("[lack-token]" + redisKeeperServer.getShardId()); + EventMonitor.DEFAULT.logAlertEvent("[lack-token]" + redisKeeperServer.getReplId()); doWhenCannotPsync(); } } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java index 10d749dddd..587cb22236 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java @@ -40,6 +40,7 @@ import com.ctrip.xpipe.redis.keeper.netty.NettyMasterHandler; import com.ctrip.xpipe.redis.keeper.store.DefaultFullSyncListener; import com.ctrip.xpipe.redis.keeper.store.DefaultReplicationStoreManager; +import com.ctrip.xpipe.redis.keeper.util.KeeperReplIdAwareThreadFactory; import com.ctrip.xpipe.utils.*; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; @@ -126,6 +127,7 @@ public class DefaultRedisKeeperServer extends AbstractRedisServer implements Red private final ClusterId clusterId; private final ShardId shardId; + private final ReplId replId; private final File baseDir; private volatile RedisKeeperServerState redisKeeperServerState; @@ -148,19 +150,20 @@ public class DefaultRedisKeeperServer extends AbstractRedisServer implements Red private RedisOpParser redisOpParser; - public DefaultRedisKeeperServer(KeeperMeta currentKeeperMeta, KeeperConfig keeperConfig, File baseDir, + public DefaultRedisKeeperServer(Long replId, KeeperMeta currentKeeperMeta, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keepersMonitorManager, KeeperResourceManager resourceManager){ - this(currentKeeperMeta, keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, null); + this(replId, currentKeeperMeta, keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, null); } - public DefaultRedisKeeperServer(KeeperMeta currentKeeperMeta, KeeperConfig keeperConfig, File baseDir, + public DefaultRedisKeeperServer(Long replId, KeeperMeta currentKeeperMeta, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keepersMonitorManager, KeeperResourceManager resourceManager, RedisOpParser redisOpParser){ this.clusterId = ClusterId.from(((ClusterMeta) currentKeeperMeta.parent().parent()).getDbId()); this.shardId = ShardId.from(currentKeeperMeta.parent().getDbId()); + this.replId = ReplId.from(replId); this.currentKeeperMeta = currentKeeperMeta; this.baseDir = baseDir; this.keeperConfig = keeperConfig; @@ -172,14 +175,15 @@ public DefaultRedisKeeperServer(KeeperMeta currentKeeperMeta, KeeperConfig keepe this.crossRegion = new AtomicBoolean(false); } - protected ReplicationStoreManager createReplicationStoreManager(KeeperConfig keeperConfig, ClusterId clusterId, ShardId shardId, + protected ReplicationStoreManager createReplicationStoreManager(KeeperConfig keeperConfig, ClusterId clusterId, ShardId shardId, ReplId replId, KeeperMeta currentKeeperMeta, File baseDir, KeeperMonitor keeperMonitor) { - return new DefaultReplicationStoreManager(keeperConfig, clusterId, shardId, currentKeeperMeta.getId(), baseDir, keeperMonitor, redisOpParser); + return new DefaultReplicationStoreManager.ClusterAndShardCompatible(keeperConfig, replId, currentKeeperMeta.getId(), + baseDir, keeperMonitor, redisOpParser).setDeprecatedClusterAndShard(clusterId, shardId); } private LeaderElector createLeaderElector(){ - String leaderElectionZKPath = MetaZkConfig.getKeeperLeaderLatchPath(clusterId, shardId); + String leaderElectionZKPath = MetaZkConfig.getKeeperLeaderLatchPath(replId); String leaderElectionID = MetaZkConfig.getKeeperLeaderElectionId(currentKeeperMeta); ElectContext ctx = new ElectContext(leaderElectionZKPath, leaderElectionID); return leaderElectorManager.createLeaderElector(ctx); @@ -189,20 +193,20 @@ private LeaderElector createLeaderElector(){ protected void doInitialize() throws Exception { super.doInitialize(); - replicationStoreManager = createReplicationStoreManager(keeperConfig, clusterId, shardId, + replicationStoreManager = createReplicationStoreManager(keeperConfig, clusterId, shardId, replId, currentKeeperMeta, baseDir, keeperMonitor); replicationStoreManager.addObserver(new ReplicationStoreManagerListener()); replicationStoreManager.initialize(); - threadPoolName = String.format("keeper:%s", StringUtil.makeSimpleName(clusterId.toString(), shardId.toString())); + threadPoolName = String.format("keeper:%s", replId); logger.info("[doInitialize][keeper config]{}", keeperConfig); - clientExecutors = Executors.newSingleThreadExecutor(ClusterShardAwareThreadFactory.create(clusterId, shardId, "RedisClient-" + threadPoolName)); - scheduled = Executors.newScheduledThreadPool(DEFAULT_SCHEDULED_CORE_POOL_SIZE , ClusterShardAwareThreadFactory.create(clusterId, shardId, "sch-" + threadPoolName)); - bossGroup = new NioEventLoopGroup(DEFAULT_BOSS_EVENT_LOOP_SIZE, ClusterShardAwareThreadFactory.create(clusterId, shardId, "boss-" + threadPoolName)); - workerGroup = new NioEventLoopGroup(DEFAULT_KEEPER_WORKER_GROUP_THREAD_COUNT, ClusterShardAwareThreadFactory.create(clusterId, shardId, "work-"+ threadPoolName)); - masterEventLoopGroup = new NioEventLoopGroup(DEFAULT_MASTER_EVENT_LOOP_SIZE, ClusterShardAwareThreadFactory.create(clusterId, shardId, "master-" + threadPoolName)); - rdbOnlyEventLoopGroup = new NioEventLoopGroup(DEFAULT_RDB_EVENT_LOOP_SIZE, ClusterShardAwareThreadFactory.create(clusterId, shardId, "rdbOnly-" + threadPoolName)); + clientExecutors = Executors.newSingleThreadExecutor(KeeperReplIdAwareThreadFactory.create(replId, "RedisClient-" + threadPoolName)); + scheduled = Executors.newScheduledThreadPool(DEFAULT_SCHEDULED_CORE_POOL_SIZE , KeeperReplIdAwareThreadFactory.create(replId, "sch-" + threadPoolName)); + bossGroup = new NioEventLoopGroup(DEFAULT_BOSS_EVENT_LOOP_SIZE, KeeperReplIdAwareThreadFactory.create(replId, "boss-" + threadPoolName)); + workerGroup = new NioEventLoopGroup(DEFAULT_KEEPER_WORKER_GROUP_THREAD_COUNT, KeeperReplIdAwareThreadFactory.create(replId, "work-"+ threadPoolName)); + masterEventLoopGroup = new NioEventLoopGroup(DEFAULT_MASTER_EVENT_LOOP_SIZE, KeeperReplIdAwareThreadFactory.create(replId, "master-" + threadPoolName)); + rdbOnlyEventLoopGroup = new NioEventLoopGroup(DEFAULT_RDB_EVENT_LOOP_SIZE, KeeperReplIdAwareThreadFactory.create(replId, "rdbOnly-" + threadPoolName)); this.resetReplAfterLongTimeDown(); @@ -504,6 +508,11 @@ public String toString() { return String.format("%s(%s:%d)", getClass().getSimpleName(), currentKeeperMeta.getIp(), currentKeeperMeta.getPort()); } + @Override + public ReplId getReplId() { + return replId; + } + @Override public String getKeeperRunid() { @@ -595,7 +604,7 @@ public void reFullSync() { @Override public void onFullSync(long masterRdbOffset) { //alert full sync - String alert = String.format("FULL(S)->%s[%s,%s]", getRedisMaster().metaInfo(), getClusterId(), getShardId()); + String alert = String.format("FULL(S)->%s[%s]", getRedisMaster().metaInfo(), getReplId()); EventMonitor.DEFAULT.logAlertEvent(alert); } @@ -604,11 +613,11 @@ public void onFullSync(long masterRdbOffset) { public void readRdbGtidSet(RdbStore rdbStore, String gtidSet) { try { if (isStartIndexing) { - EventMonitor.DEFAULT.logEvent("INDEX.START", clusterId + "." + shardId + " - " + gtidSet); + EventMonitor.DEFAULT.logEvent("INDEX.START", replId + " - " + gtidSet); startIndexing(); } } catch (Throwable t) { - EventMonitor.DEFAULT.logAlertEvent("INDEX.START.FAIL: " + clusterId + "." + shardId + " - " + gtidSet); + EventMonitor.DEFAULT.logAlertEvent("INDEX.START.FAIL: " + replId + " - " + gtidSet); } } @@ -638,21 +647,11 @@ public void onKeeperContinue(String replId, long beginOffset) { // do nothing } - @Override - public ClusterId getClusterId() { - return this.clusterId; - } - - @Override - public ShardId getShardId() { - return this.shardId; - } - @Override public synchronized void setRedisKeeperServerState(final RedisKeeperServerState redisKeeperServerState){ TransactionMonitor transactionMonitor = TransactionMonitor.DEFAULT; - String name = String.format("%s,%s,%s", clusterId, shardId, redisKeeperServerState); + String name = String.format("%s,%s", replId, redisKeeperServerState); transactionMonitor.logTransactionSwallowException("setRedisKeeperServerState", name, new Task() { @Override @@ -789,7 +788,7 @@ public synchronized void startIndexing() throws IOException { logger.info("[startIndexing]{}, {}", this, rdbDumper.get()); if (indexingExecutors == null) { - indexingExecutors = Executors.newSingleThreadExecutor(ClusterShardAwareThreadFactory.create(clusterId, shardId, "Indexing-" + threadPoolName)); + indexingExecutors = Executors.newSingleThreadExecutor(KeeperReplIdAwareThreadFactory.create(replId, "Indexing-" + threadPoolName)); } isStartIndexing = true; @@ -830,7 +829,7 @@ public void setRdbDumper(RdbDumper rdbDumper) throws SetRdbDumperException { @Override public KeeperInstanceMeta getKeeperInstanceMeta() { - return new KeeperInstanceMeta(clusterId, shardId, currentKeeperMeta); + return new KeeperInstanceMeta(replId, currentKeeperMeta); } public KeeperConfig getKeeperConfig() { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java index e9470067a6..5548d430d2 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java @@ -11,13 +11,13 @@ import com.ctrip.xpipe.redis.core.protocal.protocal.EofType; import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser; import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; -import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType; import com.ctrip.xpipe.redis.core.store.*; import com.ctrip.xpipe.redis.keeper.RedisClient; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.RedisSlave; import com.ctrip.xpipe.redis.keeper.SLAVE_STATE; import com.ctrip.xpipe.redis.keeper.exception.RedisKeeperRuntimeException; +import com.ctrip.xpipe.redis.keeper.util.KeeperReplIdAwareThreadFactory; import com.ctrip.xpipe.utils.*; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -104,10 +104,9 @@ public DefaultRedisSlave(RedisClient redisClient){ private void initExecutor(Channel channel) { String threadPrefix = buildThreadPrefix(channel); - ClusterId clusterId = redisClient.getRedisServer().getClusterId(); - ShardId shardId = redisClient.getRedisServer().getShardId(); - psyncExecutor = Executors.newSingleThreadExecutor(ClusterShardAwareThreadFactory.create(clusterId, shardId, threadPrefix)); - scheduled = Executors.newScheduledThreadPool(1, ClusterShardAwareThreadFactory.create(clusterId, shardId, threadPrefix)); + ReplId replId = redisClient.getRedisServer().getReplId(); + psyncExecutor = Executors.newSingleThreadExecutor(KeeperReplIdAwareThreadFactory.create(replId, threadPrefix)); + scheduled = Executors.newScheduledThreadPool(1, KeeperReplIdAwareThreadFactory.create(replId, threadPrefix)); } protected String buildThreadPrefix(Channel channel) { @@ -391,18 +390,6 @@ public ChannelFuture onCommand(CommandFile currentFile, long filePosition, Objec @VisibleForTesting protected boolean shouldFilter(RedisOp redisOp) { - if (RedisOpType.PUBLISH.equals(redisOp.getOpType())) { - int length = redisOp.buildRawOpArgs().length; - if (length < 5) { - logger.warn("publish command length={} < 5, filtered", length); - return true; - } - String channel = new String(redisOp.buildRawOpArgs()[4]); - if (!channel.startsWith("xpipe-hetero-")) { - logger.debug("publish channel: [{}] filtered", channel); - return true; - } - } return false; } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlave.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlave.java index f09c0ee906..714190e2a9 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlave.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlave.java @@ -1,6 +1,8 @@ package com.ctrip.xpipe.redis.keeper.impl; import com.ctrip.xpipe.redis.core.protocal.cmd.DefaultXsync; +import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; +import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType; import com.ctrip.xpipe.redis.core.store.GtidSetReplicationProgress; import com.ctrip.xpipe.redis.core.store.ReplicationProgress; import com.ctrip.xpipe.redis.keeper.RedisClient; @@ -31,6 +33,23 @@ protected String buildThreadPrefix(Channel channel) { return "RedisClientXsync-" + getRemoteIpLocalPort; } + @Override + protected boolean shouldFilter(RedisOp redisOp) { + if (RedisOpType.PUBLISH.equals(redisOp.getOpType())) { + int length = redisOp.buildRawOpArgs().length; + if (length < 5) { + logger.warn("publish command length={} < 5, filtered", length); + return true; + } + String channel = new String(redisOp.buildRawOpArgs()[4]); + if (!channel.startsWith("xpipe-asymmetric-")) { + logger.debug("publish channel: [{}] filtered", channel); + return true; + } + } + return false; + } + @Override public boolean supportProgress(Class> clazz) { return clazz.equals(GtidSetReplicationProgress.class); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultKeeperMonitor.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultKeeperMonitor.java index 07ca0e4185..5189d3a0a9 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultKeeperMonitor.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultKeeperMonitor.java @@ -24,7 +24,7 @@ public class DefaultKeeperMonitor extends AbstractStartStoppable implements Keep public DefaultKeeperMonitor(RedisKeeperServer redisKeeperServer, ScheduledExecutorService scheduled) { this.redisKeeperServer = redisKeeperServer; - this.keeperStats = new DefaultKeeperStats(redisKeeperServer.getShardId().toString(), scheduled); + this.keeperStats = new DefaultKeeperStats(redisKeeperServer.getReplId().toString(), scheduled); } @Override diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultKeeperStats.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultKeeperStats.java index 340e42c062..0ae2bc8123 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultKeeperStats.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultKeeperStats.java @@ -19,7 +19,7 @@ */ public class DefaultKeeperStats extends AbstractStartStoppable implements KeeperStats { - private String shard; + private String replId; private AtomicLong fullSyncCount = new AtomicLong(); @@ -51,8 +51,8 @@ public class DefaultKeeperStats extends AbstractStartStoppable implements Keeper private AtomicLong peakOutputInstantaneousOutput = new AtomicLong(); - public DefaultKeeperStats(String shard, ScheduledExecutorService scheduled) { - this.shard = shard; + public DefaultKeeperStats(String replId, ScheduledExecutorService scheduled) { + this.replId = replId; this.scheduled = scheduled; } @@ -195,10 +195,10 @@ private void updatePeakStats() { } private void logStats() { - logger.debug("[{}][input]{}", shard, getInputInstantaneousBPS()); - logger.debug("[{}][output]{}", shard, getOutputInstantaneousBPS()); - logger.debug("[{}][peak-in]{}", shard, getPeakInputInstantaneousBPS()); - logger.debug("[{}][peak-out]{}", shard, getPeakOutputInstantaneousBPS()); + logger.debug("[{}][input]{}", replId, getInputInstantaneousBPS()); + logger.debug("[{}][output]{}", replId, getOutputInstantaneousBPS()); + logger.debug("[{}][peak-in]{}", replId, getPeakInputInstantaneousBPS()); + logger.debug("[{}][peak-out]{}", replId, getPeakOutputInstantaneousBPS()); } @Override diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/netty/NettyMasterHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/netty/NettyMasterHandler.java index 82f9a0b41b..f893262795 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/netty/NettyMasterHandler.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/netty/NettyMasterHandler.java @@ -94,13 +94,13 @@ public void update(Object args, Observable observable) { @Override protected void doReportTraffic(long readBytes, long writtenBytes, String remoteIp, int remotePort) { if (writtenBytes > 0) { - String type = String.format("Keeper.Out.%s", redisKeeperServer.getClusterId()); + String type = String.format("Keeper.Out.%s", redisKeeperServer.getReplId()); String name = null; if(redisClient instanceof RedisSlave){ RedisSlave slave = (RedisSlave)redisClient; - name = String.format("slave.%s.%s.%s:%s", slave.roleDesc(), redisKeeperServer.getShardId(), remoteIp, slave.getSlaveListeningPort()); + name = String.format("slave.%s.%s.%s:%s", slave.roleDesc(), redisKeeperServer.getReplId(), remoteIp, slave.getSlaveListeningPort()); }else{ - name = String.format("client.%s.%s", redisKeeperServer.getShardId(), remoteIp); + name = String.format("client.%s.%s", redisKeeperServer.getReplId(), remoteIp); } EventMonitor.DEFAULT.logEvent(type, name, writtenBytes); } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/netty/NettySlaveHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/netty/NettySlaveHandler.java index 6aac545d77..aeda36ff48 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/netty/NettySlaveHandler.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/netty/NettySlaveHandler.java @@ -70,8 +70,8 @@ public void read(Channel channel, ByteBuf byteBuf) throws ByteBufReadActionExcep @Override protected void doReportTraffic(long readBytes, long writtenBytes, String remoteIp, int remotePort) { if (readBytes > 0) { - String type = String.format("Keeper.In.%s", redisKeeperServer.getClusterId()); - String name = String.format("%s-%s-%s:%s", redisMasterReplication.redisMaster().roleDesc(), redisKeeperServer.getShardId(), remoteIp, remotePort); + String type = String.format("Keeper.In.%s", redisKeeperServer.getReplId()); + String name = String.format("%s-%s-%s:%s", redisMasterReplication.redisMaster().roleDesc(), redisKeeperServer.getReplId(), remoteIp, remotePort); EventMonitor.DEFAULT.logEvent(type, name, readBytes); } } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStoreManager.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStoreManager.java index d5ff116390..b4ce1d01e6 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStoreManager.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStoreManager.java @@ -4,16 +4,14 @@ import com.ctrip.xpipe.observer.AbstractLifecycleObservable; import com.ctrip.xpipe.observer.NodeAdded; import com.ctrip.xpipe.redis.core.redis.operation.RedisOpParser; -import com.ctrip.xpipe.redis.core.store.ClusterId; -import com.ctrip.xpipe.redis.core.store.ReplicationStore; -import com.ctrip.xpipe.redis.core.store.ReplicationStoreManager; -import com.ctrip.xpipe.redis.core.store.ShardId; +import com.ctrip.xpipe.redis.core.store.*; import com.ctrip.xpipe.redis.core.util.NonFinalizeFileInputStream; import com.ctrip.xpipe.redis.core.util.NonFinalizeFileOutputStream; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor; -import com.ctrip.xpipe.utils.ClusterShardAwareThreadFactory; +import com.ctrip.xpipe.redis.keeper.util.KeeperReplIdAwareThreadFactory; import com.ctrip.xpipe.utils.FileUtils; +import com.google.common.io.Files; import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +40,7 @@ public class DefaultReplicationStoreManager extends AbstractLifecycleObservable private final Logger logger = LoggerFactory.getLogger(getClass()); - private final ClusterId clusterId; - - private final ShardId shardId; + private final ReplId replId; private final String keeperRunid; @@ -70,16 +66,15 @@ public class DefaultReplicationStoreManager extends AbstractLifecycleObservable private final RedisOpParser redisOpParser; - public DefaultReplicationStoreManager(KeeperConfig keeperConfig, ClusterId clusterId, ShardId shardId, + public DefaultReplicationStoreManager(KeeperConfig keeperConfig, ReplId replId, String keeperRunid, File baseDir, KeeperMonitor keeperMonitor) { - this(keeperConfig, clusterId, shardId, keeperRunid, baseDir, keeperMonitor, null); + this(keeperConfig, replId, keeperRunid, baseDir, keeperMonitor, null); } - public DefaultReplicationStoreManager(KeeperConfig keeperConfig, ClusterId clusterId, ShardId shardId, + public DefaultReplicationStoreManager(KeeperConfig keeperConfig, ReplId replId, String keeperRunid, File baseDir, KeeperMonitor keeperMonitor, RedisOpParser redisOpParser) { super(MoreExecutors.directExecutor()); - this.clusterId = clusterId; - this.shardId = shardId; + this.replId = replId; this.keeperRunid = keeperRunid; this.keeperConfig = keeperConfig; this.keeperMonitor = keeperMonitor; @@ -90,11 +85,11 @@ public DefaultReplicationStoreManager(KeeperConfig keeperConfig, ClusterId clust @Override protected void doInitialize() throws Exception { - this.baseDir = new File(keeperBaseDir, clusterId + "/" + shardId); + this.baseDir = new File(keeperBaseDir, replId.toString()); this.metaFile = new File(this.baseDir, META_FILE); scheduled = Executors.newScheduledThreadPool(1, - ClusterShardAwareThreadFactory.create(clusterId.toString(), shardId.toString(), "gc-" + String.format("%s-%s", clusterId, shardId))); + KeeperReplIdAwareThreadFactory.create(replId.toString(), "gc-" + replId.toString())); gcFuture = scheduled.scheduleWithFixedDelay(new AbstractExceptionLogTask() { @@ -254,13 +249,8 @@ public synchronized ReplicationStore getCurrent() throws IOException { } @Override - public ClusterId getClusterId() { - return clusterId; - } - - @Override - public ShardId getShardId() { - return shardId; + public ReplId getReplId() { + return replId; } protected synchronized void gc() throws IOException { @@ -323,7 +313,7 @@ public long getGcCount() { @Override public String toString() { - return String.format("cluster:%s, shard:%s, keeperRunId:%s, baseDir:%s, currentMeta:%s", clusterId, shardId, keeperRunid, baseDir, + return String.format("repl:%s, keeperRunId:%s, baseDir:%s, currentMeta:%s", replId, keeperRunid, baseDir, currentMeta.get() == null ? "" : currentMeta.get().toString()); } @@ -331,4 +321,59 @@ public File getBaseDir() { return baseDir; } + public static class ClusterAndShardCompatible extends DefaultReplicationStoreManager { + + private final Logger logger = LoggerFactory.getLogger(ClusterAndShardCompatible.class); + + private final File keeperBaseDir; + + private ReplId replId; + + private ClusterId deprecatedClusterId; + + private ShardId deprecatedShardId; + + public ClusterAndShardCompatible(KeeperConfig keeperConfig, ReplId replId, String keeperRunid, + File baseDir, KeeperMonitor keeperMonitor, RedisOpParser redisOpParser) { + super(keeperConfig, replId, keeperRunid, baseDir, keeperMonitor, redisOpParser); + this.keeperBaseDir = baseDir; + this.replId = replId; + } + + @Override + protected void doInitialize() throws Exception { + + renameDeprecatedStore(); + + super.doInitialize(); + } + + public ClusterAndShardCompatible setDeprecatedClusterAndShard(ClusterId clusterId, ShardId shardId) { + this.deprecatedClusterId = clusterId; + this.deprecatedShardId = shardId; + return this; + } + + public void renameDeprecatedStore() { + if (null == deprecatedClusterId || null == deprecatedShardId) { + return; + } + + File deprecated = new File(keeperBaseDir, deprecatedClusterId + "/" + deprecatedShardId); + File dest = new File(keeperBaseDir, replId.toString()); + + File deprecatedParent = new File(keeperBaseDir, deprecatedClusterId.toString()); + if (deprecated.exists() && !dest.exists()) { + try { + keeperBaseDir.mkdirs(); + Files.move(deprecated, dest); + logger.info("[renameDeprecatedStore] {} -> {} success", deprecated.getAbsolutePath(), dest.getAbsolutePath()); + FileUtils.recursiveDelete(deprecatedParent); + } catch (IOException e) { + logger.error("[renameDeprecatedStore] {} -> {} failure", deprecated.getAbsolutePath(), dest.getAbsolutePath(), e); + } + } + } + } + } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/util/KeeperReplIdAwareThreadFactory.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/util/KeeperReplIdAwareThreadFactory.java new file mode 100644 index 0000000000..9dc560affc --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/util/KeeperReplIdAwareThreadFactory.java @@ -0,0 +1,46 @@ +package com.ctrip.xpipe.redis.keeper.util; + +import com.ctrip.xpipe.utils.XpipeThreadFactory; +import com.ctrip.xpipe.utils.log.MDCUtil; + +import java.util.concurrent.ThreadFactory; + +/** + * @author lishanglin + * date 2023/10/31 + */ +public class KeeperReplIdAwareThreadFactory extends XpipeThreadFactory { + + private String replId; + + public static ThreadFactory create(String replId, String namePrefix, boolean daemon) { + return new KeeperReplIdAwareThreadFactory(replId, namePrefix, daemon); + } + + public static ThreadFactory create(String replId, String namePrefix) { + return create(replId, namePrefix, false); + } + + public static ThreadFactory create(Object replId, String namePrefix) { + String replIdString = null; + if (replId != null) { + replIdString = replId.toString(); + } + return create(replIdString, namePrefix, false); + } + + private KeeperReplIdAwareThreadFactory(String replId, String namePrefix, boolean daemon) { + super(namePrefix, daemon); + this.replId = replId; + } + + private KeeperReplIdAwareThreadFactory(String replId, String namePrefix) { + this(replId, namePrefix, false); + } + + @Override + public Thread newThread(Runnable r) { + return super.newThread(MDCUtil.decorateKeeperReplMDC(r, replId)); + } + +} diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java index a3736b8a44..f11f0325b3 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java @@ -108,8 +108,8 @@ protected RedisKeeperServer startRedisKeeperServer(int replicationStoreCommandFi return redisKeeperServer; } - protected RedisKeeperServer startRedisKeeperServer(KeeperConfig keeperConfig, KeeperMeta keeperMeta) throws Exception { - RedisKeeperServer redisKeeperServer = createRedisKeeperServer(keeperMeta, keeperConfig, getReplicationStoreManagerBaseDir(keeperMeta)); + protected RedisKeeperServer startRedisKeeperServer(Long replId, KeeperConfig keeperConfig, KeeperMeta keeperMeta) throws Exception { + RedisKeeperServer redisKeeperServer = createRedisKeeperServer(replId, keeperMeta, keeperConfig, getReplicationStoreManagerBaseDir(keeperMeta)); redisKeeperServer.initialize(); redisKeeperServer.start(); add(redisKeeperServer); diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java index 4db6479071..ee6d98207c 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java @@ -9,6 +9,7 @@ import com.ctrip.xpipe.redis.core.redis.operation.RedisOpParserManager; import com.ctrip.xpipe.redis.core.redis.operation.parser.DefaultRedisOpParserManager; import com.ctrip.xpipe.redis.core.redis.operation.parser.GeneralRedisOpParser; +import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisKeeperServer; @@ -91,21 +92,22 @@ protected String getKeeperConfigFile() { protected RedisKeeperServer createRedisKeeperServer(KeeperConfig keeperConfig) throws Exception { KeeperMeta keeperMeta = createKeeperMeta(); - return createRedisKeeperServer(keeperMeta, keeperConfig, getReplicationStoreManagerBaseDir(keeperMeta)); + ReplId replId = getReplId(); + return createRedisKeeperServer(replId.id(), keeperMeta, keeperConfig, getReplicationStoreManagerBaseDir(keeperMeta)); } protected RedisKeeperServer createRedisKeeperServer() throws Exception { - return createRedisKeeperServer(createKeeperMeta()); + return createRedisKeeperServer(getReplId().id(), createKeeperMeta()); } - protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeper) throws Exception { - return createRedisKeeperServer(keeper, getReplicationStoreManagerBaseDir(keeper)); + protected RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper) throws Exception { + return createRedisKeeperServer(replId, keeper, getReplicationStoreManagerBaseDir(keeper)); } - protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeper, File baseDir) throws Exception { + protected RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper, File baseDir) throws Exception { - return createRedisKeeperServer(keeper, getKeeperConfig(), baseDir); + return createRedisKeeperServer(replId, keeper, getKeeperConfig(), baseDir); } @@ -113,14 +115,14 @@ protected KeeperConfig getKeeperConfig() { return keeperConfig; } - protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeper, KeeperConfig keeperConfig, File baseDir) throws Exception { + protected RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper, KeeperConfig keeperConfig, File baseDir) throws Exception { - return createRedisKeeperServer(keeper, keeperConfig, baseDir, getRegistry().getComponent(LeaderElectorManager.class)); + return createRedisKeeperServer(replId, keeper, keeperConfig, baseDir, getRegistry().getComponent(LeaderElectorManager.class)); } - protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeper, KeeperConfig keeperConfig, + protected RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager) { - return new DefaultRedisKeeperServer(keeper, keeperConfig, baseDir, leaderElectorManager, + return new DefaultRedisKeeperServer(replId, keeper, keeperConfig, baseDir, leaderElectorManager, createkeepersMonitorManager(), getResourceManager(), createRedisOpParser()); } diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperTest.java index b0f26fff17..1003e091d4 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperTest.java @@ -49,6 +49,10 @@ protected ClusterId getClusterId(Long id) { return new ClusterId(currentTestName() + "-", id); } + protected ReplId getReplId() { + return new ReplId(currentTestName() + "-", 0L); + } + protected ShardId getShardId() { return new ShardId(currentTestName() + "-", 0L); } @@ -61,14 +65,14 @@ protected ReplicationStoreManager createReplicationStoreManager(String keeperRun String tmpDir = getTestFileDir(); - return createReplicationStoreManager(getClusterId(), getShardId(), keeperRunid, keeperConfig, new File(tmpDir)); + return createReplicationStoreManager(getReplId(), keeperRunid, keeperConfig, new File(tmpDir)); } protected ReplicationStoreManager createReplicationStoreManager(KeeperConfig keeperConfig) { String tmpDir = getTestFileDir(); - return createReplicationStoreManager(getClusterId(), getShardId(), keeperConfig, new File(tmpDir)); + return createReplicationStoreManager(getReplId(), keeperConfig, new File(tmpDir)); } @@ -76,31 +80,31 @@ protected ReplicationStoreManager createReplicationStoreManager() { String tmpDir = getTestFileDir(); - return createReplicationStoreManager(getClusterId(), getShardId(), new File(tmpDir)); + return createReplicationStoreManager(getReplId(), new File(tmpDir)); } - protected ReplicationStoreManager createReplicationStoreManager(ClusterId clusterId, ShardId shardId, String keeperRunid, File storeDir) { + protected ReplicationStoreManager createReplicationStoreManager(ReplId replId, String keeperRunid, File storeDir) { - return createReplicationStoreManager(clusterId, shardId, keeperRunid, getKeeperConfig(), storeDir); + return createReplicationStoreManager(replId, keeperRunid, getKeeperConfig(), storeDir); } - protected ReplicationStoreManager createReplicationStoreManager(ClusterId clusterId, ShardId shardId, File storeDir) { + protected ReplicationStoreManager createReplicationStoreManager(ReplId replId, File storeDir) { - return createReplicationStoreManager(clusterId, shardId, getKeeperConfig(), storeDir); + return createReplicationStoreManager(replId, getKeeperConfig(), storeDir); } protected KeeperConfig getKeeperConfig() { return new TestKeeperConfig(); } - protected ReplicationStoreManager createReplicationStoreManager(ClusterId clusterId, ShardId shardId, KeeperConfig keeperConfig, File storeDir) { + protected ReplicationStoreManager createReplicationStoreManager(ReplId replId, KeeperConfig keeperConfig, File storeDir) { - return createReplicationStoreManager(clusterId, shardId, randomKeeperRunid(), keeperConfig, storeDir); + return createReplicationStoreManager(replId, randomKeeperRunid(), keeperConfig, storeDir); } - protected ReplicationStoreManager createReplicationStoreManager(ClusterId clusterId, ShardId shardId, String keeperRunid, KeeperConfig keeperConfig, File storeDir) { + protected ReplicationStoreManager createReplicationStoreManager(ReplId replId, String keeperRunid, KeeperConfig keeperConfig, File storeDir) { - DefaultReplicationStoreManager replicationStoreManager = new DefaultReplicationStoreManager(keeperConfig, clusterId, shardId, keeperRunid, storeDir, createkeeperMonitor()); + DefaultReplicationStoreManager replicationStoreManager = new DefaultReplicationStoreManager(keeperConfig, replId, keeperRunid, storeDir, createkeeperMonitor()); replicationStoreManager.addObserver(new Observer() { diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerServiceTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerServiceTest.java index 4204cd3257..323cd6d3ff 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerServiceTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerServiceTest.java @@ -7,6 +7,7 @@ import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; import com.ctrip.xpipe.redis.core.entity.Shard; import com.ctrip.xpipe.redis.core.store.ClusterId; +import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.core.store.ShardId; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.config.DefaultKeeperConfig; @@ -46,6 +47,7 @@ public class KeeperContainerServiceTest extends AbstractTest { private KeeperContainerService keeperContainerService; private ClusterId someClusterId; private ShardId someShardId; + private ReplId someReplId; private int somePort; private KeeperTransMeta someKeeperTransMeta; private KeeperMeta someKeeperMeta; @@ -63,13 +65,13 @@ public void setUp() throws Exception { someClusterId = ClusterId.from(randomLong()); someShardId = ShardId.from(randomLong()); + someReplId = ReplId.from(randomLong()); somePort = 6789; someKeeperMeta = new KeeperMeta(); someKeeperMeta.setPort(somePort); someKeeperTransMeta = new KeeperTransMeta(); - someKeeperTransMeta.setClusterDbId(someClusterId.id()); - someKeeperTransMeta.setShardDbId(someShardId.id()); + someKeeperTransMeta.setReplId(someReplId.id()); someKeeperTransMeta.setKeeperMeta(someKeeperMeta); when(keeperContainerConfig.getReplicationStoreDir()).thenReturn(System.getProperty("user.dir")); @@ -84,8 +86,7 @@ public void testAddKeeper() throws Exception { RedisKeeperServer redisKeeperServer = keeperContainerService.add(someKeeperTransMeta); verify(componentRegistry, times(1)).add(redisKeeperServer); - assertEquals(someClusterId, redisKeeperServer.getClusterId()); - assertEquals(someShardId, redisKeeperServer.getShardId()); + assertEquals(someReplId, redisKeeperServer.getReplId()); assertEquals(somePort, redisKeeperServer.getListeningPort()); } diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandlerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandlerTest.java index 5199668af7..e3c649f3de 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandlerTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandlerTest.java @@ -57,7 +57,7 @@ public void beforePsyncHandlerTest(){ when(redisSlave.getRedisServer()).thenReturn(redisKeeperServer); when(redisKeeperServer.getKeeperRepl()).thenReturn(keeperRepl); - when(redisKeeperServer.getShardId()).thenReturn(getShardId()); + when(redisKeeperServer.getReplId()).thenReturn(getReplId()); DefaultKeeperMonitor monitor = new DefaultKeeperMonitor(redisKeeperServer, scheduled); when(redisKeeperServer.getKeeperMonitor()).thenReturn(monitor); when(redisKeeperServer.getKeeperConfig()).thenReturn(KeeperConfig); diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/hetero/manual/KeeperServerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/hetero/manual/KeeperServerTest.java index 8db49c818b..2607056f68 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/hetero/manual/KeeperServerTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/hetero/manual/KeeperServerTest.java @@ -26,7 +26,7 @@ public class KeeperServerTest extends AbstractRedisKeeperContextTest { @Test public void manual() throws Exception { - RedisKeeperServer redisKeeperServer = createRedisKeeperServer(createKeeperMeta(6000, "0")); + RedisKeeperServer redisKeeperServer = createRedisKeeperServer(getReplId().id(), createKeeperMeta(6000, "0")); redisKeeperServer.initialize(); redisKeeperServer.start(); diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServerTest.java index 74422ee83d..6207f15617 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServerTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServerTest.java @@ -6,6 +6,7 @@ import com.ctrip.xpipe.redis.core.meta.KeeperState; import com.ctrip.xpipe.redis.core.proxy.protocols.DefaultProxyConnectProtocol; import com.ctrip.xpipe.redis.core.server.FakeRedisServer; +import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.keeper.*; import com.ctrip.xpipe.redis.keeper.config.TestKeeperConfig; import io.netty.channel.Channel; @@ -221,8 +222,9 @@ public void testKeeperStopNoConnectMaster() throws Exception { public void testKeeperServerInitState() throws Exception { KeeperMeta keeperMeta = createKeeperMeta(); + ReplId replId = getReplId(); - RedisKeeperServer redisKeeperServer = createRedisKeeperServer(keeperMeta); + RedisKeeperServer redisKeeperServer = createRedisKeeperServer(replId.id(), keeperMeta); redisKeeperServer.initialize(); Assert.assertEquals(KeeperState.UNKNOWN, redisKeeperServer.getRedisKeeperServerState().keeperState()); @@ -232,7 +234,7 @@ public void testKeeperServerInitState() throws Exception { redisKeeperServer.dispose(); - redisKeeperServer = createRedisKeeperServer(keeperMeta); + redisKeeperServer = createRedisKeeperServer(replId.id(), keeperMeta); redisKeeperServer.initialize(); Assert.assertEquals(KeeperState.PRE_ACTIVE, redisKeeperServer.getRedisKeeperServerState().keeperState()); @@ -241,7 +243,7 @@ public void testKeeperServerInitState() throws Exception { redisKeeperServer.dispose(); - redisKeeperServer = createRedisKeeperServer(keeperMeta); + redisKeeperServer = createRedisKeeperServer(replId.id(), keeperMeta); redisKeeperServer.initialize(); Assert.assertEquals(KeeperState.PRE_BACKUP, redisKeeperServer.getRedisKeeperServerState().keeperState()); } diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java index 327c2dd9ac..508c8e0b97 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java @@ -55,7 +55,7 @@ public void afterGtidRedisKeeperServerTest() throws Exception { public void testPsyncAndXsync() throws Exception { KeeperConfig keeperConfig = newTestKeeperConfig(); KeeperMeta keeperMeta = createKeeperMeta(); - DefaultRedisKeeperServer keeperServer = new DefaultRedisKeeperServer(keeperMeta, keeperConfig, + DefaultRedisKeeperServer keeperServer = new DefaultRedisKeeperServer(getReplId().id(), keeperMeta, keeperConfig, getReplicationStoreManagerBaseDir(keeperMeta), getRegistry().getComponent(LeaderElectorManager.class), createkeepersMonitorManager(), getResourceManager(), parser); keeperServer.initialize(); diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/FakeRedisExceptionTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/FakeRedisExceptionTest.java index 3ed5d1386f..f1c8fda1a1 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/FakeRedisExceptionTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/FakeRedisExceptionTest.java @@ -51,10 +51,10 @@ public void testRdbFileError() throws Exception{ } - protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeper, KeeperConfig keeperConfig, + protected RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager) { - return new DefaultRedisKeeperServer(keeper, keeperConfig, baseDir, leaderElectorManager, + return new DefaultRedisKeeperServer(replId, keeper, keeperConfig, baseDir, leaderElectorManager, createkeepersMonitorManager(), getRegistry().getComponent(KeeperResourceManager.class)){ @Override diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/PsyncForKeeperTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/PsyncForKeeperTest.java index 9b26d5c51a..62078d6c79 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/PsyncForKeeperTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/PsyncForKeeperTest.java @@ -3,6 +3,7 @@ import com.ctrip.xpipe.endpoint.DefaultEndPoint; import com.ctrip.xpipe.lifecycle.LifecycleHelper; import com.ctrip.xpipe.redis.core.entity.KeeperMeta; +import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.keeper.AbstractFakeRedisTest; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; @@ -117,7 +118,6 @@ public void testBackupKeeperTerminateRefullsyncAndPartialLater() throws Exceptio long originFsyncCnt = keeperStats.getFullSyncCount(); // upstream cmd not continue and refullsync - logger.info("[lsl] {}", keeperServer1.getListeningPort()); keeperServer2.getRedisKeeperServerState().becomeBackup(new DefaultEndPoint("127.0.0.1", keeperServer1.getListeningPort())); waitKeeperSyncWithRedis(keeperServer2); @@ -135,9 +135,10 @@ private RedisKeeperServer restartKeeperServer(RedisKeeperServer keeperServer, lo KeeperConfig keeperConfig = newTestKeeperConfig(); ((TestKeeperConfig)keeperConfig).setReplKeepSecondsAfterDown(replKeepSecondsAfterDown); KeeperMeta keeperMeta = createKeeperMeta(keeperPort, keeperRunId); + ReplId replId = getReplId(); TimeUnit.SECONDS.sleep(waitSecondsAfterDown); - return startRedisKeeperServer(keeperConfig, keeperMeta); + return startRedisKeeperServer(replId.id(), keeperConfig, keeperMeta); } private void waitKeeperSyncWithRedis(RedisKeeperServer keeperServer) throws Exception { diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStoreManagerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStoreManagerTest.java index 314f3e8b1a..2e3213ebcc 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStoreManagerTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStoreManagerTest.java @@ -4,14 +4,9 @@ import com.ctrip.xpipe.lifecycle.LifecycleHelper; import com.ctrip.xpipe.redis.core.protocal.protocal.LenEofType; import com.ctrip.xpipe.redis.core.redis.RunidGenerator; -import com.ctrip.xpipe.redis.core.store.ClusterId; -import com.ctrip.xpipe.redis.core.store.MetaStore; -import com.ctrip.xpipe.redis.core.store.ReplicationStore; -import com.ctrip.xpipe.redis.core.store.ShardId; +import com.ctrip.xpipe.redis.core.store.*; import com.ctrip.xpipe.redis.keeper.AbstractRedisKeeperTest; import com.ctrip.xpipe.redis.keeper.config.TestKeeperConfig; -import com.ctrip.xpipe.utils.FileUtils; -import com.google.common.io.Files; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.junit.Assert; @@ -24,7 +19,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; -import static org.mockito.Mockito.spy; /** * @author marsqing @@ -243,10 +237,8 @@ public void test() throws Exception { String keeperRunid = randomKeeperRunid(); File baseDir = new File(getTestFileDir()); - ClusterId clusterId = getClusterId(); - ShardId shardId = getShardId(); - DefaultReplicationStoreManager mgr = (DefaultReplicationStoreManager) createReplicationStoreManager(clusterId, - shardId, keeperRunid, baseDir); + ReplId replId = getReplId(); + DefaultReplicationStoreManager mgr = (DefaultReplicationStoreManager) createReplicationStoreManager(replId, keeperRunid, baseDir); LifecycleHelper.initializeIfPossible(mgr); @@ -255,8 +247,7 @@ public void test() throws Exception { currentStore = mgr.create(); - assertEquals(clusterId, mgr.getClusterId()); - assertEquals(shardId, mgr.getShardId()); + assertEquals(replId, mgr.getReplId()); assertEquals(currentStore, mgr.getCurrent()); DefaultReplicationStore newCurrentStore = (DefaultReplicationStore) mgr.create(); @@ -271,7 +262,7 @@ public void test() throws Exception { cmdBuf.writeByte(9); newCurrentStore.cmdStore.appendCommands(cmdBuf); - DefaultReplicationStoreManager mgr2 = (DefaultReplicationStoreManager) createReplicationStoreManager(clusterId,shardId, keeperRunid, baseDir); + DefaultReplicationStoreManager mgr2 = (DefaultReplicationStoreManager) createReplicationStoreManager(replId, keeperRunid, baseDir); LifecycleHelper.initializeIfPossible(mgr2); assertEquals(metaStore.getReplId(), mgr2.getCurrent().getMetaStore().getReplId()); diff --git a/redis/redis-keeper/src/test/resources/log4j2.xml b/redis/redis-keeper/src/test/resources/log4j2.xml index c2258edaa5..517c2e3f91 100644 --- a/redis/redis-keeper/src/test/resources/log4j2.xml +++ b/redis/redis-keeper/src/test/resources/log4j2.xml @@ -10,7 +10,7 @@ - + @@ -18,7 +18,7 @@ - + diff --git a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/applier/manager/DefaultApplierManager.java b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/applier/manager/DefaultApplierManager.java index bdf2ff8a5c..cf26ddb12b 100644 --- a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/applier/manager/DefaultApplierManager.java +++ b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/applier/manager/DefaultApplierManager.java @@ -248,6 +248,12 @@ public void visitAdded(ShardMeta added) { public void visitModified(@SuppressWarnings("rawtypes") MetaComparator comparator) { ShardMetaComparator shardMetaComparator = (ShardMetaComparator) comparator; ShardMeta shard = shardMetaComparator.getCurrent(); + if (null == shardMetaComparator.getFuture()) { + // shard migrate out + for (ApplierMeta applier: shard.getAppliers()) { + removeApplier(clusterDbId, shard.getDbId(), applier); + } + } shardMetaComparator.accept(new ShardComparatorVisitor(clusterDbId, shard.getDbId())); } diff --git a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/container/DefaultKeeperContainerService.java b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/container/DefaultKeeperContainerService.java index 467956c00a..fc191a5d90 100644 --- a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/container/DefaultKeeperContainerService.java +++ b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/container/DefaultKeeperContainerService.java @@ -29,6 +29,16 @@ public DefaultKeeperContainerService(KeeperContainerMeta keeperContainerMeta, Re this.restTemplate = restTemplate; } + @Override + public KeeperInstanceMeta infoPort(int queryPort) { + try { + return restTemplate.getForObject("http://{ip}:{port}/keepers/port/{queryPort}}", KeeperInstanceMeta.class, + keeperContainerMeta.getIp(), keeperContainerMeta.getPort(), queryPort); + } catch (HttpStatusCodeException ex) { + throw KeeperContainerErrorParser.parseErrorFromHttpException(ex); + } + } + @Override public void addKeeper(KeeperTransMeta keeperTransMeta) { try { diff --git a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/DefaultKeeperElectorManager.java b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/DefaultKeeperElectorManager.java index c3f23a8843..491217b57d 100644 --- a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/DefaultKeeperElectorManager.java +++ b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/DefaultKeeperElectorManager.java @@ -67,6 +67,7 @@ protected void observerShardLeader(final Long clusterDbId, final Long shardDbId) try { List pathChildrenCaches = new ArrayList<>(); pathChildrenCaches.add(buildPathChildrenCacheByDbId(clusterDbId, shardDbId, client)); + pathChildrenCaches.add(buildPathChildrenCacheByShard(shardDbId, client)); ReentrantLock lock = new ReentrantLock(); pathChildrenCaches.forEach(pathChildrenCache -> { @@ -131,6 +132,13 @@ private PathChildrenCache buildPathChildrenCacheByDbId(Long clusterDbId, Long sh XpipeThreadFactory.create(String.format("PathChildrenCache:cluster_%d-shard_%d", clusterDbId, shardDbId))); } + private PathChildrenCache buildPathChildrenCacheByShard(Long shardDbId, CuratorFramework client) { + final String leaderLatchPath = MetaZkConfig.getKeeperLeaderLatchPath(shardDbId); + logger.info("[observerShardLeader][add PathChildrenCache]shard_{}, {}", shardDbId, leaderLatchPath); + return new PathChildrenCache(client, leaderLatchPath, true, + XpipeThreadFactory.create(String.format("PathChildrenCache:shard_%d", shardDbId))); + } + private List> aggregateChildData(List pathChildrenCaches) { List> children = new ArrayList<>(); pathChildrenCaches.forEach(pathChildrenCache -> children.add(pathChildrenCache.getCurrentData())); diff --git a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AbstractKeeperCommand.java b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AbstractKeeperCommand.java index f879629dce..bc0e1cd4f1 100644 --- a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AbstractKeeperCommand.java +++ b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AbstractKeeperCommand.java @@ -46,6 +46,7 @@ protected void doExecute() throws Exception { @SuppressWarnings("unchecked") private void doOpetation() { try{ + preKeeperContainerOperation(); doKeeperContainerOperation(); checkUntilStateOk(); }catch(KeeperContainerException e){ @@ -88,6 +89,8 @@ protected RetryPolicy createRetryPolicy() { protected abstract boolean isSuccess(ErrorMessage error); + protected abstract void preKeeperContainerOperation(); + protected abstract void doKeeperContainerOperation(); diff --git a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AddKeeperCommand.java b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AddKeeperCommand.java index 45f03b6216..066b753603 100644 --- a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AddKeeperCommand.java +++ b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AddKeeperCommand.java @@ -10,6 +10,7 @@ import com.ctrip.xpipe.exception.ErrorMessage; import com.ctrip.xpipe.exception.ExceptionUtils; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; import com.ctrip.xpipe.redis.core.entity.KeeperMeta; import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; import com.ctrip.xpipe.redis.core.keeper.container.KeeperContainerErrorCode; @@ -43,6 +44,27 @@ public AddKeeperCommand(KeeperContainerService keeperContainerService, KeeperTra super(keeperContainerService, keeperTransMeta, scheduled, timeoutMilli, checkIntervalMilli); } + @Override + protected void preKeeperContainerOperation() { + try { + int usePort = keeperTransMeta.getKeeperMeta().getPort(); + KeeperInstanceMeta keeperInstanceMeta = keeperContainerService.infoPort(usePort); + if (null != keeperInstanceMeta.getReplId() && !keeperInstanceMeta.getReplId().equals(keeperTransMeta.getReplId())) { + getLogger().info("[preKeeperContainerOperation][{}-{}][remove unknown keeper use the same port] found:{}", + keeperTransMeta.getClusterDbId(), keeperTransMeta.getShardDbId(), keeperInstanceMeta); + keeperContainerService.removeKeeper(keeperInstanceMeta); + } + } catch (Throwable th) { + if (getLogger().isDebugEnabled()) { + getLogger().info("[preKeeperContainerOperation][{}-{}][fail]", + keeperTransMeta.getClusterDbId(), keeperTransMeta.getShardDbId(), th); + } else if (getLogger().isInfoEnabled()) { + getLogger().info("[preKeeperContainerOperation][{}-{}][fail] {}", + keeperTransMeta.getClusterDbId(), keeperTransMeta.getShardDbId(), th.getMessage()); + } + } + } + @Override protected void doKeeperContainerOperation() { keeperContainerService.addOrStartKeeper(keeperTransMeta); diff --git a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DefaultKeeperManager.java b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DefaultKeeperManager.java index 99bcdd392a..6d301b8055 100644 --- a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DefaultKeeperManager.java +++ b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DefaultKeeperManager.java @@ -132,7 +132,7 @@ protected void handleClusterDeleted(ClusterMeta clusterMeta) { private void removeKeeper(Long clusterDbId, Long shardDbId, KeeperMeta keeperMeta) { try { - keeperStateController.removeKeeper(new KeeperTransMeta(clusterDbId, shardDbId, keeperMeta)); + keeperStateController.removeKeeper(new KeeperTransMeta(clusterDbId, shardDbId, shardDbId, keeperMeta)); } catch (Exception e) { logger.error(String.format("[removeKeeper]cluster_%s:shard_%s,%s", clusterDbId, shardDbId, keeperMeta), e); } @@ -140,7 +140,7 @@ private void removeKeeper(Long clusterDbId, Long shardDbId, KeeperMeta keeperMet private void addKeeper(Long clusterDbId, Long shardDbId, KeeperMeta keeperMeta) { try { - KeeperTransMeta keeperTransMeta = new KeeperTransMeta(clusterDbId, shardDbId, keeperMeta); + KeeperTransMeta keeperTransMeta = new KeeperTransMeta(clusterDbId, shardDbId, shardDbId, keeperMeta); keeperStateController.addKeeper(keeperTransMeta); } catch (Exception e) { logger.error(String.format("[addKeeper]cluster_%s:shard_%s,%s", clusterDbId, shardDbId, keeperMeta), e); @@ -206,7 +206,7 @@ protected void doCheckShard(ClusterMeta clusterMeta, ShardMeta shardMeta) { private void addDeadKeepers(List deadKeepers, Long clusterDbId, Long shardDbId) { for (KeeperMeta deadKeeper : deadKeepers) { try { - KeeperTransMeta keeperTransMeta = new KeeperTransMeta(clusterDbId, shardDbId, deadKeeper); + KeeperTransMeta keeperTransMeta = new KeeperTransMeta(clusterDbId, shardDbId, shardDbId, deadKeeper); keeperStateController.addKeeper(keeperTransMeta); } catch (ResourceAccessException e) { logger.error(String.format("[check dead keepers]cluster_%d,shard_%d, keeper:%s, error:%s", clusterDbId, shardDbId, @@ -220,7 +220,7 @@ private void addDeadKeepers(List deadKeepers, Long clusterDbId, Long private void removeRemovedKeepers(List removedKeepers, Long clusterDbId, Long shardDbId) { for (KeeperMeta removedKeeper : removedKeepers) { try { - KeeperTransMeta keeperTransMeta = new KeeperTransMeta(clusterDbId, shardDbId, removedKeeper); + KeeperTransMeta keeperTransMeta = new KeeperTransMeta(clusterDbId, shardDbId, shardDbId, removedKeeper); keeperStateController.removeKeeper(keeperTransMeta); } catch (ResourceAccessException e) { logger.error(String.format("[check removed keepers]cluster_%d,shard_%d, keeper:%s, error:%s", clusterDbId, shardDbId, diff --git a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DeleteKeeperCommand.java b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DeleteKeeperCommand.java index b96ca9829b..12947706e2 100644 --- a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DeleteKeeperCommand.java +++ b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DeleteKeeperCommand.java @@ -31,6 +31,9 @@ public DeleteKeeperCommand(KeeperContainerService keeperContainerService, Keeper super(keeperContainerService, keeperTransMeta, scheduled, timeoutMilli, checkIntervalMilli); } + @Override + protected void preKeeperContainerOperation() { + } @Override protected void doKeeperContainerOperation() { diff --git a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/meta/CurrentMeta.java b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/meta/CurrentMeta.java index 42bcac4266..4aca00b771 100644 --- a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/meta/CurrentMeta.java +++ b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/meta/CurrentMeta.java @@ -406,7 +406,14 @@ public void visitRemoved(ShardMeta removed) { @Override public void visitModified(@SuppressWarnings("rawtypes") MetaComparator comparator) { - currentClusterMeta.changeShard((ShardMetaComparator) comparator); + ShardMetaComparator shardMetaComparator = (ShardMetaComparator) comparator; + ShardMeta shard = ((ShardMetaComparator) comparator).getCurrent(); + if (null != shard && null == shardMetaComparator.getFuture()) { + // shard migrate out + currentClusterMeta.removeShard(shard); + } else { + currentClusterMeta.changeShard(shardMetaComparator); + } } @Override diff --git a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/meta/impl/DefaultDcMetaCache.java b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/meta/impl/DefaultDcMetaCache.java index 5ce46d1e75..290aa05ac6 100644 --- a/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/meta/impl/DefaultDcMetaCache.java +++ b/redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/meta/impl/DefaultDcMetaCache.java @@ -160,6 +160,7 @@ protected void changeDcMeta(DcMeta current, DcMeta future, final long metaLoadTi } DcMetaComparator dcMetaComparator = new DcMetaComparator(current, future); + dcMetaComparator.setShardMigrateSupport(); dcMetaComparator.compare(); if (dcMetaComparator.totalChangedCount() - drClusterNums(dcMetaComparator) > META_MODIFY_PROTECT_COUNT) { diff --git a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/AbstractMetaServerTest.java b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/AbstractMetaServerTest.java index 3a1709648e..b4941a3947 100644 --- a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/AbstractMetaServerTest.java +++ b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/AbstractMetaServerTest.java @@ -2,12 +2,20 @@ import com.ctrip.xpipe.redis.core.AbstractRedisTest; +import com.ctrip.xpipe.redis.core.entity.ClusterMeta; +import com.ctrip.xpipe.redis.core.entity.ShardMeta; +import com.ctrip.xpipe.redis.core.entity.SourceMeta; import com.ctrip.xpipe.redis.meta.server.config.DefaultMetaServerConfig; import com.ctrip.xpipe.redis.meta.server.config.MetaServerConfig; import com.ctrip.xpipe.testutils.MemoryPrinter; import org.junit.After; import org.junit.Before; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * @author wenchao.meng * @@ -65,5 +73,23 @@ public Long getClusterDbId() { public Long getShardDbId() { return shardDbId; } - + + public void exchangeClusterShards(ClusterMeta cluster1, ClusterMeta cluster2) { + Map shards1 = new HashMap<>(cluster1.getShards()); + Map shards2 = new HashMap<>(cluster2.getShards()); + cluster1.getShards().clear(); + cluster2.getShards().clear(); + shards1.values().forEach(cluster2::addShard); + shards2.values().forEach(cluster1::addShard); + } + + public void exchangeClusterSources(ClusterMeta cluster1, ClusterMeta cluster2) { + List sources1 = new ArrayList<>(cluster1.getSources()); + List sources2 = new ArrayList<>(cluster2.getSources()); + cluster1.getSources().clear(); + cluster2.getSources().clear(); + sources1.forEach(cluster2::addSource); + sources2.forEach(cluster1::addSource); + } + } diff --git a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/applier/manager/DefaultApplierManagerTest.java b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/applier/manager/DefaultApplierManagerTest.java index d89ce66f28..e3bd088a04 100644 --- a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/applier/manager/DefaultApplierManagerTest.java +++ b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/applier/manager/DefaultApplierManagerTest.java @@ -8,8 +8,11 @@ import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.core.entity.*; import com.ctrip.xpipe.redis.core.meta.MetaClone; +import com.ctrip.xpipe.redis.core.meta.MetaComparator; import com.ctrip.xpipe.redis.core.meta.comparator.ClusterMetaComparator; +import com.ctrip.xpipe.redis.core.meta.comparator.DcMetaComparator; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; +import com.ctrip.xpipe.redis.meta.server.AbstractMetaServerTest; import com.ctrip.xpipe.redis.meta.server.keeper.applier.ApplierStateController; import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager; import com.ctrip.xpipe.tuple.Pair; @@ -20,6 +23,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,7 +34,7 @@ *

* 2022/4/7 11:42 */ -public class DefaultApplierManagerTest extends AbstractTest { +public class DefaultApplierManagerTest extends AbstractMetaServerTest { private DefaultApplierManager manager = new DefaultApplierManager(); diff --git a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/AbstractKeeperElectorManagerTest.java b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/AbstractKeeperElectorManagerTest.java index 41c84314d6..2be1e7b1ed 100644 --- a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/AbstractKeeperElectorManagerTest.java +++ b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/AbstractKeeperElectorManagerTest.java @@ -48,6 +48,12 @@ protected LeaderElector addKeeperZkNode(Long clusterDbId, Long shardDbId, ZkClie return addKeeperZkNode(zkClient, leaderElectionID, leaderElectionZKPath); } + protected LeaderElector addKeeperZkNode(Long shardDbId, ZkClient zkClient, KeeperMeta keeperMeta) throws Exception { + String leaderElectionZKPath = MetaZkConfig.getKeeperLeaderLatchPath(shardDbId); + String leaderElectionID = MetaZkConfig.getKeeperLeaderElectionId(keeperMeta); + return addKeeperZkNode(zkClient, leaderElectionID, leaderElectionZKPath); + } + protected LeaderElector addKeeperZkNode(ZkClient zkClient, String leaderElectionID, String leaderElectionZKPath) throws Exception { ElectContext ctx = new ElectContext(leaderElectionZKPath, leaderElectionID); LeaderElector leaderElector = new DefaultLeaderElector(ctx, zkClient.get()); diff --git a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/MultiPathKeeperElectorManagerTest.java b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/MultiPathKeeperElectorManagerTest.java index 62097fe5ce..99454e9571 100644 --- a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/MultiPathKeeperElectorManagerTest.java +++ b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/elect/MultiPathKeeperElectorManagerTest.java @@ -76,12 +76,47 @@ public void beforeDefaultKeeperElectorManagerTest() throws Exception{ public void testLeaderSelect() throws Exception { keeperElectorManager.observerShardLeader(clusterDbId, shardDbId); + LeaderElector active = addKeeperZkNode(shardDbId, getZkClient(), keeper1); + LeaderElector backup = addKeeperZkNode(shardDbId, getZkClient(), keeper2); + waitConditionUntilTimeOut(()->assertSuccess(()->{ + verify(currentMetaManager, times(2)).setSurviveKeepers(anyLong(), anyLong(), anyList(), any(KeeperMeta.class)); + })); + + Assert.assertEquals(Arrays.asList(MetaClone.clone(keeper1).setActive(true), MetaClone.clone(keeper2).setActive(false)), surviveKeepers.get()); + Assert.assertEquals("1", activeKeeper.get().getId()); + } + + @Test + public void testPathSwitch() throws Exception { + keeperElectorManager.observerShardLeader(clusterDbId, shardDbId); + LeaderElector active = addKeeperZkNode(clusterDbId, shardDbId, getZkClient(), keeper1); LeaderElector backup = addKeeperZkNode(clusterDbId, shardDbId, getZkClient(), keeper2); waitConditionUntilTimeOut(()->assertSuccess(()->{ verify(currentMetaManager, times(2)).setSurviveKeepers(anyLong(), anyLong(), anyList(), any(KeeperMeta.class)); })); + // active switch to path for ids + active.stop(); + waitConditionUntilTimeOut(()->assertSuccess(()->{ + verify(currentMetaManager, times(3)).setSurviveKeepers(anyLong(), anyLong(), anyList(), any(KeeperMeta.class)); + })); + Assert.assertEquals(Arrays.asList(MetaClone.clone(keeper2).setActive(true)), surviveKeepers.get()); + Assert.assertEquals("2", activeKeeper.get().getId()); + + active = addKeeperZkNode(shardDbId, getZkClient(), keeper1); + waitConditionUntilTimeOut(()->assertSuccess(()->{ + verify(currentMetaManager, times(4)).setSurviveKeepers(anyLong(), anyLong(), anyList(), any(KeeperMeta.class)); + })); + Assert.assertEquals(Arrays.asList(MetaClone.clone(keeper2).setActive(true), MetaClone.clone(keeper1).setActive(false)), surviveKeepers.get()); + Assert.assertEquals("2", activeKeeper.get().getId()); + + // backup switch to path for ids + backup.stop(); + backup = addKeeperZkNode(shardDbId, getZkClient(), keeper2); + waitConditionUntilTimeOut(()->assertSuccess(()->{ + verify(currentMetaManager, times(6)).setSurviveKeepers(anyLong(), anyLong(), anyList(), any(KeeperMeta.class)); + })); Assert.assertEquals(Arrays.asList(MetaClone.clone(keeper1).setActive(true), MetaClone.clone(keeper2).setActive(false)), surviveKeepers.get()); Assert.assertEquals("1", activeKeeper.get().getId()); } diff --git a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AddKeeperCommandTest.java b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AddKeeperCommandTest.java index c1a78ae1c0..b683c43878 100644 --- a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AddKeeperCommandTest.java +++ b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/AddKeeperCommandTest.java @@ -5,15 +5,14 @@ import com.ctrip.xpipe.exception.ExceptionUtils; import com.ctrip.xpipe.lifecycle.LifecycleHelper; import com.ctrip.xpipe.netty.ByteBufUtils; -import com.ctrip.xpipe.redis.core.entity.KeeperContainerMeta; +import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; import com.ctrip.xpipe.redis.core.entity.KeeperMeta; import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; import com.ctrip.xpipe.redis.core.keeper.container.KeeperContainerService; import com.ctrip.xpipe.redis.core.protocal.MASTER_STATE; import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole; +import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.meta.server.AbstractMetaServerTest; -import com.ctrip.xpipe.redis.meta.server.keeper.container.DefaultKeeperContainerService; -import com.ctrip.xpipe.redis.meta.server.keeper.container.DefaultKeeperContainerServiceFactory; import com.ctrip.xpipe.simpleserver.Server; import com.google.common.util.concurrent.SettableFuture; import org.junit.Assert; @@ -21,6 +20,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import java.util.concurrent.Callable; @@ -53,7 +53,7 @@ public void beforeAddKeeperCommandTest(){ keeperMeta.setPort(keeperPort); keeperMeta.setIp("127.0.0.1"); - keeperTransMeta = new KeeperTransMeta(1L, 1L, keeperMeta); + keeperTransMeta = new KeeperTransMeta(1L, 1L, 1L, keeperMeta); addKeeperCommand = new AddKeeperCommand(keeperContainerService, keeperTransMeta, scheduled, timeoutMilli, checkInterval); } @@ -204,4 +204,22 @@ public String call() throws Exception { Assert.assertEquals(MASTER_STATE.REDIS_REPL_CONNECTED, keeperRole.getMasterState()); } + @Test + public void testRemoveWildKeeperBeforeAdd() throws Exception { + SlaveRole keeperRole = new SlaveRole(SERVER_ROLE.KEEPER, "127.0.0.1", randomPort(), MASTER_STATE.REDIS_REPL_CONNECTED, 0); + KeeperInstanceMeta instanceMeta = new KeeperInstanceMeta(new ReplId(2L), keeperTransMeta.getKeeperMeta()); + Mockito.when(keeperContainerService.infoPort(keeperPort)).thenReturn(instanceMeta); + + Server server = null; + try { + server = startServer(keeperPort, ByteBufUtils.readToString(keeperRole.format())); + SlaveRole real = addKeeperCommand.execute().get(); + Mockito.verify(keeperContainerService, Mockito.times(1)).removeKeeper(instanceMeta); + Mockito.verify(keeperContainerService, Mockito.times(1)).addOrStartKeeper(keeperTransMeta); + Assert.assertEquals(keeperRole, real); + } finally { + if (null != server) server.stop(); + } + } + } diff --git a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DefaultKeeperManagerTest.java b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DefaultKeeperManagerTest.java index abe532e127..12742b0621 100644 --- a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DefaultKeeperManagerTest.java +++ b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/keeper/manager/DefaultKeeperManagerTest.java @@ -1,6 +1,5 @@ package com.ctrip.xpipe.redis.meta.server.keeper.manager; -import com.ctrip.xpipe.AbstractTest; import com.ctrip.xpipe.api.endpoint.Endpoint; import com.ctrip.xpipe.api.pool.SimpleKeyedObjectPool; import com.ctrip.xpipe.concurrent.KeyedOneThreadMutexableTaskExecutor; @@ -8,8 +7,13 @@ import com.ctrip.xpipe.netty.commands.NettyClient; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.core.entity.*; +import com.ctrip.xpipe.redis.core.meta.MetaClone; +import com.ctrip.xpipe.redis.core.meta.MetaComparator; +import com.ctrip.xpipe.redis.core.meta.comparator.ClusterMetaComparator; +import com.ctrip.xpipe.redis.core.meta.comparator.DcMetaComparator; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; +import com.ctrip.xpipe.redis.meta.server.AbstractMetaServerTest; import com.ctrip.xpipe.redis.meta.server.keeper.KeeperStateController; import com.ctrip.xpipe.redis.meta.server.keeper.manager.DefaultKeeperManager.ActiveKeeperInfoChecker; import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager; @@ -22,6 +26,7 @@ import org.junit.*; import java.util.Arrays; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,7 +35,7 @@ import static org.mockito.Mockito.*; -public class DefaultKeeperManagerTest extends AbstractTest { +public class DefaultKeeperManagerTest extends AbstractMetaServerTest { private DefaultKeeperManager manager = new DefaultKeeperManager(); @@ -83,7 +88,7 @@ public void testDoCheckNotRemovedWithDeadKeepers() { when(currentMetaManager.getSurviveKeepers(clusterDbId, shardDbId)).thenReturn(Arrays.asList(surviveKeeper)); checker.doCheckShard(new ClusterMeta(clusterId).setDbId(clusterDbId), shardMeta); - verify(keeperStateController, times(1)).addKeeper(new KeeperTransMeta(clusterDbId, shardDbId, metaKeeper)); + verify(keeperStateController, times(1)).addKeeper(new KeeperTransMeta(clusterDbId, shardDbId, shardDbId, metaKeeper)); verify(keeperStateController, times(0)).removeKeeper(any()); } @@ -98,7 +103,7 @@ public void testDoCheckRemoved() { checker.doCheckShard(new ClusterMeta(clusterId).setDbId(clusterDbId), shardMeta); verify(keeperStateController, times(0)).addKeeper(any()); - verify(keeperStateController, times(1)).removeKeeper(new KeeperTransMeta(clusterDbId, shardDbId, surviveKeeper)); + verify(keeperStateController, times(1)).removeKeeper(new KeeperTransMeta(clusterDbId, shardDbId, shardDbId, surviveKeeper)); } @Test @@ -570,4 +575,25 @@ protected byte[] getToWrite(Object readResult) { InfoCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI = originTimeout; } } + + @Test + public void testShardMigrate() { + DcMeta current = getDcMeta("jq"); + DcMeta future = MetaClone.clone(current); + exchangeClusterShards(future.getClusters().get("cluster1"), future.getClusters().get("cluster2")); + DcMetaComparator comparator = new DcMetaComparator(current, future); + comparator.setShardMigrateSupport(); + comparator.compare(); + + Set clusterMetaComparators = comparator.getMofified(); + Assert.assertEquals(2, clusterMetaComparators.size()); + + clusterMetaComparators.forEach(clusterMetaComparator -> { + manager.handleClusterModified((ClusterMetaComparator) clusterMetaComparator); + }); + // allow redundant addAndStart keeper/applier job since the job is idempotent + verify(keeperStateController, times(6)).addKeeper(any()); + verify(keeperStateController, never()).removeKeeper(any()); + } + } \ No newline at end of file diff --git a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/meta/CurrentMetaTest.java b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/meta/CurrentMetaTest.java index 1314e962bc..a7a24e2555 100644 --- a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/meta/CurrentMetaTest.java +++ b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/meta/CurrentMetaTest.java @@ -4,7 +4,9 @@ import com.ctrip.xpipe.gtid.GtidSet; import com.ctrip.xpipe.redis.core.entity.*; import com.ctrip.xpipe.redis.core.meta.MetaClone; +import com.ctrip.xpipe.redis.core.meta.MetaComparator; import com.ctrip.xpipe.redis.core.meta.comparator.ClusterMetaComparator; +import com.ctrip.xpipe.redis.core.meta.comparator.DcMetaComparator; import com.ctrip.xpipe.redis.meta.server.AbstractMetaServerTest; import com.ctrip.xpipe.redis.meta.server.meta.impl.CurrentOneWayShardMeta; import com.ctrip.xpipe.tuple.Pair; @@ -18,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -269,6 +272,35 @@ public void testChange() { Assert.assertTrue(currentMeta.hasShard(clusterDbId, newShardDbId)); } + @Test + public void testShardsMigrateOut() { + DcMeta current = getDcMeta("jq"); + DcMeta future = MetaClone.clone(current); + exchangeClusterShards(future.getClusters().get("cluster1"), future.getClusters().get("cluster2")); + DcMetaComparator comparator = new DcMetaComparator(current, future); + comparator.setShardMigrateSupport(); + comparator.compare(); + + Set clusterMetaComparators = comparator.getMofified(); + Assert.assertEquals(2, clusterMetaComparators.size()); + + CurrentMeta currentMeta = new CurrentMeta(); + comparator.getMofified().forEach(clusterMetaComparator -> { + currentMeta.addCluster(((ClusterMetaComparator)clusterMetaComparator).getCurrent()); + }); + comparator.getMofified().forEach(clusterMetaComparator -> currentMeta.changeCluster(((ClusterMetaComparator)clusterMetaComparator))); + + AtomicBoolean dismatch = new AtomicBoolean(false); + currentMeta.allClusterMetas().forEach(currentClusterMeta -> { + ClusterMeta clusterMeta = future.getClusters().get(currentClusterMeta.getClusterId()); + Set metaShardIds = clusterMeta.getAllShards().values().stream().map(ShardMeta::getDbId).collect(Collectors.toSet()); + Set currentShardIds = currentClusterMeta.getClusterMetas().keySet(); + if (!metaShardIds.equals(currentShardIds)) dismatch.set(true); + }); + + Assert.assertFalse(dismatch.get()); + } + @Test public void testSetInfoForCRDTCluster() { diff --git a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/meta/impl/DefaultCurrentMetaManagerTest.java b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/meta/impl/DefaultCurrentMetaManagerTest.java index eb294610e4..2065c096f8 100644 --- a/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/meta/impl/DefaultCurrentMetaManagerTest.java +++ b/redis/redis-meta/src/test/java/com/ctrip/xpipe/redis/meta/server/meta/impl/DefaultCurrentMetaManagerTest.java @@ -4,6 +4,7 @@ import com.ctrip.xpipe.cluster.ClusterType; import com.ctrip.xpipe.observer.NodeAdded; import com.ctrip.xpipe.redis.core.entity.*; +import com.ctrip.xpipe.redis.core.meta.MetaClone; import com.ctrip.xpipe.redis.core.meta.comparator.ClusterMetaComparator; import com.ctrip.xpipe.redis.core.meta.comparator.DcMetaComparator; import com.ctrip.xpipe.redis.core.meta.comparator.DcRouteMetaComparator; diff --git a/redis/redis-meta/src/test/resources/meta-test.xml b/redis/redis-meta/src/test/resources/meta-test.xml index 2142ebd055..b5febe386f 100644 --- a/redis/redis-meta/src/test/resources/meta-test.xml +++ b/redis/redis-meta/src/test/resources/meta-test.xml @@ -24,6 +24,18 @@ + + + + + + + + + + + + diff --git a/services/ctrip-integration-test/src/test/resources/log4j2.xml b/services/ctrip-integration-test/src/test/resources/log4j2.xml index 9137c4a757..bb3ac8c1c9 100644 --- a/services/ctrip-integration-test/src/test/resources/log4j2.xml +++ b/services/ctrip-integration-test/src/test/resources/log4j2.xml @@ -3,12 +3,12 @@ - + - +