From 6eaf416ee22dba3c2f34c4fc0e7634a4d28c41f3 Mon Sep 17 00:00:00 2001 From: Baoyi Chen Date: Sat, 13 Mar 2021 10:34:12 +0800 Subject: [PATCH] v0.6.2 --- CHANGELOG.md | 4 + pom.xml | 2 +- .../cli/ext/AbstractMigrateRdbVisitor.java | 112 +++++++++++++++--- .../rdb/cli/ext/DumpRawByteListener.java | 10 +- .../redis/rdb/cli/ext/rct/DumpRdbVisitor.java | 70 ++++++++++- src/main/resources/.version | 2 +- 6 files changed, 172 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0780f5ab..1f31018a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +### 0.6.2 + +support downgrade migration from redis 6.2 to 2.8. + ### 0.6.1 Fix OOM bug when use `RawByteListener`. diff --git a/pom.xml b/pom.xml index 49b3a9db..7d679727 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ com.moilioncircle redis-replicator - 3.5.2 + 3.6.0 org.slf4j diff --git a/src/main/java/com/moilioncircle/redis/rdb/cli/ext/AbstractMigrateRdbVisitor.java b/src/main/java/com/moilioncircle/redis/rdb/cli/ext/AbstractMigrateRdbVisitor.java index a3008712..2f473f9e 100644 --- a/src/main/java/com/moilioncircle/redis/rdb/cli/ext/AbstractMigrateRdbVisitor.java +++ b/src/main/java/com/moilioncircle/redis/rdb/cli/ext/AbstractMigrateRdbVisitor.java @@ -16,6 +16,10 @@ package com.moilioncircle.redis.rdb.cli.ext; +import static com.moilioncircle.redis.replicator.Constants.RDB_LOAD_NONE; +import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_LIST; +import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_ZSET; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; @@ -28,8 +32,11 @@ import com.moilioncircle.redis.replicator.Replicator; import com.moilioncircle.redis.replicator.event.Event; import com.moilioncircle.redis.replicator.io.RedisInputStream; +import com.moilioncircle.redis.replicator.rdb.BaseRdbEncoder; +import com.moilioncircle.redis.replicator.rdb.BaseRdbParser; import com.moilioncircle.redis.replicator.rdb.datatype.ContextKeyValuePair; import com.moilioncircle.redis.replicator.rdb.dump.datatype.DumpKeyValuePair; +import com.moilioncircle.redis.replicator.util.ByteArray; /** * @author Baoyi Chen @@ -116,17 +123,48 @@ protected Event doApplyZSet(RedisInputStream in, int version, byte[] key, boolea @Override protected Event doApplyZSet2(RedisInputStream in, int version, byte[] key, boolean contains, int type, ContextKeyValuePair context) throws IOException { int ver = configure.getDumpRdbVersion() == -1 ? version : configure.getDumpRdbVersion(); - try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) { - try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o, raw)) { - listener.write((byte) type); - super.doApplyZSet2(in, version, key, contains, type, context); + if (ver < 8 /* since redis rdb version 8 */) { + // downgrade to RDB_TYPE_ZSET + BaseRdbParser parser = new BaseRdbParser(in); + BaseRdbEncoder encoder = new BaseRdbEncoder(); + + try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) { + long len = parser.rdbLoadLen().len; + long temp = len; + while (len > 0) { + ByteArray element = parser.rdbLoadEncodedStringObject(); + encoder.rdbGenericSaveStringObject(element, o); + double score = parser.rdbLoadBinaryDoubleValue(); + encoder.rdbSaveDoubleValue(score, o); + len--; + } + ByteArrayOutputStream o1 = new ByteArrayOutputStream(configure.getBufferSize()); + try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o1, raw, false)) { + listener.write((byte) RDB_TYPE_ZSET); + listener.handle(encoder.rdbSaveLen(temp)); + listener.handle(o.toByteArray()); + } + + DumpKeyValuePair dump = new DumpKeyValuePair(); + dump.setValueRdbType(RDB_TYPE_ZSET); + dump.setKey(key); + dump.setValue(o1.toByteArray()); + return context.valueOf(dump); + } + } else { + try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) { + try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o, raw)) { + listener.write((byte) type); + super.doApplyZSet2(in, version, key, contains, type, context); + } + DumpKeyValuePair dump = new DumpKeyValuePair(); + dump.setValueRdbType(type); + dump.setKey(key); + dump.setValue(o.toByteArray()); + return context.valueOf(dump); } - DumpKeyValuePair dump = new DumpKeyValuePair(); - dump.setValueRdbType(type); - dump.setKey(key); - dump.setValue(o.toByteArray()); - return context.valueOf(dump); } + } @Override @@ -228,16 +266,54 @@ protected Event doApplyHashZipList(RedisInputStream in, int version, byte[] key, @Override protected Event doApplyListQuickList(RedisInputStream in, int version, byte[] key, boolean contains, int type, ContextKeyValuePair context) throws IOException { int ver = configure.getDumpRdbVersion() == -1 ? version : configure.getDumpRdbVersion(); - try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) { - try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o, raw)) { - listener.write((byte) type); - super.doApplyListQuickList(in, version, key, contains, type, context); + if (ver < 7 /* since redis rdb version 7 */) { + // downgrade to RDB_TYPE_LIST + BaseRdbParser parser = new BaseRdbParser(in); + BaseRdbEncoder encoder = new BaseRdbEncoder(); + + try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) { + int total = 0; + long len = parser.rdbLoadLen().len; + for (long i = 0; i < len; i++) { + RedisInputStream stream = new RedisInputStream(parser.rdbGenericLoadStringObject(RDB_LOAD_NONE)); + + BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes + BaseRdbParser.LenHelper.zltail(stream); // zltail + int zllen = BaseRdbParser.LenHelper.zllen(stream); + for (int j = 0; j < zllen; j++) { + byte[] e = BaseRdbParser.StringHelper.zipListEntry(stream); + encoder.rdbGenericSaveStringObject(new ByteArray(e), o); + total++; + } + int zlend = BaseRdbParser.LenHelper.zlend(stream); + if (zlend != 255) { + throw new AssertionError("zlend expect 255 but " + zlend); + } + } + ByteArrayOutputStream o1 = new ByteArrayOutputStream(configure.getBufferSize()); + try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o1, raw, false)) { + listener.write((byte) RDB_TYPE_LIST); + listener.handle(encoder.rdbSaveLen(total)); + listener.handle(o.toByteArray()); + } + DumpKeyValuePair dump = new DumpKeyValuePair(); + dump.setValueRdbType(RDB_TYPE_LIST); + dump.setKey(key); + dump.setValue(o1.toByteArray()); + return context.valueOf(dump); + } + } else { + try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) { + try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o, raw)) { + listener.write((byte) type); + super.doApplyListQuickList(in, version, key, contains, type, context); + } + DumpKeyValuePair dump = new DumpKeyValuePair(); + dump.setValueRdbType(type); + dump.setKey(key); + dump.setValue(o.toByteArray()); + return context.valueOf(dump); } - DumpKeyValuePair dump = new DumpKeyValuePair(); - dump.setValueRdbType(type); - dump.setKey(key); - dump.setValue(o.toByteArray()); - return context.valueOf(dump); } } diff --git a/src/main/java/com/moilioncircle/redis/rdb/cli/ext/DumpRawByteListener.java b/src/main/java/com/moilioncircle/redis/rdb/cli/ext/DumpRawByteListener.java index 3630b646..a00d767b 100644 --- a/src/main/java/com/moilioncircle/redis/rdb/cli/ext/DumpRawByteListener.java +++ b/src/main/java/com/moilioncircle/redis/rdb/cli/ext/DumpRawByteListener.java @@ -31,13 +31,19 @@ */ public class DumpRawByteListener implements RawByteListener, Closeable { private final int version; + private final boolean listener; private final CRCOutputStream out; private final Replicator replicator; public DumpRawByteListener(Replicator replicator, int version, OutputStream out, Escaper escaper) { + this(replicator, version, out, escaper, true); + } + + public DumpRawByteListener(Replicator replicator, int version, OutputStream out, Escaper escaper, boolean listener) { this.version = version; + this.listener = listener; this.replicator = replicator; - this.replicator.addRawByteListener(this); + if (listener) this.replicator.addRawByteListener(this); this.out = new CRCOutputStream(out, escaper); } @@ -52,7 +58,7 @@ public void handle(byte... rawBytes) { @Override public void close() throws IOException { - this.replicator.removeRawByteListener(this); + if (listener) this.replicator.removeRawByteListener(this); this.out.write((byte) version); this.out.write((byte) 0x00); this.out.write(this.out.getCRC64()); diff --git a/src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/DumpRdbVisitor.java b/src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/DumpRdbVisitor.java index fff3e151..12af9b0b 100644 --- a/src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/DumpRdbVisitor.java +++ b/src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/DumpRdbVisitor.java @@ -16,6 +16,10 @@ package com.moilioncircle.redis.rdb.cli.ext.rct; +import static com.moilioncircle.redis.replicator.Constants.RDB_LOAD_NONE; +import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_LIST; +import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_ZSET; + import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -30,8 +34,11 @@ import com.moilioncircle.redis.replicator.Replicator; import com.moilioncircle.redis.replicator.event.Event; import com.moilioncircle.redis.replicator.io.RedisInputStream; +import com.moilioncircle.redis.replicator.rdb.BaseRdbEncoder; +import com.moilioncircle.redis.replicator.rdb.BaseRdbParser; import com.moilioncircle.redis.replicator.rdb.datatype.ContextKeyValuePair; import com.moilioncircle.redis.replicator.rdb.datatype.DB; +import com.moilioncircle.redis.replicator.util.ByteArray; /** * @author Baoyi Chen @@ -171,9 +178,30 @@ protected Event doApplyZSet2(RedisInputStream in, int version, byte[] key, boole } version = configure.getDumpRdbVersion() == -1 ? version : configure.getDumpRdbVersion(); try (ByteArrayOutputStream out = new ByteArrayOutputStream(configure.getBufferSize())) { - try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) { - listener.write((byte) type); - super.doApplyZSet2(in, version, key, contains, type, context); + if (version < 8 /* since redis rdb version 8 */) { + // downgrade to RDB_TYPE_LIST + BaseRdbParser parser = new BaseRdbParser(in); + BaseRdbEncoder encoder = new BaseRdbEncoder(); + ByteArrayOutputStream out1 = new ByteArrayOutputStream(configure.getBufferSize()); + long len = parser.rdbLoadLen().len; + long temp = len; + while (len > 0) { + ByteArray element = parser.rdbLoadEncodedStringObject(); + encoder.rdbGenericSaveStringObject(element, out1); + double score = parser.rdbLoadBinaryDoubleValue(); + encoder.rdbSaveDoubleValue(score, out1); + len--; + } + try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper, false)) { + listener.write((byte) RDB_TYPE_ZSET); + listener.handle(encoder.rdbSaveLen(temp)); + listener.handle(out1.toByteArray()); + } + } else { + try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) { + listener.write((byte) type); + super.doApplyZSet2(in, version, key, contains, type, context); + } } if (replace) { emit(this.out, RESTORE, key, ex, out.toByteArray(), REPLACE); @@ -353,10 +381,40 @@ protected Event doApplyListQuickList(RedisInputStream in, int version, byte[] ke } version = configure.getDumpRdbVersion() == -1 ? version : configure.getDumpRdbVersion(); try (ByteArrayOutputStream out = new ByteArrayOutputStream(configure.getBufferSize())) { - try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) { - listener.write((byte) type); - super.doApplyListQuickList(in, version, key, contains, type, context); + if (version < 7 /* since redis rdb version 7 */) { + BaseRdbParser parser = new BaseRdbParser(in); + BaseRdbEncoder encoder = new BaseRdbEncoder(); + ByteArrayOutputStream out1 = new ByteArrayOutputStream(configure.getBufferSize()); + int total = 0; + long len = parser.rdbLoadLen().len; + for (long i = 0; i < len; i++) { + RedisInputStream stream = new RedisInputStream(parser.rdbGenericLoadStringObject(RDB_LOAD_NONE)); + + BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes + BaseRdbParser.LenHelper.zltail(stream); // zltail + int zllen = BaseRdbParser.LenHelper.zllen(stream); + for (int j = 0; j < zllen; j++) { + byte[] e = BaseRdbParser.StringHelper.zipListEntry(stream); + encoder.rdbGenericSaveStringObject(new ByteArray(e), out1); + total++; + } + int zlend = BaseRdbParser.LenHelper.zlend(stream); + if (zlend != 255) { + throw new AssertionError("zlend expect 255 but " + zlend); + } + } + try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper, false)) { + listener.write((byte) RDB_TYPE_LIST); + listener.handle(encoder.rdbSaveLen(total)); + listener.handle(out1.toByteArray()); + } + } else { + try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) { + listener.write((byte) type); + super.doApplyListQuickList(in, version, key, contains, type, context); + } } + if (replace) { emit(this.out, RESTORE, key, ex, out.toByteArray(), REPLACE); } else { diff --git a/src/main/resources/.version b/src/main/resources/.version index 45a99704..f420d704 100644 --- a/src/main/resources/.version +++ b/src/main/resources/.version @@ -1 +1 @@ -v0.6.1 \ No newline at end of file +v0.6.2 \ No newline at end of file