From e5663cd1d9dc376b170ef0ffbb906b0b4384c649 Mon Sep 17 00:00:00 2001 From: lishanglin Date: Tue, 15 Oct 2024 22:14:50 +0800 Subject: [PATCH] merge mark request from multi checkers --- .../xpipe/redis/checker/CheckerService.java | 1 - .../controller/CheckerHealthController.java | 11 +- .../AbstractDelayPingActionCollector.java | 9 ++ .../AbstractPsubPingActionCollector.java | 9 ++ .../DefaultAggregatorPullService.java | 39 ++++-- .../DefaultDelayPingActionCollector.java | 29 ++++ .../DefaultPsubPingActionCollector.java | 23 ++++ .../interaction/HealthStateService.java | 4 + .../actions/interaction/HealthStatus.java | 42 ++++++ .../actions/interaction/HealthStatusDesc.java | 16 +++ .../data/XPipeInstanceHealthHolder.java | 13 ++ .../event/AbstractInstanceEvent.java | 7 + .../handler/DefaultOuterClientAggregator.java | 21 +++ .../actions/ping/PingActionFactory.java | 4 +- .../checker/impl/CheckerProxyManager.java | 1 - .../checker/impl/DefaultCheckerService.java | 1 - .../ctrip/xpipe/redis/checker/AllTests.java | 12 +- .../DefaultAggregatorPullServiceTest.java | 130 ++++++++++++++++++ .../DefaultDelayPingActionCollectorTest.java | 70 ++++++++++ .../DefaultOuterClientAggregatorTest.java | 56 +++++--- .../impl/ConsoleCachedHealthStateService.java | 15 ++ 21 files changed, 466 insertions(+), 47 deletions(-) create mode 100644 redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullServiceTest.java create mode 100644 redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultDelayPingActionCollectorTest.java diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java index 7261364de2..d5142ee8d7 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java @@ -5,7 +5,6 @@ import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java index c414c10c02..91a69eef58 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java @@ -6,10 +6,7 @@ import com.ctrip.xpipe.redis.checker.RedisInfoManager; import com.ctrip.xpipe.redis.checker.controller.result.ActionContextRetMessage; import com.ctrip.xpipe.redis.checker.healthcheck.*; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultPsubPingActionCollector; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.*; import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.AbstractRedisConfigRuleAction; @@ -157,12 +154,14 @@ public Map getAllCrossRegionHealthStatusDesc() { @RequestMapping(value = "/health/check/instances/status", method = RequestMethod.POST) public Map getHealthCheckInstanceCluster(@RequestBody List hostPorts) { if (hostPorts == null || hostPorts.isEmpty()) return Collections.emptyMap(); + if (siteStability.isSiteStable()) return Collections.emptyMap(); + Map result = new HashMap<>(); for (HostPort hostPort : hostPorts) { if (Objects.equals(currentDc, metaCache.getDc(hostPort)) && metaCache.isCrossRegion(metaCache.getActiveDc(hostPort), currentDc)) { - result.put(hostPort, new HealthStatusDesc(hostPort, getCrossRegionHealthState(hostPort.getHost(), hostPort.getPort()))); + result.put(hostPort, defaultPsubPingActionCollector.getHealthStatusDesc(hostPort)); } else { - result.put(hostPort, new HealthStatusDesc(hostPort, getHealthState(hostPort.getHost(), hostPort.getPort()))); + result.put(hostPort, defaultDelayPingActionCollector.getHealthStatusDesc(hostPort)); } } return result; diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractDelayPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractDelayPingActionCollector.java index bbbea61cf7..cd9ef71e97 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractDelayPingActionCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractDelayPingActionCollector.java @@ -1,6 +1,7 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; @@ -34,6 +35,14 @@ protected void removeHealthStatus(HealthCheckAction action) { } } + protected HealthStatus getHealthStatus(HostPort hostPort) { + RedisHealthCheckInstance key = allHealthStatus.keySet().stream() + .filter(instance -> instance.getCheckInfo().getHostPort().equals(hostPort)) + .findFirst().orElse(null); + + return null == key ? null : allHealthStatus.get(key); + } + @Override public boolean supportInstance(RedisHealthCheckInstance instance) { return true; diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java index 33ba4ea6f9..bed16a4595 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java @@ -1,5 +1,6 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; +import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; @@ -44,6 +45,14 @@ public PsubActionListener createPsubActionListener() { return psubActionListener; } + protected HealthStatus getHealthStatus(HostPort hostPort) { + RedisHealthCheckInstance key = allHealthStatus.keySet().stream() + .filter(instance -> instance.getCheckInfo().getHostPort().equals(hostPort)) + .findFirst().orElse(null); + + return null == key ? null : allHealthStatus.get(key); + } + protected class CollectorPingActionListener implements PingActionListener { @Override diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullService.java index 29c08b4b88..19227d13b1 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullService.java @@ -14,6 +14,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder; import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.utils.XpipeThreadFactory; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -38,7 +39,7 @@ public class DefaultAggregatorPullService implements AggregatorPullService{ private OuterClientService outerClientService = OuterClientService.DEFAULT; private static final Logger logger = LoggerFactory.getLogger(DefaultAggregatorPullService.class); - private ExecutorService executors; + private Executor executors; @PostConstruct public void postConstruct() { @@ -54,17 +55,30 @@ public Set getNeedAdjustInstances(String cluster, Set> xpipeQueryFuture = queryXPipeInstanceStatusCmd.execute(executors); + CommandFuture xpipeQueryFuture = queryXPipeInstanceStatusCmd.execute(executors); CommandFuture> outerClientQueryFuture = queryOuterClintInstanceStatusCmd.execute(executors); - Map xpipeAllHealthStatus = xpipeQueryFuture.get(); + XPipeInstanceHealthHolder xpipeInstanceHealthHolder = xpipeQueryFuture.get(); Map outerClientAllHealthStatus = outerClientQueryFuture.get(); + Map xpipeAllHealthStatus = xpipeInstanceHealthHolder.getAllHealthStatus(checkerConfig.getQuorum()); + Map lastMarks = xpipeInstanceHealthHolder.getOtherCheckerLastMark(); for (Map.Entry entry : xpipeAllHealthStatus.entrySet()) { - if (!outerClientAllHealthStatus.containsKey(entry.getKey()) - || !entry.getValue().equals(outerClientAllHealthStatus.get(entry.getKey()))) { - instanceNeedAdjust.add( - new HostPortDcStatus(entry.getKey().getHost(), entry.getKey().getPort(), metaCache.getDc(entry.getKey()), entry.getValue())); + HostPort instance = entry.getKey(); + Boolean expectMark = entry.getValue(); + if (null == expectMark) { + logger.info("[getNeedAdjustInstances][unknown host] {}", instance); + continue; + } + + if (!outerClientAllHealthStatus.containsKey(instance) + || !expectMark.equals(outerClientAllHealthStatus.get(instance))) { + if (lastMarks.containsKey(instance) && expectMark.equals(lastMarks.get(instance))) { + logger.info("[getNeedAdjustInstances][otherCheckerMark][{}-{}] skip", instance, expectMark); + continue; + } + instanceNeedAdjust.add(new HostPortDcStatus(instance.getHost(), instance.getPort(), + metaCache.getDc(instance), expectMark)); } } return instanceNeedAdjust; @@ -95,7 +109,7 @@ private void alertMarkInstance(String clusterName, Set instanc } } - private class QueryXPipeInstanceStatusCmd extends AbstractCommand> { + protected class QueryXPipeInstanceStatusCmd extends AbstractCommand { private String cluster; @@ -112,7 +126,7 @@ protected void doExecute() throws Throwable { for (CheckerService checkerService : remoteCheckerManager.getAllCheckerServices()) { xpipeInstanceHealthHolder.add(checkerService.getAllClusterInstanceHealthStatus(instances)); } - future().setSuccess(xpipeInstanceHealthHolder.getAllHealthStatus(checkerConfig.getQuorum())); + future().setSuccess(xpipeInstanceHealthHolder); } @Override @@ -125,7 +139,7 @@ public String getName() { } } - private class QueryOuterClintInstanceStatusCmd extends AbstractCommand> { + protected class QueryOuterClintInstanceStatusCmd extends AbstractCommand> { private String cluster; @@ -151,4 +165,9 @@ public String getName() { } } + @VisibleForTesting + protected void setExecutors(Executor executors) { + this.executors = executors; + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultDelayPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultDelayPingActionCollector.java index f9a8263856..e14629a201 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultDelayPingActionCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultDelayPingActionCollector.java @@ -13,6 +13,7 @@ import com.ctrip.xpipe.redis.checker.ClusterHealthManager; import com.ctrip.xpipe.redis.checker.alert.ALERT_TYPE; import com.ctrip.xpipe.redis.checker.alert.AlertManager; +import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckInstanceManager; import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; @@ -21,6 +22,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.processor.HealthEventProcessor; import com.ctrip.xpipe.utils.MapUtils; import com.ctrip.xpipe.utils.StringUtil; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -66,6 +68,9 @@ public class DefaultDelayPingActionCollector extends AbstractDelayPingActionColl @Autowired private AlertManager alertManager; + @Autowired + private CheckerConfig config; + private OuterClientService outerClientService = OuterClientService.DEFAULT; private FinalStateSetterManager finalStateSetterManager; @@ -152,6 +157,25 @@ public HEALTH_STATE getHealthState(HostPort hostPort) { return null; } + @Override + public HealthStatusDesc getHealthStatusDesc(HostPort hostPort) { + HealthStatus status = getHealthStatus(hostPort); + if (null != status) { + long timeoutMill = config.getMarkInstanceMaxDelayMilli() + config.getCheckerMetaRefreshIntervalMilli(); + return new HealthStatusDesc(hostPort, status, status.getLastMarkHandled(timeoutMill)); + } else { + return new HealthStatusDesc(hostPort, HEALTH_STATE.UNKNOWN); + } + } + + @Override + public void updateLastMarkHandled(HostPort hostPort, boolean lastMark) { + HealthStatus status = getHealthStatus(hostPort); + if (null != status) { + status.updateLastMarkHandled(lastMark); + } + } + @Override public Map getAllCachedState() { Map cachedHealthStatus = new HashMap<>(); @@ -207,4 +231,9 @@ private boolean activeDcCheckerSubscribeMasterTypeInstance(RedisHealthCheckInsta return ClusterType.lookup(azGroupType) == ClusterType.SINGLE_DC; } + @VisibleForTesting + protected void setScheduled(ScheduledExecutorService scheduled) { + this.scheduled = scheduled; + } + } \ No newline at end of file diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java index ca6307946a..56dc1e6b82 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java @@ -5,6 +5,7 @@ import com.ctrip.xpipe.api.observer.Observer; import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; @@ -35,6 +36,9 @@ public class DefaultPsubPingActionCollector extends AbstractPsubPingActionCollec @Autowired private List healthEventProcessors; + @Autowired + private CheckerConfig config; + @Resource(name = SCHEDULED_EXECUTOR) private ScheduledExecutorService scheduled; @@ -51,6 +55,25 @@ public HEALTH_STATE getHealthState(HostPort hostPort) { return HEALTH_STATE.UNKNOWN; } + @Override + public HealthStatusDesc getHealthStatusDesc(HostPort hostPort) { + HealthStatus status = getHealthStatus(hostPort); + if (null != status) { + long timeoutMill = config.getMarkInstanceMaxDelayMilli() + config.getCheckerMetaRefreshIntervalMilli(); + return new HealthStatusDesc(hostPort, status, status.getLastMarkHandled(timeoutMill)); + } else { + return new HealthStatusDesc(hostPort, HEALTH_STATE.UNKNOWN); + } + } + + @Override + public void updateLastMarkHandled(HostPort hostPort, boolean lastMark) { + HealthStatus status = getHealthStatus(hostPort); + if (null != status) { + status.updateLastMarkHandled(lastMark); + } + } + @Override public Map getAllCachedState() { Map cachedHealthStatus = new HashMap<>(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStateService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStateService.java index 236b888732..b88f004ae2 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStateService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStateService.java @@ -12,8 +12,12 @@ public interface HealthStateService { HEALTH_STATE getHealthState(HostPort hostPort); + HealthStatusDesc getHealthStatusDesc(HostPort hostPort); + Map getAllCachedState(); void updateHealthState(Map redisStates); + void updateLastMarkHandled(HostPort hostPort, boolean lastMark); + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java index be0421bfaf..c79ddac951 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java @@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.IntSupplier; /** @@ -47,6 +49,12 @@ public class HealthStatus extends AbstractObservable implements Startable, Stopp protected static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); protected static Logger delayLogger = LoggerFactory.getLogger(HealthStatus.class.getName() + ".delay"); + private ReadWriteLock lastMarkHandledLock = new ReentrantReadWriteLock(); + + private volatile boolean lastMarkHandled = false; + + private volatile long lastMarkHandledTime = -1; + public HealthStatus(RedisHealthCheckInstance instance, ScheduledExecutorService scheduled){ this.instance = instance; this.scheduled = scheduled; @@ -275,4 +283,38 @@ public long getLastHealthyDelayTime() { return lastHealthDelayTime.get(); } + public void updateLastMarkHandled(boolean handleMarkUp) { + try { + lastMarkHandledLock.writeLock().lock(); + this.lastMarkHandled = handleMarkUp; + this.lastMarkHandledTime = System.currentTimeMillis(); + } finally { + lastMarkHandledLock.writeLock().unlock(); + } + } + + public Boolean getLastMarkHandled(long timeoutMill) { + try { + lastMarkHandledLock.readLock().lock(); + long current = System.currentTimeMillis(); + if (lastMarkHandledTime > current) { + resetLastMarkHandled(); + } else if (lastMarkHandledTime > 0 && lastMarkHandledTime + timeoutMill > current) { + return lastMarkHandled; + } + } finally { + lastMarkHandledLock.readLock().unlock(); + } + return null; + } + + private void resetLastMarkHandled() { + try { + lastMarkHandledLock.writeLock().lock(); + this.lastMarkHandledTime = -1; + } finally { + lastMarkHandledLock.writeLock().unlock(); + } + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatusDesc.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatusDesc.java index 63480c6cf6..24324c14fc 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatusDesc.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatusDesc.java @@ -12,6 +12,8 @@ public class HealthStatusDesc { private HEALTH_STATE state; + private Boolean lastMarkHandled = null; + private long lastPongTime = -1; private long lastHealthDelayTime = -1; @@ -25,11 +27,22 @@ public HealthStatusDesc(HostPort hostPort, HEALTH_STATE state) { this.state = state; } + public HealthStatusDesc(HostPort hostPort, HEALTH_STATE state, Boolean lastMarkHandled) { + this.hostPort = hostPort; + this.state = state; + this.lastMarkHandled = lastMarkHandled; + } + public HealthStatusDesc(HostPort hostPort, HealthStatus status) { + this(hostPort, status, null); + } + + public HealthStatusDesc(HostPort hostPort, HealthStatus status, Boolean lastMarkHandled) { this.hostPort = hostPort; this.state = status.getState(); this.lastPongTime = status.getLastPongTime(); this.lastHealthDelayTime = status.getLastHealthyDelayTime(); + this.lastMarkHandled = lastMarkHandled; } public HostPort getHostPort() { @@ -48,4 +61,7 @@ public long getLastHealthDelayTime() { return lastHealthDelayTime; } + public Boolean getLastMarkHandled() { + return lastMarkHandled; + } } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/data/XPipeInstanceHealthHolder.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/data/XPipeInstanceHealthHolder.java index 4bcd059249..45a12c943c 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/data/XPipeInstanceHealthHolder.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/data/XPipeInstanceHealthHolder.java @@ -72,6 +72,19 @@ public Map getAllHealthStatus(int quorum) { return result; } + public Map getOtherCheckerLastMark() { + Map lastMarks = new HashMap<>(); + for (Map hostPortHealthStatusDescMap : healthCheckResult) { + for (Map.Entry entry: hostPortHealthStatusDescMap.entrySet()) { + if (null != entry.getValue().getLastMarkHandled()) { + lastMarks.put(entry.getKey(), entry.getValue().getLastMarkHandled()); + } + } + } + + return lastMarks; + } + public List getHealthStatus(HostPort hostPort) { List statusList = new ArrayList<>(); healthCheckResult.forEach(result -> { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/event/AbstractInstanceEvent.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/event/AbstractInstanceEvent.java index 9abb9eb8c2..db82a597dd 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/event/AbstractInstanceEvent.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/event/AbstractInstanceEvent.java @@ -1,6 +1,7 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event; import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatus; /** * @author chen.zhu @@ -11,6 +12,8 @@ public class AbstractInstanceEvent { protected RedisHealthCheckInstance instance; + protected HealthStatus currentStatus; + public AbstractInstanceEvent(RedisHealthCheckInstance instance) { this.instance = instance; } @@ -19,6 +22,10 @@ public RedisHealthCheckInstance getInstance() { return instance; } + public HealthStatus getCurrentStatus() { + return currentStatus; + } + @Override public String toString() { return String.format("%s:%s", getInstance(), getClass().getSimpleName()); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregator.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregator.java index e0f85f4c2b..d092969cda 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregator.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregator.java @@ -8,6 +8,7 @@ import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.AggregatorPullService; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStateService; import com.ctrip.xpipe.utils.MapUtils; import com.ctrip.xpipe.utils.VisibleForTesting; import org.slf4j.Logger; @@ -40,6 +41,9 @@ public class DefaultOuterClientAggregator implements OuterClientAggregator{ @Autowired private AggregatorPullService aggregatorPullService; + @Autowired + private List healthStateServices; // loop depend? + private Random rand = new Random(); private KeyedOneThreadTaskExecutor clusterOneThreadTaskExecutor; @@ -139,6 +143,18 @@ protected void doExecute() throws Exception { logger.info("[aggregator][getEmpty] skip"); future().setSuccess(); } else { + for (HostPortDcStatus hostPortDcStatus: instancesToUpdate) { + for (HealthStateService stateService: healthStateServices) { + try { + stateService.updateLastMarkHandled( + new HostPort(hostPortDcStatus.getHost(), hostPortDcStatus.getPort()), + hostPortDcStatus.isCanRead()); + } catch (Throwable th) { + logger.info("[aggregator][updateLastMark][fail]", th); + } + } + } + for(int i=0; i < retry ;i++){ try{ logger.debug("[aggregator][begin] {}", cluster); @@ -169,4 +185,9 @@ public void setScheduled(ScheduledExecutorService scheduled) { this.scheduled = scheduled; } + @VisibleForTesting + public void setHealthStateServices(List healthStateServices) { + this.healthStateServices = healthStateServices; + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/ping/PingActionFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/ping/PingActionFactory.java index e9fc21a6e2..44441c4951 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/ping/PingActionFactory.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/ping/PingActionFactory.java @@ -87,9 +87,7 @@ public PingAction create(RedisHealthCheckInstance instance) { delayPingCollectorsByClusterType.get(clusterType).forEach(collector -> { if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); }); - } - - if (activeDc != null && metaCache.isCrossRegion(currentDcId, activeDc)) { + } else if (activeDc != null && metaCache.isCrossRegion(currentDcId, activeDc)) { psubPingActionCollectorsByClusterType.get(clusterType).forEach(collector -> { if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); }); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/CheckerProxyManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/CheckerProxyManager.java index 93ca9441dd..1419f22040 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/CheckerProxyManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/CheckerProxyManager.java @@ -6,7 +6,6 @@ import com.ctrip.xpipe.redis.checker.ProxyManager; import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderAware; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; -import com.ctrip.xpipe.redis.checker.model.DcClusterShard; import com.ctrip.xpipe.redis.checker.model.DcClusterShardPeer; import com.ctrip.xpipe.redis.checker.model.ProxyTunnelInfo; import com.ctrip.xpipe.redis.core.service.AbstractService; diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java index c9131baf96..c75d04b66b 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java @@ -6,7 +6,6 @@ import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; import com.ctrip.xpipe.redis.core.service.AbstractService; -import java.util.List; import java.util.Map; import java.util.Set; diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AllTests.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AllTests.java index ca57b01cd6..7e65c528e0 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AllTests.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AllTests.java @@ -17,12 +17,11 @@ import com.ctrip.xpipe.redis.checker.healthcheck.actions.delay.DelayActionTest; import com.ctrip.xpipe.redis.checker.healthcheck.actions.gtidgap.GtidGapCheckActionControllerTest; import com.ctrip.xpipe.redis.checker.healthcheck.actions.gtidgap.GtidGapCheckActionTest; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.CRDTDelayPingActionCollectorTest; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusTest; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HeteroHealthStatusTest; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.*; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.InstanceHealthStatusCollectorTest; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.InstanceHealthStatusConsistenceInspectorTest; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.InstanceStatusAdjustCommandTest; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.handler.DefaultOuterClientAggregatorTest; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.handler.TestAbstractHealthEventHandlerTest; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.processor.OuterClientServiceProcessorTest; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.processor.route.DefaultRouteHealthEventProcessorTest; @@ -219,7 +218,12 @@ DefaultHealthCheckConfigTest.class, DefaultRouteChooserTest.class, - DefaultHealthCheckInstanceManagerTest.class + DefaultHealthCheckInstanceManagerTest.class, + + DefaultOuterClientAggregatorTest.class, + DefaultDelayPingActionCollectorTest.class, + DefaultAggregatorPullServiceTest.class + }) public class AllTests { } \ No newline at end of file diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullServiceTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullServiceTest.java new file mode 100644 index 0000000000..5ba9edc749 --- /dev/null +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullServiceTest.java @@ -0,0 +1,130 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.api.migration.OuterClientService; +import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.redis.checker.AbstractCheckerTest; +import com.ctrip.xpipe.redis.checker.CheckerService; +import com.ctrip.xpipe.redis.checker.RemoteCheckerManager; +import com.ctrip.xpipe.redis.checker.config.CheckerConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.when; + +/** + * @author lishanglin + * date 2024/10/15 + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class DefaultAggregatorPullServiceTest extends AbstractCheckerTest { + + @InjectMocks + private DefaultAggregatorPullService aggregatorPullService; + + @Mock + private OuterClientService outerClientService; + + @Mock + private RemoteCheckerManager remoteCheckerManager; + + @Mock + private CheckerService checkerService1; + + @Mock + private CheckerService checkerService2; + + @Mock + private MetaCache metaCache; + + @Mock + private CheckerConfig config; + + private HostPort hostPort1 = new HostPort("127.0.0.1", 6379); + private HostPort hostPort2 = new HostPort("127.0.0.1", 6380); + + @Before + public void setupDefaultAggregatorPullServiceTest() throws Exception { + aggregatorPullService.setExecutors(MoreExecutors.directExecutor()); + + when(remoteCheckerManager.getAllCheckerServices()).thenReturn(Arrays.asList(checkerService1, checkerService2)); + when(config.getQuorum()).thenReturn(2); + + Map outerClientState = new HashMap<>(); + outerClientState.put(hostPort1, true); + outerClientState.put(hostPort2, false); + when(outerClientService.batchQueryInstanceStatus(anyString(), anySet())).thenReturn(outerClientState); + when(metaCache.getDc(any())).thenReturn("jq"); + } + + @Test + public void testQuorumWithUndefined() throws Exception { + Set instances = Sets.newHashSet(hostPort1, hostPort2); + Map healthStatusDescMap = new HashMap<>(); + healthStatusDescMap.put(hostPort1, new HealthStatusDesc(hostPort1, HEALTH_STATE.HEALTHY)); + healthStatusDescMap.put(hostPort2, new HealthStatusDesc(hostPort1, HEALTH_STATE.UNKNOWN)); + + when(checkerService1.getAllClusterInstanceHealthStatus(instances)).thenReturn(healthStatusDescMap); + when(checkerService2.getAllClusterInstanceHealthStatus(instances)).thenReturn(healthStatusDescMap); + + DefaultAggregatorPullService.QueryXPipeInstanceStatusCmd cmd = aggregatorPullService.new QueryXPipeInstanceStatusCmd("cluster1", instances); + XPipeInstanceHealthHolder holder = cmd.execute().get(); + Map healthStatusMap = holder.getAllHealthStatus(2); + Assert.assertEquals(2, healthStatusDescMap.size()); + Assert.assertTrue(healthStatusMap.get(hostPort1)); + Assert.assertNull(healthStatusMap.get(hostPort2)); + } + + @Test + public void testMarkAggregate() throws Exception { + Set instances = Sets.newHashSet(hostPort1, hostPort2); + Map healthStatusDescMap = new HashMap<>(); + healthStatusDescMap.put(hostPort1, new HealthStatusDesc(hostPort1, HEALTH_STATE.DOWN)); + healthStatusDescMap.put(hostPort2, new HealthStatusDesc(hostPort1, HEALTH_STATE.DOWN)); + when(checkerService1.getAllClusterInstanceHealthStatus(instances)).thenReturn(healthStatusDescMap); + when(checkerService2.getAllClusterInstanceHealthStatus(instances)).thenReturn(healthStatusDescMap); + + Set hostPortDcStatuses = aggregatorPullService.getNeedAdjustInstances("cluster1", instances); + Assert.assertEquals(1, hostPortDcStatuses.size()); + + OuterClientService.HostPortDcStatus status = hostPortDcStatuses.iterator().next(); + Assert.assertEquals(hostPort1.getHost(), status.getHost()); + Assert.assertEquals(hostPort1.getPort(), status.getPort()); + Assert.assertEquals("jq", status.getDc()); + Assert.assertFalse(status.isCanRead()); + } + + @Test + public void testQuorumWithLastMark() throws Exception { + Set instances = Sets.newHashSet(hostPort1, hostPort2); + Map healthStatusDescMap = new HashMap<>(); + healthStatusDescMap.put(hostPort1, new HealthStatusDesc(hostPort1, HEALTH_STATE.DOWN, false)); + healthStatusDescMap.put(hostPort2, new HealthStatusDesc(hostPort1, HEALTH_STATE.HEALTHY, false)); + when(checkerService1.getAllClusterInstanceHealthStatus(instances)).thenReturn(healthStatusDescMap); + when(checkerService2.getAllClusterInstanceHealthStatus(instances)).thenReturn(healthStatusDescMap); + + Set hostPortDcStatuses = aggregatorPullService.getNeedAdjustInstances("cluster1", instances); + Assert.assertEquals(1, hostPortDcStatuses.size()); + + OuterClientService.HostPortDcStatus status = hostPortDcStatuses.iterator().next(); + Assert.assertEquals(hostPort2.getHost(), status.getHost()); + Assert.assertEquals(hostPort2.getPort(), status.getPort()); + Assert.assertEquals("jq", status.getDc()); + Assert.assertTrue(status.isCanRead()); + } + +} diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultDelayPingActionCollectorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultDelayPingActionCollectorTest.java new file mode 100644 index 0000000000..2aaaf38aae --- /dev/null +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultDelayPingActionCollectorTest.java @@ -0,0 +1,70 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.api.observer.Observer; +import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.redis.checker.AbstractCheckerTest; +import com.ctrip.xpipe.redis.checker.ClusterHealthManager; +import com.ctrip.xpipe.redis.checker.config.CheckerConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.Mockito.when; + +/** + * @author lishanglin + * date 2024/10/15 + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class DefaultDelayPingActionCollectorTest extends AbstractCheckerTest { + + @InjectMocks + private DefaultDelayPingActionCollector delayPingActionCollector; + + @Mock + private ClusterHealthManager clusterHealthManager; + + @Mock + private CheckerConfig config; + + @Mock + private Observer observer; + + private RedisHealthCheckInstance instance; + + private int lastMarkTimeout = 20; + + @Before + public void setupDefaultDelayPingActionCollectorTest() throws Exception { + delayPingActionCollector.setScheduled(scheduled); + when(clusterHealthManager.createHealthStatusObserver()).thenReturn(observer); + + instance = newRandomRedisHealthCheckInstance(6379); + delayPingActionCollector.createOrGetHealthStatus(instance); + + when(config.getMarkInstanceMaxDelayMilli()).thenReturn(lastMarkTimeout/2); + when(config.getCheckerMetaRefreshIntervalMilli()).thenReturn(lastMarkTimeout/2); + } + + @Test + public void setGetLastMarkTest() { + HealthStatusDesc statusDesc = delayPingActionCollector.getHealthStatusDesc(new HostPort("10.0.0.1", 10)); + Assert.assertEquals(HEALTH_STATE.UNKNOWN, statusDesc.getState()); + Assert.assertNull(statusDesc.getLastMarkHandled()); + + HostPort hostPort = new HostPort(instance.getEndpoint().getHost(), instance.getEndpoint().getPort()); + delayPingActionCollector.updateLastMarkHandled(hostPort, true); + statusDesc = delayPingActionCollector.getHealthStatusDesc(hostPort); + Assert.assertTrue(statusDesc.getLastMarkHandled()); + + sleep(lastMarkTimeout + 1); + statusDesc = delayPingActionCollector.getHealthStatusDesc(hostPort); + Assert.assertNull(statusDesc.getLastMarkHandled()); + } + +} diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregatorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregatorTest.java index 72df8e05fb..c0e89ddb1a 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregatorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregatorTest.java @@ -1,38 +1,41 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.handler; -import com.ctrip.xpipe.concurrent.DefaultExecutorFactory; +import com.ctrip.xpipe.api.migration.OuterClientService; import com.ctrip.xpipe.endpoint.ClusterShardHostPort; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.AggregatorPullService; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStateService; import com.ctrip.xpipe.utils.XpipeThreadFactory; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.curator.shaded.com.google.common.collect.Sets; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.*; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; -@RunWith(SpringJUnit4ClassRunner.class) +@RunWith(MockitoJUnitRunner.Silent.class) public class DefaultOuterClientAggregatorTest { @InjectMocks private DefaultOuterClientAggregator outerClientAggregator; - @Mock - private ScheduledExecutorService scheduled; - @Mock - private ExecutorService executors; @Mock private CheckerConfig checkerConfig; @Mock private AggregatorPullService aggregatorPullService; + @Mock + private HealthStateService healthStateService; private final HostPort hostPort1 = new HostPort("127.0.0.1", 6379); @@ -44,29 +47,22 @@ public class DefaultOuterClientAggregatorTest { private ClusterShardHostPort info1, info2, info3; - private int pullIntervalSeconds = 3; + private int baseDelay = 3; - private int pullRandomSeconds = 3; + private int maxDelay = 6; @Before public void beforeDefaultOuterClientAggregatorTest() { info1 = new ClusterShardHostPort(cluster1, null, hostPort1); info2 = new ClusterShardHostPort(cluster1, null, hostPort2); info3 = new ClusterShardHostPort(cluster1, null, hostPort3); - when(checkerConfig.getMarkInstanceBaseDelayMilli()).thenReturn(pullIntervalSeconds * 1000); - when(checkerConfig.getMarkInstanceMaxDelayMilli()).thenReturn(pullRandomSeconds * 1000); + when(checkerConfig.getMarkInstanceBaseDelayMilli()).thenReturn(baseDelay * 1000); + when(checkerConfig.getMarkInstanceMaxDelayMilli()).thenReturn(maxDelay * 1000); outerClientAggregator.setScheduled(MoreExecutors.getExitingScheduledExecutorService( new ScheduledThreadPoolExecutor(1, XpipeThreadFactory.create("DefaultOuterClientAggregatorTest")), 5, TimeUnit.SECONDS )); - } - - @Test - public void testRandomMile() { - for (int i = 0; i < 100000; i++) { - long randomMill = outerClientAggregator.randomMill(); - assertTrue(randomMill >= pullIntervalSeconds * 1000L && randomMill < (pullIntervalSeconds + pullRandomSeconds) * 1000L); - } + outerClientAggregator.setHealthStateServices(Collections.singletonList(healthStateService)); } @Test @@ -74,7 +70,25 @@ public void testMarkInstance() { outerClientAggregator.markInstance(info1); outerClientAggregator.markInstance(info2); outerClientAggregator.markInstance(info3); + } + @Test + public void testAggregate() throws Exception { + Set toMarkInstances = Sets.newHashSet( + new OuterClientService.HostPortDcStatus(hostPort1.getHost(), hostPort1.getPort(), "jq", true), + new OuterClientService.HostPortDcStatus(hostPort2.getHost(), hostPort2.getPort(), "jq", false) + ); + when(aggregatorPullService.getNeedAdjustInstances(anyString(), anySet())).thenReturn(toMarkInstances); + + DefaultOuterClientAggregator.AggregatorCheckAndSetTask aggregateTask = + outerClientAggregator.new AggregatorCheckAndSetTask(cluster1, Sets.newHashSet(hostPort1, hostPort2, hostPort3)); + aggregateTask.execute().get(); + + Mockito.verify(healthStateService, times(2)).updateLastMarkHandled(any(), anyBoolean()); + Mockito.verify(healthStateService).updateLastMarkHandled(hostPort1, true); + Mockito.verify(healthStateService).updateLastMarkHandled(hostPort2, false); + + Mockito.verify(aggregatorPullService).doMarkInstances(cluster1, toMarkInstances); } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/ConsoleCachedHealthStateService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/ConsoleCachedHealthStateService.java index b8f0617c0c..106cd965b9 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/ConsoleCachedHealthStateService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/ConsoleCachedHealthStateService.java @@ -3,6 +3,7 @@ import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStateService; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; import com.ctrip.xpipe.redis.checker.spring.ConsoleServerMode; import com.ctrip.xpipe.redis.checker.spring.ConsoleServerModeCondition; import com.google.common.collect.Maps; @@ -34,4 +35,18 @@ public Map getAllCachedState() { public void updateHealthState(Map redisStates) { cachedRedisStates.putAll(redisStates); } + + @Override + public HealthStatusDesc getHealthStatusDesc(HostPort hostPort) { + if (cachedRedisStates.containsKey(hostPort)) { + return new HealthStatusDesc(hostPort, cachedRedisStates.get(hostPort)); + } else { + return new HealthStatusDesc(hostPort, HEALTH_STATE.UNKNOWN); + } + } + + @Override + public void updateLastMarkHandled(HostPort hostPort, boolean lastMark) { + // do nothing + } }