Skip to content

Commit

Permalink
v0.6.2
Browse files Browse the repository at this point in the history
  • Loading branch information
Baoyi Chen committed Mar 13, 2021
1 parent 48b752c commit 6eaf416
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 0.6.2

support downgrade migration from redis 6.2 to 2.8.

### 0.6.1

Fix OOM bug when use `RawByteListener`.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<dependency>
<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>3.5.2</version>
<version>3.6.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package com.moilioncircle.redis.rdb.cli.ext;

import static com.moilioncircle.redis.replicator.Constants.RDB_LOAD_NONE;
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_LIST;
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_ZSET;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
Expand All @@ -28,8 +32,11 @@
import com.moilioncircle.redis.replicator.Replicator;
import com.moilioncircle.redis.replicator.event.Event;
import com.moilioncircle.redis.replicator.io.RedisInputStream;
import com.moilioncircle.redis.replicator.rdb.BaseRdbEncoder;
import com.moilioncircle.redis.replicator.rdb.BaseRdbParser;
import com.moilioncircle.redis.replicator.rdb.datatype.ContextKeyValuePair;
import com.moilioncircle.redis.replicator.rdb.dump.datatype.DumpKeyValuePair;
import com.moilioncircle.redis.replicator.util.ByteArray;

/**
* @author Baoyi Chen
Expand Down Expand Up @@ -116,17 +123,48 @@ protected Event doApplyZSet(RedisInputStream in, int version, byte[] key, boolea
@Override
protected Event doApplyZSet2(RedisInputStream in, int version, byte[] key, boolean contains, int type, ContextKeyValuePair context) throws IOException {
int ver = configure.getDumpRdbVersion() == -1 ? version : configure.getDumpRdbVersion();
try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) {
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o, raw)) {
listener.write((byte) type);
super.doApplyZSet2(in, version, key, contains, type, context);
if (ver < 8 /* since redis rdb version 8 */) {
// downgrade to RDB_TYPE_ZSET
BaseRdbParser parser = new BaseRdbParser(in);
BaseRdbEncoder encoder = new BaseRdbEncoder();

try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) {
long len = parser.rdbLoadLen().len;
long temp = len;
while (len > 0) {
ByteArray element = parser.rdbLoadEncodedStringObject();
encoder.rdbGenericSaveStringObject(element, o);
double score = parser.rdbLoadBinaryDoubleValue();
encoder.rdbSaveDoubleValue(score, o);
len--;
}
ByteArrayOutputStream o1 = new ByteArrayOutputStream(configure.getBufferSize());
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o1, raw, false)) {
listener.write((byte) RDB_TYPE_ZSET);
listener.handle(encoder.rdbSaveLen(temp));
listener.handle(o.toByteArray());
}

DumpKeyValuePair dump = new DumpKeyValuePair();
dump.setValueRdbType(RDB_TYPE_ZSET);
dump.setKey(key);
dump.setValue(o1.toByteArray());
return context.valueOf(dump);
}
} else {
try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) {
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o, raw)) {
listener.write((byte) type);
super.doApplyZSet2(in, version, key, contains, type, context);
}
DumpKeyValuePair dump = new DumpKeyValuePair();
dump.setValueRdbType(type);
dump.setKey(key);
dump.setValue(o.toByteArray());
return context.valueOf(dump);
}
DumpKeyValuePair dump = new DumpKeyValuePair();
dump.setValueRdbType(type);
dump.setKey(key);
dump.setValue(o.toByteArray());
return context.valueOf(dump);
}

}

