diff --git a/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonDiffConversion.java b/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonDiffConversion.java index 5934ecea9d..67362eddd4 100644 --- a/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonDiffConversion.java +++ b/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonDiffConversion.java @@ -73,12 +73,13 @@ public static StringBuilder print_json_diff(LogBuffer buffer, long len, String c buffer.forward((int) value_length); } - if (buffer.position - position >= len) { + // see https://github.com/alibaba/canal/pull/5018 + if (buffer.position() - position >= len) { break; } } - if (buffer.position - position != len) { + if (buffer.position() - position != len) { throw new IllegalArgumentException("reading json diff"); } @@ -137,7 +138,8 @@ public static StringBuilder print_json_diff(LogBuffer buffer, long len, String c builder.append(jsonBuilder); } - if (buffer.position - position >= len) { + // see https://github.com/alibaba/canal/pull/5018 + if (buffer.position() - position >= len) { builder.append(")"); break; } @@ -153,7 +155,7 @@ public static StringBuilder print_json_diff(LogBuffer buffer, long len, String c diff_i++; } - if (buffer.position - position != len) { + if (buffer.position() - position != len) { throw new IllegalArgumentException("reading json diff"); } diff --git a/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java b/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java index ad9d0f7d2d..86fbed0498 100644 --- a/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java +++ b/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java @@ -4,8 +4,6 @@ import java.nio.charset.Charset; import java.sql.Timestamp; import java.sql.Types; -import java.time.ZoneId; -import java.time.zone.ZoneRules; import java.util.BitSet; import org.apache.commons.logging.Log; @@ -32,27 +30,27 @@ public final class RowsLogBuffer { public static final Integer[] integerCache = new Integer[1024 * 128]; public static final int integerCacheLimit = longCache.length + 127; - public static final long DATETIMEF_INT_OFS = 0x8000000000L; - public static final long TIMEF_INT_OFS = 0x800000L; - public static final long TIMEF_OFS = 0x800000000000L; - private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' }; + public static final long DATETIMEF_INT_OFS = 0x8000000000L; + public static final long TIMEF_INT_OFS = 0x800000L; + public static final long TIMEF_OFS = 0x800000000000L; + private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' }; - private final LogBuffer buffer; - private final int columnLen; - private final int jsonColumnCount; - private final Charset charset; + private final LogBuffer buffer; + private final int columnLen; + private final int jsonColumnCount; + private final Charset charset; - private final BitSet nullBits; - private int nullBitIndex; + private final BitSet nullBits; + private int nullBitIndex; // Read value_options if this is AI for PARTIAL_UPDATE_ROWS_EVENT - private final boolean partial; - private final BitSet partialBits; + private final boolean partial; + private final BitSet partialBits; - private boolean fNull; - private int javaType; - private int length; - private Serializable value; + private boolean fNull; + private int javaType; + private int length; + private Serializable value; public RowsLogBuffer(LogBuffer buffer, final int columnLen, Charset charset, int jsonColumnCount, boolean partial){ this.buffer = buffer; @@ -71,8 +69,7 @@ public final boolean nextOneRow(BitSet columns) { /** * Extracting next row from packed buffer. * - * @see mysql-5.1.60/sql/log_event.cc - - * Rows_log_event::print_verbose_one_row + * @see mysql-5.1.60/sql/log_event.cc - Rows_log_event::print_verbose_one_row */ public final boolean nextOneRow(BitSet columns, boolean after) { final boolean hasOneRow = buffer.hasRemaining(); @@ -105,8 +102,7 @@ public final boolean nextOneRow(BitSet columns, boolean after) { /** * Extracting next field value from packed buffer. * - * @see mysql-5.1.60/sql/log_event.cc - - * Rows_log_event::print_verbose_one_row + * @see mysql-5.1.60/sql/log_event.cc - Rows_log_event::print_verbose_one_row */ public final Serializable nextValue(final String columName, final int columnIndex, final int type, final int meta) { return nextValue(columName, columnIndex, type, meta, false); @@ -115,8 +111,7 @@ public final Serializable nextValue(final String columName, final int columnInde /** * Extracting next field value from packed buffer. * - * @see mysql-5.1.60/sql/log_event.cc - - * Rows_log_event::print_verbose_one_row + * @see mysql-5.1.60/sql/log_event.cc - Rows_log_event::print_verbose_one_row */ public final Serializable nextValue(final String columName, final int columnIndex, final int type, final int meta, boolean isBinary) { @@ -300,10 +295,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina len = byte1; break; default: - throw new IllegalArgumentException(String.format("!! Don't know how to handle column type=%d meta=%d (%04X)", - type, - meta, - meta)); + throw new IllegalArgumentException(String + .format("!! Don't know how to handle column type=%d meta=%d (%04X)", type, meta, meta)); } } } else { @@ -359,8 +352,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina } case LogEvent.MYSQL_TYPE_DECIMAL: { /* - * log_event.h : This enumeration value is only used internally - * and cannot exist in a binlog. + * log_event.h : This enumeration value is only used internally and cannot exist + * in a binlog. */ logger.warn("MYSQL_TYPE_DECIMAL : This enumeration value is " + "only used internally and cannot exist in a binlog!"); @@ -541,12 +534,10 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina } case LogEvent.MYSQL_TYPE_DATETIME2: { /* - * DATETIME and DATE low-level memory and disk representation - * routines 1 bit sign (used when on disk) 17 bits year*13+month - * (year 0-9999, month 0-12) 5 bits day (0-31) 5 bits hour - * (0-23) 6 bits minute (0-59) 6 bits second (0-59) 24 bits - * microseconds (0-999999) Total: 64 bits = 8 bytes - * SYYYYYYY.YYYYYYYY + * DATETIME and DATE low-level memory and disk representation routines 1 bit + * sign (used when on disk) 17 bits year*13+month (year 0-9999, month 0-12) 5 + * bits day (0-31) 5 bits hour (0-23) 6 bits minute (0-59) 6 bits second (0-59) + * 24 bits microseconds (0-999999) Total: 64 bits = 8 bytes SYYYYYYY.YYYYYYYY * .YYdddddh.hhhhmmmm.mmssssss.ffffffff.ffffffff.ffffffff */ long intpart = buffer.getBeUlong40() - DATETIMEF_INT_OFS; // big-endian @@ -665,12 +656,10 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina } case LogEvent.MYSQL_TYPE_TIME2: { /* - * TIME low-level memory and disk representation routines - * In-memory format: 1 bit sign (Used for sign, when on disk) 1 - * bit unused (Reserved for wider hour range, e.g. for - * intervals) 10 bit hour (0-836) 6 bit minute (0-59) 6 bit - * second (0-59) 24 bits microseconds (0-999999) Total: 48 bits - * = 6 bytes + * TIME low-level memory and disk representation routines In-memory format: 1 + * bit sign (Used for sign, when on disk) 1 bit unused (Reserved for wider hour + * range, e.g. for intervals) 10 bit hour (0-836) 6 bit minute (0-59) 6 bit + * second (0-59) 24 bits microseconds (0-999999) Total: 48 bits = 6 bytes * Suhhhhhh.hhhhmmmm.mmssssss.ffffffff.ffffffff.ffffffff */ long intpart = 0; @@ -687,20 +676,15 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina frac = buffer.getUint8(); if (intpart < 0 && frac > 0) { /* - * Negative values are stored with reverse - * fractional part order, for binary sort - * compatibility. Disk value intpart frac Time value - * Memory value 800000.00 0 0 00:00:00.00 - * 0000000000.000000 7FFFFF.FF -1 255 -00:00:00.01 - * FFFFFFFFFF.FFD8F0 7FFFFF.9D -1 99 -00:00:00.99 - * FFFFFFFFFF.F0E4D0 7FFFFF.00 -1 0 -00:00:01.00 - * FFFFFFFFFF.000000 7FFFFE.FF -1 255 -00:00:01.01 - * FFFFFFFFFE.FFD8F0 7FFFFE.F6 -2 246 -00:00:01.10 - * FFFFFFFFFE.FE7960 Formula to convert fractional - * part from disk format (now stored in "frac" - * variable) to absolute value: "0x100 - frac". To - * reconstruct in-memory value, we shift to the next - * integer value and then substruct fractional part. + * Negative values are stored with reverse fractional part order, for binary + * sort compatibility. Disk value intpart frac Time value Memory value 800000.00 + * 0 0 00:00:00.00 0000000000.000000 7FFFFF.FF -1 255 -00:00:00.01 + * FFFFFFFFFF.FFD8F0 7FFFFF.9D -1 99 -00:00:00.99 FFFFFFFFFF.F0E4D0 7FFFFF.00 -1 + * 0 -00:00:01.00 FFFFFFFFFF.000000 7FFFFE.FF -1 255 -00:00:01.01 + * FFFFFFFFFE.FFD8F0 7FFFFE.F6 -2 246 -00:00:01.10 FFFFFFFFFE.FE7960 Formula to + * convert fractional part from disk format (now stored in "frac" variable) to + * absolute value: "0x100 - frac". To reconstruct in-memory value, we shift to + * the next integer value and then substruct fractional part. */ intpart++; /* Shift to the next integer value */ frac -= 0x100; /* -(0x100 - frac) */ @@ -715,9 +699,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina frac = buffer.getBeUint16(); if (intpart < 0 && frac > 0) { /* - * Fix reverse fractional part order: - * "0x10000 - frac". See comments for FSP=1 and - * FSP=2 above. + * Fix reverse fractional part order: "0x10000 - frac". See comments for FSP=1 + * and FSP=2 above. */ intpart++; /* Shift to the next integer value */ frac -= 0x10000; /* -(0x10000-frac) */ @@ -789,8 +772,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina } case LogEvent.MYSQL_TYPE_NEWDATE: { /* - * log_event.h : This enumeration value is only used internally - * and cannot exist in a binlog. + * log_event.h : This enumeration value is only used internally and cannot exist + * in a binlog. */ logger.warn("MYSQL_TYPE_NEWDATE : This enumeration value is " + "only used internally and cannot exist in a binlog!"); @@ -862,8 +845,8 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina case LogEvent.MYSQL_TYPE_ENUM: { final int int32; /* - * log_event.h : This enumeration value is only used internally - * and cannot exist in a binlog. + * log_event.h : This enumeration value is only used internally and cannot exist + * in a binlog. */ switch (len) { case 1: @@ -929,24 +912,24 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina } case LogEvent.MYSQL_TYPE_TINY_BLOB: { /* - * log_event.h : This enumeration value is only used internally - * and cannot exist in a binlog. + * log_event.h : This enumeration value is only used internally and cannot exist + * in a binlog. */ logger.warn("MYSQL_TYPE_TINY_BLOB : This enumeration value is " + "only used internally and cannot exist in a binlog!"); } case LogEvent.MYSQL_TYPE_MEDIUM_BLOB: { /* - * log_event.h : This enumeration value is only used internally - * and cannot exist in a binlog. + * log_event.h : This enumeration value is only used internally and cannot exist + * in a binlog. */ logger.warn("MYSQL_TYPE_MEDIUM_BLOB : This enumeration value is " + "only used internally and cannot exist in a binlog!"); } case LogEvent.MYSQL_TYPE_LONG_BLOB: { /* - * log_event.h : This enumeration value is only used internally - * and cannot exist in a binlog. + * log_event.h : This enumeration value is only used internally and cannot exist + * in a binlog. */ logger.warn("MYSQL_TYPE_LONG_BLOB : This enumeration value is " + "only used internally and cannot exist in a binlog!"); @@ -1005,8 +988,7 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina case LogEvent.MYSQL_TYPE_VAR_STRING: { /* * Except for the data length calculation, MYSQL_TYPE_VARCHAR, - * MYSQL_TYPE_VAR_STRING and MYSQL_TYPE_STRING are handled the - * same way. + * MYSQL_TYPE_VAR_STRING and MYSQL_TYPE_STRING are handled the same way. */ len = meta; if (len < 256) { @@ -1078,11 +1060,9 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina // print_json_diff int position = buffer.position(); try { - StringBuilder builder = JsonDiffConversion.print_json_diff(buffer, - len, - columnName, - columnIndex, - charset); + // https://github.com/alibaba/canal/pull/5018 + StringBuilder builder = JsonDiffConversion + .print_json_diff(buffer, len, columnName, columnIndex, charset); value = builder.toString(); buffer.position(position + len); } catch (IllegalArgumentException e) { @@ -1120,24 +1100,19 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina /* fill binary */ byte[] binary = new byte[len]; buffer.fillBytes(binary, 0, len); - - /* Warning unsupport cloumn type */ - // logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY: meta=%d (%04X), len = %d", - // meta, - // meta, - // len)); + // Warning unsupport cloumn type + // logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY: + // meta=%d (%04X), len = %d", meta,meta, len)); javaType = Types.BINARY; value = binary; length = len; break; } - case LogEvent.MYSQL_TYPE_BOOL : - case LogEvent.MYSQL_TYPE_INVALID : + case LogEvent.MYSQL_TYPE_BOOL: + case LogEvent.MYSQL_TYPE_INVALID: default: - logger.error(String.format("!! Don't know how to handle column type=%d meta=%d (%04X)", - type, - meta, - meta)); + logger.error( + String.format("!! Don't know how to handle column type=%d meta=%d (%04X)", type, meta, meta)); javaType = Types.OTHER; value = null; length = 0; @@ -1148,16 +1123,12 @@ final Serializable fetchValue(String columnName, int columnIndex, int type, fina private void parseJsonFromFullValue(int len) { if (0 == len) { - // fixed issue #1 by lava, json column of zero length - // has no - // value, value parsing should be skipped + // fixed issue #1 by lava, json column of zero length has no value, value + // parsing should be skipped value = ""; } else { int position = buffer.position(); - Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(), - buffer, - len - 1, - charset); + Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(), buffer, len - 1, charset); StringBuilder builder = new StringBuilder(); jsonValue.toJsonString(builder, charset); value = builder.toString();