Skip to content

Commit

Permalink
merge mark request from multi checkers
Browse files Browse the repository at this point in the history
  • Loading branch information
lishanglin committed Oct 15, 2024
1 parent a181ca9 commit e5663cd
Show file tree
Hide file tree
Showing 21 changed files with 466 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
import com.ctrip.xpipe.redis.checker.RedisInfoManager;
import com.ctrip.xpipe.redis.checker.controller.result.ActionContextRetMessage;
import com.ctrip.xpipe.redis.checker.healthcheck.*;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultPsubPingActionCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.*;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.AbstractRedisConfigRuleAction;
Expand Down Expand Up @@ -157,12 +154,14 @@ public Map<HostPort, HealthStatusDesc> getAllCrossRegionHealthStatusDesc() {
@RequestMapping(value = "/health/check/instances/status", method = RequestMethod.POST)
public Map<HostPort, HealthStatusDesc> getHealthCheckInstanceCluster(@RequestBody List<HostPort> hostPorts) {
if (hostPorts == null || hostPorts.isEmpty()) return Collections.emptyMap();
if (siteStability.isSiteStable()) return Collections.emptyMap();

Map<HostPort, HealthStatusDesc> result = new HashMap<>();
for (HostPort hostPort : hostPorts) {
if (Objects.equals(currentDc, metaCache.getDc(hostPort)) && metaCache.isCrossRegion(metaCache.getActiveDc(hostPort), currentDc)) {
result.put(hostPort, new HealthStatusDesc(hostPort, getCrossRegionHealthState(hostPort.getHost(), hostPort.getPort())));
result.put(hostPort, defaultPsubPingActionCollector.getHealthStatusDesc(hostPort));
} else {
result.put(hostPort, new HealthStatusDesc(hostPort, getHealthState(hostPort.getHost(), hostPort.getPort())));
result.put(hostPort, defaultDelayPingActionCollector.getHealthStatusDesc(hostPort));
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction;

import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext;
import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
Expand Down Expand Up @@ -34,6 +35,14 @@ protected void removeHealthStatus(HealthCheckAction action) {
}
}

protected HealthStatus getHealthStatus(HostPort hostPort) {
RedisHealthCheckInstance key = allHealthStatus.keySet().stream()
.filter(instance -> instance.getCheckInfo().getHostPort().equals(hostPort))
.findFirst().orElse(null);

return null == key ? null : allHealthStatus.get(key);
}

@Override
public boolean supportInstance(RedisHealthCheckInstance instance) {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction;

import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext;
import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
Expand Down Expand Up @@ -44,6 +45,14 @@ public PsubActionListener createPsubActionListener() {
return psubActionListener;
}

protected HealthStatus getHealthStatus(HostPort hostPort) {
RedisHealthCheckInstance key = allHealthStatus.keySet().stream()
.filter(instance -> instance.getCheckInfo().getHostPort().equals(hostPort))
.findFirst().orElse(null);

return null == key ? null : allHealthStatus.get(key);
}

protected class CollectorPingActionListener implements PingActionListener {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -38,7 +39,7 @@ public class DefaultAggregatorPullService implements AggregatorPullService{
private OuterClientService outerClientService = OuterClientService.DEFAULT;
private static final Logger logger = LoggerFactory.getLogger(DefaultAggregatorPullService.class);

private ExecutorService executors;
private Executor executors;

@PostConstruct
public void postConstruct() {
Expand All @@ -54,17 +55,30 @@ public Set<HostPortDcStatus> getNeedAdjustInstances(String cluster, Set<HostPort
QueryXPipeInstanceStatusCmd queryXPipeInstanceStatusCmd = new QueryXPipeInstanceStatusCmd(cluster, instances);
QueryOuterClintInstanceStatusCmd queryOuterClintInstanceStatusCmd = new QueryOuterClintInstanceStatusCmd(cluster, instances);

CommandFuture<Map<HostPort, Boolean>> xpipeQueryFuture = queryXPipeInstanceStatusCmd.execute(executors);
CommandFuture<XPipeInstanceHealthHolder> xpipeQueryFuture = queryXPipeInstanceStatusCmd.execute(executors);
CommandFuture<Map<HostPort, Boolean>> outerClientQueryFuture = queryOuterClintInstanceStatusCmd.execute(executors);

Map<HostPort, Boolean> xpipeAllHealthStatus = xpipeQueryFuture.get();
XPipeInstanceHealthHolder xpipeInstanceHealthHolder = xpipeQueryFuture.get();
Map<HostPort, Boolean> outerClientAllHealthStatus = outerClientQueryFuture.get();
Map<HostPort, Boolean> xpipeAllHealthStatus = xpipeInstanceHealthHolder.getAllHealthStatus(checkerConfig.getQuorum());
Map<HostPort, Boolean> lastMarks = xpipeInstanceHealthHolder.getOtherCheckerLastMark();

for (Map.Entry<HostPort, Boolean> entry : xpipeAllHealthStatus.entrySet()) {
if (!outerClientAllHealthStatus.containsKey(entry.getKey())
|| !entry.getValue().equals(outerClientAllHealthStatus.get(entry.getKey()))) {
instanceNeedAdjust.add(
new HostPortDcStatus(entry.getKey().getHost(), entry.getKey().getPort(), metaCache.getDc(entry.getKey()), entry.getValue()));
HostPort instance = entry.getKey();
Boolean expectMark = entry.getValue();
if (null == expectMark) {
logger.info("[getNeedAdjustInstances][unknown host] {}", instance);
continue;
}

if (!outerClientAllHealthStatus.containsKey(instance)
|| !expectMark.equals(outerClientAllHealthStatus.get(instance))) {
if (lastMarks.containsKey(instance) && expectMark.equals(lastMarks.get(instance))) {
logger.info("[getNeedAdjustInstances][otherCheckerMark][{}-{}] skip", instance, expectMark);
continue;
}
instanceNeedAdjust.add(new HostPortDcStatus(instance.getHost(), instance.getPort(),
metaCache.getDc(instance), expectMark));
}
}
return instanceNeedAdjust;
Expand Down Expand Up @@ -95,7 +109,7 @@ private void alertMarkInstance(String clusterName, Set<HostPortDcStatus> instanc
}
}

private class QueryXPipeInstanceStatusCmd extends AbstractCommand<Map<HostPort, Boolean>> {
protected class QueryXPipeInstanceStatusCmd extends AbstractCommand<XPipeInstanceHealthHolder> {

private String cluster;

Expand All @@ -112,7 +126,7 @@ protected void doExecute() throws Throwable {
for (CheckerService checkerService : remoteCheckerManager.getAllCheckerServices()) {
xpipeInstanceHealthHolder.add(checkerService.getAllClusterInstanceHealthStatus(instances));
}
future().setSuccess(xpipeInstanceHealthHolder.getAllHealthStatus(checkerConfig.getQuorum()));
future().setSuccess(xpipeInstanceHealthHolder);
}

@Override
Expand All @@ -125,7 +139,7 @@ public String getName() {
}
}

private class QueryOuterClintInstanceStatusCmd extends AbstractCommand<Map<HostPort, Boolean>> {
protected class QueryOuterClintInstanceStatusCmd extends AbstractCommand<Map<HostPort, Boolean>> {

private String cluster;

Expand All @@ -151,4 +165,9 @@ public String getName() {
}
}

@VisibleForTesting
protected void setExecutors(Executor executors) {
this.executors = executors;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.ctrip.xpipe.redis.checker.ClusterHealthManager;
import com.ctrip.xpipe.redis.checker.alert.ALERT_TYPE;
import com.ctrip.xpipe.redis.checker.alert.AlertManager;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckInstanceManager;
import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
Expand All @@ -21,6 +22,7 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.processor.HealthEventProcessor;
import com.ctrip.xpipe.utils.MapUtils;
import com.ctrip.xpipe.utils.StringUtil;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -66,6 +68,9 @@ public class DefaultDelayPingActionCollector extends AbstractDelayPingActionColl
@Autowired
private AlertManager alertManager;

@Autowired
private CheckerConfig config;

private OuterClientService outerClientService = OuterClientService.DEFAULT;

private FinalStateSetterManager<ClusterShardHostPort, Boolean> finalStateSetterManager;
Expand Down Expand Up @@ -152,6 +157,25 @@ public HEALTH_STATE getHealthState(HostPort hostPort) {
return null;
}

@Override
public HealthStatusDesc getHealthStatusDesc(HostPort hostPort) {
HealthStatus status = getHealthStatus(hostPort);
if (null != status) {
long timeoutMill = config.getMarkInstanceMaxDelayMilli() + config.getCheckerMetaRefreshIntervalMilli();
return new HealthStatusDesc(hostPort, status, status.getLastMarkHandled(timeoutMill));
} else {
return new HealthStatusDesc(hostPort, HEALTH_STATE.UNKNOWN);
}
}

@Override
public void updateLastMarkHandled(HostPort hostPort, boolean lastMark) {
HealthStatus status = getHealthStatus(hostPort);
if (null != status) {
status.updateLastMarkHandled(lastMark);
}
}

@Override
public Map<HostPort, HEALTH_STATE> getAllCachedState() {
Map<HostPort, HEALTH_STATE> cachedHealthStatus = new HashMap<>();
Expand Down Expand Up @@ -207,4 +231,9 @@ private boolean activeDcCheckerSubscribeMasterTypeInstance(RedisHealthCheckInsta
return ClusterType.lookup(azGroupType) == ClusterType.SINGLE_DC;
}

@VisibleForTesting
protected void setScheduled(ScheduledExecutorService scheduled) {
this.scheduled = scheduled;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.ctrip.xpipe.api.observer.Observer;
import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo;
Expand Down Expand Up @@ -35,6 +36,9 @@ public class DefaultPsubPingActionCollector extends AbstractPsubPingActionCollec
@Autowired
private List<HealthEventProcessor> healthEventProcessors;

@Autowired
private CheckerConfig config;

@Resource(name = SCHEDULED_EXECUTOR)
private ScheduledExecutorService scheduled;

Expand All @@ -51,6 +55,25 @@ public HEALTH_STATE getHealthState(HostPort hostPort) {
return HEALTH_STATE.UNKNOWN;
}

@Override
public HealthStatusDesc getHealthStatusDesc(HostPort hostPort) {
HealthStatus status = getHealthStatus(hostPort);
if (null != status) {
long timeoutMill = config.getMarkInstanceMaxDelayMilli() + config.getCheckerMetaRefreshIntervalMilli();
return new HealthStatusDesc(hostPort, status, status.getLastMarkHandled(timeoutMill));
} else {
return new HealthStatusDesc(hostPort, HEALTH_STATE.UNKNOWN);
}
}

@Override
public void updateLastMarkHandled(HostPort hostPort, boolean lastMark) {
HealthStatus status = getHealthStatus(hostPort);
if (null != status) {
status.updateLastMarkHandled(lastMark);
}
}

@Override
public Map<HostPort, HEALTH_STATE> getAllCachedState() {
Map<HostPort, HEALTH_STATE> cachedHealthStatus = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ public interface HealthStateService {

HEALTH_STATE getHealthState(HostPort hostPort);

HealthStatusDesc getHealthStatusDesc(HostPort hostPort);

Map<HostPort, HEALTH_STATE> getAllCachedState();

void updateHealthState(Map<HostPort, HEALTH_STATE> redisStates);

void updateLastMarkHandled(HostPort hostPort, boolean lastMark);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntSupplier;

/**
Expand Down Expand Up @@ -47,6 +49,12 @@ public class HealthStatus extends AbstractObservable implements Startable, Stopp
protected static final String currentDcId = FoundationService.DEFAULT.getDataCenter();
protected static Logger delayLogger = LoggerFactory.getLogger(HealthStatus.class.getName() + ".delay");

private ReadWriteLock lastMarkHandledLock = new ReentrantReadWriteLock();

private volatile boolean lastMarkHandled = false;

private volatile long lastMarkHandledTime = -1;

public HealthStatus(RedisHealthCheckInstance instance, ScheduledExecutorService scheduled){
this.instance = instance;
this.scheduled = scheduled;
Expand Down Expand Up @@ -275,4 +283,38 @@ public long getLastHealthyDelayTime() {
return lastHealthDelayTime.get();
}

public void updateLastMarkHandled(boolean handleMarkUp) {
try {
lastMarkHandledLock.writeLock().lock();
this.lastMarkHandled = handleMarkUp;
this.lastMarkHandledTime = System.currentTimeMillis();
} finally {
lastMarkHandledLock.writeLock().unlock();
}
}

public Boolean getLastMarkHandled(long timeoutMill) {
try {
lastMarkHandledLock.readLock().lock();
long current = System.currentTimeMillis();
if (lastMarkHandledTime > current) {
resetLastMarkHandled();
} else if (lastMarkHandledTime > 0 && lastMarkHandledTime + timeoutMill > current) {
return lastMarkHandled;
}
} finally {
lastMarkHandledLock.readLock().unlock();
}
return null;
}

private void resetLastMarkHandled() {
try {
lastMarkHandledLock.writeLock().lock();
this.lastMarkHandledTime = -1;
} finally {
lastMarkHandledLock.writeLock().unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class HealthStatusDesc {

private HEALTH_STATE state;

private Boolean lastMarkHandled = null;

private long lastPongTime = -1;

private long lastHealthDelayTime = -1;
Expand All @@ -25,11 +27,22 @@ public HealthStatusDesc(HostPort hostPort, HEALTH_STATE state) {
this.state = state;
}

public HealthStatusDesc(HostPort hostPort, HEALTH_STATE state, Boolean lastMarkHandled) {
this.hostPort = hostPort;
this.state = state;
this.lastMarkHandled = lastMarkHandled;
}

public HealthStatusDesc(HostPort hostPort, HealthStatus status) {
this(hostPort, status, null);
}

public HealthStatusDesc(HostPort hostPort, HealthStatus status, Boolean lastMarkHandled) {
this.hostPort = hostPort;
this.state = status.getState();
this.lastPongTime = status.getLastPongTime();
this.lastHealthDelayTime = status.getLastHealthyDelayTime();
this.lastMarkHandled = lastMarkHandled;
}

public HostPort getHostPort() {
Expand All @@ -48,4 +61,7 @@ public long getLastHealthDelayTime() {
return lastHealthDelayTime;
}

public Boolean getLastMarkHandled() {
return lastMarkHandled;
}
}
Loading

0 comments on commit e5663cd

Please sign in to comment.