@Override
Expand Down Expand Up @@ -228,16 +266,54 @@ protected Event doApplyHashZipList(RedisInputStream in, int version, byte[] key,
@Override
protected Event doApplyListQuickList(RedisInputStream in, int version, byte[] key, boolean contains, int type, ContextKeyValuePair context) throws IOException {
int ver = configure.getDumpRdbVersion() == -1 ? version : configure.getDumpRdbVersion();
try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) {
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o, raw)) {
listener.write((byte) type);
super.doApplyListQuickList(in, version, key, contains, type, context);
if (ver < 7 /* since redis rdb version 7 */) {
// downgrade to RDB_TYPE_LIST
BaseRdbParser parser = new BaseRdbParser(in);
BaseRdbEncoder encoder = new BaseRdbEncoder();

try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) {
int total = 0;
long len = parser.rdbLoadLen().len;
for (long i = 0; i < len; i++) {
RedisInputStream stream = new RedisInputStream(parser.rdbGenericLoadStringObject(RDB_LOAD_NONE));

BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
BaseRdbParser.LenHelper.zltail(stream); // zltail
int zllen = BaseRdbParser.LenHelper.zllen(stream);
for (int j = 0; j < zllen; j++) {
byte[] e = BaseRdbParser.StringHelper.zipListEntry(stream);
encoder.rdbGenericSaveStringObject(new ByteArray(e), o);
total++;
}
int zlend = BaseRdbParser.LenHelper.zlend(stream);
if (zlend != 255) {
throw new AssertionError("zlend expect 255 but " + zlend);
}
}
ByteArrayOutputStream o1 = new ByteArrayOutputStream(configure.getBufferSize());
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o1, raw, false)) {
listener.write((byte) RDB_TYPE_LIST);
listener.handle(encoder.rdbSaveLen(total));
listener.handle(o.toByteArray());
}
DumpKeyValuePair dump = new DumpKeyValuePair();
dump.setValueRdbType(RDB_TYPE_LIST);
dump.setKey(key);
dump.setValue(o1.toByteArray());
return context.valueOf(dump);
}
} else {
try (ByteArrayOutputStream o = new ByteArrayOutputStream(configure.getBufferSize())) {
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, o, raw)) {
listener.write((byte) type);
super.doApplyListQuickList(in, version, key, contains, type, context);
}
DumpKeyValuePair dump = new DumpKeyValuePair();
dump.setValueRdbType(type);
dump.setKey(key);
dump.setValue(o.toByteArray());
return context.valueOf(dump);
}
DumpKeyValuePair dump = new DumpKeyValuePair();
dump.setValueRdbType(type);
dump.setKey(key);
dump.setValue(o.toByteArray());
return context.valueOf(dump);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@
*/
public class DumpRawByteListener implements RawByteListener, Closeable {
private final int version;
private final boolean listener;
private final CRCOutputStream out;
private final Replicator replicator;

public DumpRawByteListener(Replicator replicator, int version, OutputStream out, Escaper escaper) {
this(replicator, version, out, escaper, true);
}

public DumpRawByteListener(Replicator replicator, int version, OutputStream out, Escaper escaper, boolean listener) {
this.version = version;
this.listener = listener;
this.replicator = replicator;
this.replicator.addRawByteListener(this);
if (listener) this.replicator.addRawByteListener(this);
this.out = new CRCOutputStream(out, escaper);
}

Expand All @@ -52,7 +58,7 @@ public void handle(byte... rawBytes) {

@Override
public void close() throws IOException {
this.replicator.removeRawByteListener(this);
if (listener) this.replicator.removeRawByteListener(this);
this.out.write((byte) version);
this.out.write((byte) 0x00);
this.out.write(this.out.getCRC64());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package com.moilioncircle.redis.rdb.cli.ext.rct;

import static com.moilioncircle.redis.replicator.Constants.RDB_LOAD_NONE;
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_LIST;
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_ZSET;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
Expand All @@ -30,8 +34,11 @@
import com.moilioncircle.redis.replicator.Replicator;
import com.moilioncircle.redis.replicator.event.Event;
import com.moilioncircle.redis.replicator.io.RedisInputStream;
import com.moilioncircle.redis.replicator.rdb.BaseRdbEncoder;
import com.moilioncircle.redis.replicator.rdb.BaseRdbParser;
import com.moilioncircle.redis.replicator.rdb.datatype.ContextKeyValuePair;
import com.moilioncircle.redis.replicator.rdb.datatype.DB;
import com.moilioncircle.redis.replicator.util.ByteArray;

/**
* @author Baoyi Chen
Expand Down Expand Up @@ -171,9 +178,30 @@ protected Event doApplyZSet2(RedisInputStream in, int version, byte[] key, boole
}
version = configure.getDumpRdbVersion() == -1 ? version : configure.getDumpRdbVersion();
try (ByteArrayOutputStream out = new ByteArrayOutputStream(configure.getBufferSize())) {
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) {
listener.write((byte) type);
super.doApplyZSet2(in, version, key, contains, type, context);
if (version < 8 /* since redis rdb version 8 */) {
// downgrade to RDB_TYPE_LIST
BaseRdbParser parser = new BaseRdbParser(in);
BaseRdbEncoder encoder = new BaseRdbEncoder();
ByteArrayOutputStream out1 = new ByteArrayOutputStream(configure.getBufferSize());
long len = parser.rdbLoadLen().len;
long temp = len;
while (len > 0) {
ByteArray element = parser.rdbLoadEncodedStringObject();
encoder.rdbGenericSaveStringObject(element, out1);
double score = parser.rdbLoadBinaryDoubleValue();
encoder.rdbSaveDoubleValue(score, out1);
len--;
}
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper, false)) {
listener.write((byte) RDB_TYPE_ZSET);
listener.handle(encoder.rdbSaveLen(temp));
listener.handle(out1.toByteArray());
}
} else {
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) {
listener.write((byte) type);
super.doApplyZSet2(in, version, key, contains, type, context);
}
}
if (replace) {
emit(this.out, RESTORE, key, ex, out.toByteArray(), REPLACE);
Expand Down Expand Up @@ -353,10 +381,40 @@ protected Event doApplyListQuickList(RedisInputStream in, int version, byte[] ke
}
version = configure.getDumpRdbVersion() == -1 ? version : configure.getDumpRdbVersion();
try (ByteArrayOutputStream out = new ByteArrayOutputStream(configure.getBufferSize())) {
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) {
listener.write((byte) type);
super.doApplyListQuickList(in, version, key, contains, type, context);
if (version < 7 /* since redis rdb version 7 */) {
BaseRdbParser parser = new BaseRdbParser(in);
BaseRdbEncoder encoder = new BaseRdbEncoder();
ByteArrayOutputStream out1 = new ByteArrayOutputStream(configure.getBufferSize());
int total = 0;
long len = parser.rdbLoadLen().len;
for (long i = 0; i < len; i++) {
RedisInputStream stream = new RedisInputStream(parser.rdbGenericLoadStringObject(RDB_LOAD_NONE));

BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
BaseRdbParser.LenHelper.zltail(stream); // zltail
int zllen = BaseRdbParser.LenHelper.zllen(stream);
for (int j = 0; j < zllen; j++) {
byte[] e = BaseRdbParser.StringHelper.zipListEntry(stream);
encoder.rdbGenericSaveStringObject(new ByteArray(e), out1);
total++;
}
int zlend = BaseRdbParser.LenHelper.zlend(stream);
if (zlend != 255) {
throw new AssertionError("zlend expect 255 but " + zlend);
}
}
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper, false)) {
listener.write((byte) RDB_TYPE_LIST);
listener.handle(encoder.rdbSaveLen(total));
listener.handle(out1.toByteArray());
}
} else {
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) {
listener.write((byte) type);
super.doApplyListQuickList(in, version, key, contains, type, context);
}
}

if (replace) {
emit(this.out, RESTORE, key, ex, out.toByteArray(), REPLACE);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/.version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.6.1
v0.6.2

0 comments on commit 6eaf416

Please sign in to comment.