Skip to content

Commit

Permalink
Merge pull request #922 from ctripcorp/feature/keeper_fresh_rdb_dumper
Browse files Browse the repository at this point in the history
keeper support freshRdbPsync
  • Loading branch information
LanternLee authored Dec 31, 2024
2 parents 8222d72 + 01c5773 commit a1edbe9
Show file tree
Hide file tree
Showing 20 changed files with 535 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public interface Psync extends Command<Object>, Closeable {
String FULL_SYNC = "FULLRESYNC";
String PARTIAL_SYNC = "CONTINUE";
long KEEPER_PARTIAL_SYNC_OFFSET = -2;
long KEEPER_FRESH_RDB_SYNC_OFFSET = -3;

void addPsyncObserver(PsyncObserver observer);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.ctrip.xpipe.redis.core.protocal.cmd;

import com.ctrip.xpipe.api.pool.SimpleObjectPool;
import com.ctrip.xpipe.netty.commands.NettyClient;
import com.ctrip.xpipe.redis.core.store.ReplicationStore;
import com.ctrip.xpipe.tuple.Pair;

import java.util.concurrent.ScheduledExecutorService;

public class FreshRdbOnlyPsync extends RdbOnlyPsync {

public FreshRdbOnlyPsync(SimpleObjectPool<NettyClient> clientPool, ReplicationStore store, ScheduledExecutorService scheduled) {
super(clientPool, store, scheduled);
}

@Override
protected Pair<String, Long> getRequestMasterInfo() {
// psync ? -3
return new Pair<>("?", KEEPER_FRESH_RDB_SYNC_OFFSET);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.ctrip.xpipe.redis.core.protocal.cmd;

import com.ctrip.xpipe.api.command.CommandFuture;
import com.ctrip.xpipe.api.command.CommandFutureListener;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.netty.NettyPoolUtil;
import com.ctrip.xpipe.redis.core.AbstractRedisTest;
import com.ctrip.xpipe.redis.core.protocal.PsyncObserver;
import com.ctrip.xpipe.redis.core.protocal.protocal.EofType;
import com.ctrip.xpipe.redis.core.redis.RunidGenerator;
import com.ctrip.xpipe.redis.core.store.MetaStore;
import com.ctrip.xpipe.redis.core.store.RdbStore;
import com.ctrip.xpipe.redis.core.store.ReplicationStore;
import com.ctrip.xpipe.redis.core.store.ReplicationStoreManager;
import com.ctrip.xpipe.simpleserver.Server;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.Silent.class)
public class FreshRdbOnlyPsyncTest extends AbstractRedisTest {

@Mock
private ReplicationStore replicationStore;

@Mock
private MetaStore metaStore;

@Before
public void beforeDefaultPsyncTest() throws Exception{
when(replicationStore.getMetaStore()).thenReturn(metaStore);
when(replicationStore.getEndOffset()).thenReturn(-1L);

}

@Test
public void testFreshRdbOnlyPsync() throws Exception {
String replId = RunidGenerator.DEFAULT.generateRunid();
int offset = 100;
Server redisServer = startServer(randomPort(), new Function<String, String>() {
@Override
public String apply(String s) {
logger.info("[testFreshRdbOnlyPsync] {}", s);
if (s.trim().equals("psync ? -3")) {
return String.format("+FULLRESYNC %s %d\r\n", replId, offset);
} else {
return "+OK\r\n";
}
}
});
Endpoint redisEndpoint = new DefaultEndPoint("127.0.0.1", redisServer.getPort());
FreshRdbOnlyPsync psync = new FreshRdbOnlyPsync(NettyPoolUtil.createNettyPool(redisEndpoint), replicationStore, scheduled);

CountDownLatch latch = new CountDownLatch(1);
AtomicInteger masetrOffset = new AtomicInteger(0);
psync.addPsyncObserver(new PsyncObserver() {
@Override
public void onFullSync(long masterRdbOffset) {
masetrOffset.set((int)masterRdbOffset);
latch.countDown();
}
@Override
public void reFullSync() {
}
@Override
public void beginWriteRdb(EofType eofType, String replId, long masterRdbOffset) throws IOException {
}
@Override
public void endWriteRdb() {
}
@Override
public void onContinue(String requestReplId, String responseReplId) {
}
@Override
public void onKeeperContinue(String replId, long beginOffset) {
}
@Override
public void readAuxEnd(RdbStore rdbStore, Map<String, String> auxMap) {
}
});
psync.execute().addListener(new CommandFutureListener<Object>() {

@Override
public void operationComplete(CommandFuture<Object> commandFuture) throws Exception {
if(!commandFuture.isSuccess()){
logger.error("[operationComplete]", commandFuture.cause());
}
}
});

latch.await(1000, TimeUnit.SECONDS);
Assert.assertEquals(offset, masetrOffset.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ public static enum PROMOTION_STATE{

}

void fullSyncToSlave(RedisSlave redisSlave) throws IOException;
default void fullSyncToSlave(RedisSlave redisSlave) throws IOException {
fullSyncToSlave(redisSlave, false);
}

void fullSyncToSlave(RedisSlave redisSlave, boolean freshRdbNeeded) throws IOException;

void startIndexing() throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface RedisMaster extends RedisRole, Lifecycle, LifecycleStateAware,

void reconnect();

RdbDumper createRdbDumper(boolean tryRrodb) throws CreateRdbDumperException;
RdbDumper createRdbDumper(boolean tryRrodb, boolean freshRdbNeeded) throws CreateRdbDumperException;

MASTER_STATE getMasterState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface RedisSlave extends RedisClient<RedisKeeperServer>, PartialAware
void markPsyncProcessed();

/**
* If psync ? -1, slave start with no data, we should fsync immediately
* If psync ? -1 or psync xxxx 1, slave start with no data, we should fsync immediately
*/
void markColdStart();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ctrip.xpipe.redis.keeper.exception.psync;

import com.ctrip.xpipe.redis.keeper.monitor.PsyncFailReason;

public class KeeperTolerantClosePsyncException extends PsyncRuntimeException {

public KeeperTolerantClosePsyncException(PsyncRuntimeException e) {
super("keeper tolerant:" + e.getMessage(), e);
}

@Override
public PsyncFailReason toReason() {
Throwable cause = getCause();
if (cause instanceof PsyncRuntimeException) return ((PsyncRuntimeException) cause).toReason();
else return super.toReason();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public void run() {
protected abstract void innerDoHandle(final String[] args, final RedisSlave redisSlave, RedisKeeperServer redisKeeperServer) throws IOException;

protected void doFullSync(RedisSlave redisSlave) {
doFullSync(redisSlave, false);
}

protected void doFullSync(RedisSlave redisSlave, boolean freshRdbNeeded) {

try {
if(logger.isInfoEnabled()){
Expand All @@ -83,7 +87,7 @@ protected void doFullSync(RedisSlave redisSlave) {
String alert = String.format("FULL(M)<-%s[%s]", redisSlave.metaInfo(), redisKeeperServer.getReplId());
EventMonitor.DEFAULT.logAlertEvent(alert);

redisKeeperServer.fullSyncToSlave(redisSlave);
redisKeeperServer.fullSyncToSlave(redisSlave, freshRdbNeeded);
redisKeeperServer.getKeeperMonitor().getKeeperStats().increaseFullSync();
} catch (IOException e) {
logger.error("[doFullSync][close client]" + redisSlave, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.io.IOException;

import static com.ctrip.xpipe.redis.core.protocal.Psync.KEEPER_FRESH_RDB_SYNC_OFFSET;
import static com.ctrip.xpipe.redis.core.protocal.Psync.KEEPER_PARTIAL_SYNC_OFFSET;

/**
Expand All @@ -36,14 +37,17 @@ protected void innerDoHandle(final String[] args, final RedisSlave redisSlave, R

Long offsetRequest = Long.valueOf(args[1]);
String replIdRequest = args[0];

if(replIdRequest.equals("?")){
if (replIdRequest.equals("?") || offsetRequest == 1) {
redisSlave.markColdStart();
}

if(replIdRequest.equals("?")){
if (redisSlave.isKeeper() && offsetRequest.equals(KEEPER_PARTIAL_SYNC_OFFSET) && null != keeperRepl.replId()) {
logger.info("[innerDoHandler][keeper psync]");
long continueOffset = keeperRepl.getEndOffset() + 1; // continue from next byte
doKeeperPartialSync(redisSlave, keeperRepl.replId(), continueOffset);
} else if (redisSlave.isKeeper() && offsetRequest.equals(KEEPER_FRESH_RDB_SYNC_OFFSET)) {
doFullSync(redisSlave, true);
} else {
doFullSync(redisSlave);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,17 +770,17 @@ public void promoteSlave(String ip, int port) throws RedisSlavePromotionExceptio
}

@Override
public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException {
public void fullSyncToSlave(final RedisSlave redisSlave, boolean freshRdbNeeded) throws IOException {

logger.info("[fullSyncToSlave]{}, {}", redisSlave, rdbDumper.get());

if (crossRegion.get() && !tryFullSyncToSlaveWithOthers(redisSlave)) {
if (crossRegion.get() && !redisSlave.isKeeper() && !tryFullSyncToSlaveWithOthers(redisSlave)) {
redisSlave.waitForSeqFsync();
return;
}

boolean tryRordb = false; // slave and master all support rordb or not
if (redisSlave.capaOf(CAPA.RORDB) ) {
if (redisSlave.capaOf(CAPA.RORDB)) {
try {
logger.info("[fullSyncToSlave][rordb]{}", redisSlave);
tryRordb = keeperRedisMaster.checkMasterSupportRordb().get();
Expand All @@ -792,28 +792,30 @@ public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException {

if(rdbDumper.get() == null){
logger.info("[fullSyncToSlave][dumper null]{}", redisSlave);
FullSyncListener fullSyncListener = new DefaultFullSyncListener(redisSlave);
FULLSYNC_FAIL_CAUSE failCause = getCurrentReplicationStore().fullSyncIfPossible(fullSyncListener, tryRordb);
if(null != failCause){
if (FULLSYNC_PROGRESS_NOT_SUPPORTED.equals(failCause)) {
if (!freshRdbNeeded) {
FullSyncListener fullSyncListener = new DefaultFullSyncListener(redisSlave);
FULLSYNC_FAIL_CAUSE failCause = getCurrentReplicationStore().fullSyncIfPossible(fullSyncListener, tryRordb);
if (null == failCause) {
return;
} else if (FULLSYNC_PROGRESS_NOT_SUPPORTED.equals(failCause)) {
logger.info("[fullSyncToSlave][progress not support][cancel slave]");
redisSlave.close();
return;
}
}

try{
RdbDumper newDumper = dumpNewRdb(tryRordb);
redisSlave.waitForRdbDumping();
if (newDumper.future().isDone() && !newDumper.future().isSuccess()) {
logger.info("[fullSyncToSlave][new dumper fail immediatelly]");
redisSlave.close();
}
}catch(AbstractRdbDumperException e){
logger.error("[fullSyncToSlave]", e);
if(e.isCancelSlave()){
logger.info("[fullSyncToSlave][cancel slave]");
redisSlave.close();
}
try{
RdbDumper newDumper = dumpNewRdb(tryRordb, freshRdbNeeded);
redisSlave.waitForRdbDumping();
if (newDumper.future().isDone() && !newDumper.future().isSuccess()) {
logger.info("[fullSyncToSlave][new dumper fail immediatelly]");
redisSlave.close();
}
}catch(AbstractRdbDumperException e){
logger.error("[fullSyncToSlave]", e);
if(e.isCancelSlave()){
logger.info("[fullSyncToSlave][cancel slave]");
redisSlave.close();
}
}
}else{
Expand All @@ -822,7 +824,6 @@ public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException {
}

private synchronized boolean tryFullSyncToSlaveWithOthers(RedisSlave redisSlave) {
if (redisSlave.isKeeper()) return true;
if (loadingSlaves.contains(redisSlave)) return true;

int maxConcurrentLoadingSlaves = keeperConfig.getCrossRegionMaxLoadingSlavesCnt();
Expand Down Expand Up @@ -880,8 +881,12 @@ public boolean isStartIndexing() {
}

private RdbDumper dumpNewRdb(boolean tryRordb) throws CreateRdbDumperException, SetRdbDumperException {
return dumpNewRdb(tryRordb, false);
}

private RdbDumper dumpNewRdb(boolean tryRordb, boolean freshRdbNeeded) throws CreateRdbDumperException, SetRdbDumperException {

RdbDumper rdbDumper = keeperRedisMaster.createRdbDumper(tryRordb);
RdbDumper rdbDumper = keeperRedisMaster.createRdbDumper(tryRordb, freshRdbNeeded);
setRdbDumper(rdbDumper);
rdbDumper.execute();
return rdbDumper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,13 @@ public PARTIAL_STATE partialState() {


@Override
public RdbDumper createRdbDumper(boolean tryRrodb) throws CreateRdbDumperException {
public RdbDumper createRdbDumper(boolean tryRrodb, boolean freshRdbNeeded) throws CreateRdbDumperException {

if(masterState != MASTER_STATE.REDIS_REPL_CONNECTED){
logger.info("[createRdbDumper][master state not connected, dumper not allowed]{}", redisMasterReplication);
throw new CreateRdbDumperException(this, "master state not connected, dumper not allowed:" + masterState);
}
return new RedisMasterNewRdbDumper(this, redisKeeperServer, tryRrodb, rdbOnlyEventLoopGroup, scheduled, keeperResourceManager);
return new RedisMasterNewRdbDumper(this, redisKeeperServer, tryRrodb, freshRdbNeeded, rdbOnlyEventLoopGroup, scheduled, keeperResourceManager);
}

public MASTER_STATE getMasterState() {
Expand Down
Loading

0 comments on commit a1edbe9

Please sign in to comment.