Skip to content

Commit

Permalink
fixed issue #5017 , json partial update fixed & format
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Jan 2, 2024
1 parent 54145ce commit 04c11ec
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,13 @@ public static StringBuilder print_json_diff(LogBuffer buffer, long len, String c
buffer.forward((int) value_length);
}

if (buffer.position - position >= len) {
// see https://github.com/alibaba/canal/pull/5018
if (buffer.position() - position >= len) {
break;
}
}

if (buffer.position - position != len) {
if (buffer.position() - position != len) {
throw new IllegalArgumentException("reading json diff");
}

Expand Down Expand Up @@ -137,7 +138,8 @@ public static StringBuilder print_json_diff(LogBuffer buffer, long len, String c
builder.append(jsonBuilder);
}

if (buffer.position - position >= len) {
// see https://github.com/alibaba/canal/pull/5018
if (buffer.position() - position >= len) {
builder.append(")");
break;
}
Expand All @@ -153,7 +155,7 @@ public static StringBuilder print_json_diff(LogBuffer buffer, long len, String c
diff_i++;
}

if (buffer.position - position != len) {
if (buffer.position() - position != len) {
throw new IllegalArgumentException("reading json diff");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import java.nio.charset.Charset;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.ZoneId;
import java.time.zone.ZoneRules;
import java.util.BitSet;

import org.apache.commons.logging.Log;
Expand All @@ -32,27 +30,27 @@ public final class RowsLogBuffer {
public static final Integer[] integerCache = new Integer[1024 * 128];
public static final int integerCacheLimit = longCache.length + 127;

public static final long DATETIMEF_INT_OFS = 0x8000000000L;
public static final long TIMEF_INT_OFS = 0x800000L;
public static final long TIMEF_OFS = 0x800000000000L;
private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
public static final long DATETIMEF_INT_OFS = 0x8000000000L;
public static final long TIMEF_INT_OFS = 0x800000L;
public static final long TIMEF_OFS = 0x800000000000L;
private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };

private final LogBuffer buffer;
private final int columnLen;
private final int jsonColumnCount;
private final Charset charset;
private final LogBuffer buffer;
private final int columnLen;
private final int jsonColumnCount;
private final Charset charset;

private final BitSet nullBits;
private int nullBitIndex;
private final BitSet nullBits;
private int nullBitIndex;

// Read value_options if this is AI for PARTIAL_UPDATE_ROWS_EVENT
private final boolean partial;
private final BitSet partialBits;
private final boolean partial;
private final BitSet partialBits;

private boolean fNull;
private int javaType;
private int length;
private Serializable value;
private boolean fNull;
private int javaType;
private int length;
private Serializable value;

public RowsLogBuffer(LogBuffer buffer, final int columnLen, Charset charset, int jsonColumnCount, boolean partial){
this.buffer = buffer;
Expand All @@ -71,8 +69,7 @@ public final boolean nextOneRow(BitSet columns) {
/**
* Extracting next row from packed buffer.
*
* @see mysql-5.1.60/sql/log_event.cc -
* Rows_log_event::print_verbose_one_row
* @see mysql-5.1.60/sql/log_event.cc - Rows_log_event::print_verbose_one_row
*/
public final boolean nextOneRow(BitSet columns, boolean after) {
final boolean hasOneRow = buffer.hasRemaining();
Expand Down Expand Up @@ -105,8 +102,7 @@ public final boolean nextOneRow(BitSet columns, boolean after) {
/**
* Extracting next field value from packed buffer.
*
* @see mysql-5.1.60/sql/log_event.cc -
* Rows_log_event::print_verbose_one_row
* @see mysql-5.1.60/sql/log_event.cc - Rows_log_event::print_verbose_one_row
*/
public final Serializable nextValue(final String columName, final int columnIndex, final int type, final int meta) {
return nextValue(columName, columnIndex, type, meta, false);
Expand All @@ -115,8 +111,7 @@ public final Serializable nextValue(final String columName, final int columnInde
/**
* Extracting next field value from packed buffer.
*
* @see mysql-5.1.60/sql/log_event.cc -
* Rows_log_event::print_verbose_one_row
* @see mysql-5.1.60/sql/log_event.cc - Rows_log_event::print_verbose_one_row
*/
public final Serializable nextValue(final String columName, final int columnIndex, final int type, final int meta,
boolean isBinary) {
Expand Down Expand Up @@ -300,10 +295,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
len = byte1;
break;
default:
throw new IllegalArgumentException(String.format("!! Don't know how to handle column type=%d meta=%d (%04X)",
type,
meta,
meta));
throw new IllegalArgumentException(String
.format("!! Don't know how to handle column type=%d meta=%d (%04X)", type, meta, meta));
}
}
} else {
Expand Down Expand Up @@ -359,8 +352,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
}
case LogEvent.MYSQL_TYPE_DECIMAL: {
/*
* log_event.h : This enumeration value is only used internally
* and cannot exist in a binlog.
* log_event.h : This enumeration value is only used internally and cannot exist
* in a binlog.
*/
logger.warn("MYSQL_TYPE_DECIMAL : This enumeration value is "
+ "only used internally and cannot exist in a binlog!");
Expand Down Expand Up @@ -541,12 +534,10 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
}
case LogEvent.MYSQL_TYPE_DATETIME2: {
/*
* DATETIME and DATE low-level memory and disk representation
* routines 1 bit sign (used when on disk) 17 bits year*13+month
* (year 0-9999, month 0-12) 5 bits day (0-31) 5 bits hour
* (0-23) 6 bits minute (0-59) 6 bits second (0-59) 24 bits
* microseconds (0-999999) Total: 64 bits = 8 bytes
* SYYYYYYY.YYYYYYYY
* DATETIME and DATE low-level memory and disk representation routines 1 bit
* sign (used when on disk) 17 bits year*13+month (year 0-9999, month 0-12) 5
* bits day (0-31) 5 bits hour (0-23) 6 bits minute (0-59) 6 bits second (0-59)
* 24 bits microseconds (0-999999) Total: 64 bits = 8 bytes SYYYYYYY.YYYYYYYY
* .YYdddddh.hhhhmmmm.mmssssss.ffffffff.ffffffff.ffffffff
*/
long intpart = buffer.getBeUlong40() - DATETIMEF_INT_OFS; // big-endian
Expand Down Expand Up @@ -665,12 +656,10 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
}
case LogEvent.MYSQL_TYPE_TIME2: {
/*
* TIME low-level memory and disk representation routines
* In-memory format: 1 bit sign (Used for sign, when on disk) 1
* bit unused (Reserved for wider hour range, e.g. for
* intervals) 10 bit hour (0-836) 6 bit minute (0-59) 6 bit
* second (0-59) 24 bits microseconds (0-999999) Total: 48 bits
* = 6 bytes
* TIME low-level memory and disk representation routines In-memory format: 1
* bit sign (Used for sign, when on disk) 1 bit unused (Reserved for wider hour
* range, e.g. for intervals) 10 bit hour (0-836) 6 bit minute (0-59) 6 bit
* second (0-59) 24 bits microseconds (0-999999) Total: 48 bits = 6 bytes
* Suhhhhhh.hhhhmmmm.mmssssss.ffffffff.ffffffff.ffffffff
*/
long intpart = 0;
Expand All @@ -687,20 +676,15 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
frac = buffer.getUint8();
if (intpart < 0 && frac > 0) {
/*
* Negative values are stored with reverse
* fractional part order, for binary sort
* compatibility. Disk value intpart frac Time value
* Memory value 800000.00 0 0 00:00:00.00
* 0000000000.000000 7FFFFF.FF -1 255 -00:00:00.01
* FFFFFFFFFF.FFD8F0 7FFFFF.9D -1 99 -00:00:00.99
* FFFFFFFFFF.F0E4D0 7FFFFF.00 -1 0 -00:00:01.00
* FFFFFFFFFF.000000 7FFFFE.FF -1 255 -00:00:01.01
* FFFFFFFFFE.FFD8F0 7FFFFE.F6 -2 246 -00:00:01.10
* FFFFFFFFFE.FE7960 Formula to convert fractional
* part from disk format (now stored in "frac"
* variable) to absolute value: "0x100 - frac". To
* reconstruct in-memory value, we shift to the next
* integer value and then substruct fractional part.
* Negative values are stored with reverse fractional part order, for binary
* sort compatibility. Disk value intpart frac Time value Memory value 800000.00
* 0 0 00:00:00.00 0000000000.000000 7FFFFF.FF -1 255 -00:00:00.01
* FFFFFFFFFF.FFD8F0 7FFFFF.9D -1 99 -00:00:00.99 FFFFFFFFFF.F0E4D0 7FFFFF.00 -1
* 0 -00:00:01.00 FFFFFFFFFF.000000 7FFFFE.FF -1 255 -00:00:01.01
* FFFFFFFFFE.FFD8F0 7FFFFE.F6 -2 246 -00:00:01.10 FFFFFFFFFE.FE7960 Formula to
* convert fractional part from disk format (now stored in "frac" variable) to
* absolute value: "0x100 - frac". To reconstruct in-memory value, we shift to
* the next integer value and then substruct fractional part.
*/
intpart++; /* Shift to the next integer value */
frac -= 0x100; /* -(0x100 - frac) */
Expand All @@ -715,9 +699,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
frac = buffer.getBeUint16();
if (intpart < 0 && frac > 0) {
/*
* Fix reverse fractional part order:
* "0x10000 - frac". See comments for FSP=1 and
* FSP=2 above.
* Fix reverse fractional part order: "0x10000 - frac". See comments for FSP=1
* and FSP=2 above.
*/
intpart++; /* Shift to the next integer value */
frac -= 0x10000; /* -(0x10000-frac) */
Expand Down Expand Up @@ -789,8 +772,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
}
case LogEvent.MYSQL_TYPE_NEWDATE: {
/*
* log_event.h : This enumeration value is only used internally
* and cannot exist in a binlog.
* log_event.h : This enumeration value is only used internally and cannot exist
* in a binlog.
*/
logger.warn("MYSQL_TYPE_NEWDATE : This enumeration value is "
+ "only used internally and cannot exist in a binlog!");
Expand Down Expand Up @@ -862,8 +845,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
case LogEvent.MYSQL_TYPE_ENUM: {
final int int32;
/*
* log_event.h : This enumeration value is only used internally
* and cannot exist in a binlog.
* log_event.h : This enumeration value is only used internally and cannot exist
* in a binlog.
*/
switch (len) {
case 1:
Expand Down Expand Up @@ -929,24 +912,24 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
}
case LogEvent.MYSQL_TYPE_TINY_BLOB: {
/*
* log_event.h : This enumeration value is only used internally
* and cannot exist in a binlog.
* log_event.h : This enumeration value is only used internally and cannot exist
* in a binlog.
*/
logger.warn("MYSQL_TYPE_TINY_BLOB : This enumeration value is "
+ "only used internally and cannot exist in a binlog!");
}
case LogEvent.MYSQL_TYPE_MEDIUM_BLOB: {
/*
* log_event.h : This enumeration value is only used internally
* and cannot exist in a binlog.
* log_event.h : This enumeration value is only used internally and cannot exist
* in a binlog.
*/
logger.warn("MYSQL_TYPE_MEDIUM_BLOB : This enumeration value is "
+ "only used internally and cannot exist in a binlog!");
}
case LogEvent.MYSQL_TYPE_LONG_BLOB: {
/*
* log_event.h : This enumeration value is only used internally
* and cannot exist in a binlog.
* log_event.h : This enumeration value is only used internally and cannot exist
* in a binlog.
*/
logger.warn("MYSQL_TYPE_LONG_BLOB : This enumeration value is "
+ "only used internally and cannot exist in a binlog!");
Expand Down Expand Up @@ -1005,8 +988,7 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
case LogEvent.MYSQL_TYPE_VAR_STRING: {
/*
* Except for the data length calculation, MYSQL_TYPE_VARCHAR,
* MYSQL_TYPE_VAR_STRING and MYSQL_TYPE_STRING are handled the
* same way.
* MYSQL_TYPE_VAR_STRING and MYSQL_TYPE_STRING are handled the same way.
*/
len = meta;
if (len < 256) {
Expand Down Expand Up @@ -1078,11 +1060,9 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
// print_json_diff
int position = buffer.position();
try {
StringBuilder builder = JsonDiffConversion.print_json_diff(buffer,
len,
columnName,
columnIndex,
charset);
// https://github.com/alibaba/canal/pull/5018
StringBuilder builder = JsonDiffConversion
.print_json_diff(buffer, len, columnName, columnIndex, charset);
value = builder.toString();
buffer.position(position + len);
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -1120,24 +1100,19 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina
/* fill binary */
byte[] binary = new byte[len];
buffer.fillBytes(binary, 0, len);

/* Warning unsupport cloumn type */
// logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY: meta=%d (%04X), len = %d",
// meta,
// meta,
// len));
// Warning unsupport cloumn type
// logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY:
// meta=%d (%04X), len = %d", meta,meta, len));
javaType = Types.BINARY;
value = binary;
length = len;
break;
}
case LogEvent.MYSQL_TYPE_BOOL :
case LogEvent.MYSQL_TYPE_INVALID :
case LogEvent.MYSQL_TYPE_BOOL:
case LogEvent.MYSQL_TYPE_INVALID:
default:
logger.error(String.format("!! Don't know how to handle column type=%d meta=%d (%04X)",
type,
meta,
meta));
logger.error(
String.format("!! Don't know how to handle column type=%d meta=%d (%04X)", type, meta, meta));
javaType = Types.OTHER;
value = null;
length = 0;
Expand All @@ -1148,16 +1123,12 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina

private void parseJsonFromFullValue(int len) {
if (0 == len) {
// fixed issue #1 by lava, json column of zero length
// has no
// value, value parsing should be skipped
// fixed issue #1 by lava, json column of zero length has no value, value
// parsing should be skipped
value = "";
} else {
int position = buffer.position();
Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(),
buffer,
len - 1,
charset);
Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(), buffer, len - 1, charset);
StringBuilder builder = new StringBuilder();
jsonValue.toJsonString(builder, charset);
value = builder.toString();
Expand Down

0 comments on commit 04c11ec

Please sign in to comment.