Skip to content

Commit

Permalink
[Improve](cdc) support partition table for auto create table (apache#520
Browse files Browse the repository at this point in the history
)
  • Loading branch information
JNSimba authored Dec 2, 2024
1 parent ba0f97a commit 35a9a85
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;

Expand Down Expand Up @@ -63,6 +64,10 @@ public static TableSchema createTableSchema(
tableSchema.setProperties(dorisTableConfig.getTableProperties());
tableSchema.setTableBuckets(
parseTableSchemaBuckets(dorisTableConfig.getTableBuckets(), table));
if (ObjectUtils.isNotEmpty(dorisTableConfig.getTablePartitions())
&& dorisTableConfig.getTablePartitions().containsKey(table)) {
tableSchema.setPartitionInfo(dorisTableConfig.getTablePartitions().get(table));
}
}
return tableSchema;
}
Expand Down Expand Up @@ -123,32 +128,60 @@ public static String generateCreateTableDDL(TableSchema schema) {
throw new CreateTableException("key " + key + " not found in column list");
}
FieldSchema field = fields.get(key);
buildColumn(sb, field, true);
buildColumn(sb, field, true, false);
}

// append partition column, auto partition column must be in keys
if (schema.getPartitionInfo() != null) {
String partitionCol = schema.getPartitionInfo().f0;
FieldSchema field = fields.get(partitionCol);
buildColumn(sb, field, true, true);
}

// append values
for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
// skip key column
if (keys.contains(entry.getKey())) {
continue;
}
// skip partition column
if (schema.getPartitionInfo() != null
&& entry.getKey().equals(schema.getPartitionInfo().f0)) {
continue;
}
FieldSchema field = entry.getValue();
buildColumn(sb, field, false);
buildColumn(sb, field, false, false);
}
sb = sb.deleteCharAt(sb.length() - 1);
sb.append(" ) ");
// append uniq model
if (DataModel.UNIQUE.equals(schema.getModel())) {
sb.append(schema.getModel().name())
.append(" KEY(")
.append(String.join(",", identifier(schema.getKeys())))
.append(")");
.append(String.join(",", identifier(schema.getKeys())));

if (schema.getPartitionInfo() != null) {
sb.append(",").append(identifier(schema.getPartitionInfo().f0));
}

sb.append(")");
}

// append table comment
if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
}

// append partition info if exists
if (schema.getPartitionInfo() != null) {
sb.append(" AUTO PARTITION BY RANGE ")
.append(
String.format(
"(date_trunc(`%s`, '%s'))",
schema.getPartitionInfo().f0, schema.getPartitionInfo().f1))
.append("()");
}

// append distribute key
sb.append(" DISTRIBUTED BY HASH(")
.append(String.join(",", identifier(schema.getDistributeKeys())))
Expand All @@ -165,6 +198,7 @@ public static String generateCreateTableDDL(TableSchema schema) {
} else {
sb.append(" BUCKETS AUTO ");
}

// append properties
int index = 0;
for (Map.Entry<String, String> entry : properties.entrySet()) {
Expand All @@ -186,13 +220,19 @@ public static String generateCreateTableDDL(TableSchema schema) {
return sb.toString();
}

private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
private static void buildColumn(
StringBuilder sql, FieldSchema field, boolean isKey, boolean autoPartitionCol) {
String fieldType = field.getTypeString();
if (isKey && DorisType.STRING.equals(fieldType)) {
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
}
sql.append(identifier(field.getName())).append(" ").append(fieldType);

// auto partition need set partition-column not null
if (autoPartitionCol) {
sql.append(" NOT NULL ");
}

if (field.getDefaultValue() != null) {
sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.flink.catalog.doris;

import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -32,9 +34,11 @@ public class TableSchema {
private DataModel model = DataModel.DUPLICATE;
private List<String> distributeKeys = new ArrayList<>();
private Map<String, String> properties = new HashMap<>();

private Integer tableBuckets;

// Currently only supports auto partition, eg: DATE_TRUNC(column,interval)
private Tuple2<String, String> partitionInfo;

public String getDatabase() {
return database;
}
Expand Down Expand Up @@ -107,6 +111,14 @@ public Integer getTableBuckets() {
return tableBuckets;
}

public Tuple2<String, String> getPartitionInfo() {
return partitionInfo;
}

public void setPartitionInfo(Tuple2<String, String> partitionInfo) {
this.partitionInfo = partitionInfo;
}

@Override
public String toString() {
return "TableSchema{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,12 @@ private static void syncDatabase(
.setIgnoreIncompatible(ignoreIncompatible)
.setSchemaChangeMode(schemaChangeMode)
.create();
databaseSync.build();

boolean needExecute = databaseSync.build();
if (!needExecute) {
// create table only
return;
}
if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
jobName =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void create() {
this.converter = new TableNameConverter(tablePrefix, tableSuffix, multiToOneRulesPattern);
}

public void build() throws Exception {
public boolean build() throws Exception {
DorisConnectionOptions options = getDorisConnectionOptions();
DorisSystem dorisSystem = new DorisSystem(options);

Expand Down Expand Up @@ -156,7 +156,7 @@ public void build() throws Exception {
}
if (createTableOnly) {
System.out.println("Create table finished.");
System.exit(0);
return false;
}
LOG.info("table mapping: {}", tableMapping);
config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
Expand All @@ -181,6 +181,7 @@ public void build() throws Exception {
.uid(uidName);
}
}
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.tools.cdc;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;

import java.io.Serializable;
import java.util.HashMap;
Expand All @@ -30,11 +31,14 @@ public class DorisTableConfig implements Serializable {
// PROPERTIES parameter in doris table creation statement. such as: replication_num=1.
public static final String REPLICATION_NUM = "replication_num";
public static final String TABLE_BUCKETS = "table-buckets";
public static final String TABLE_PARTITIONS = "table-partitions";

private final Map<String, String> tableProperties;
// The specific parameters extracted from --table-conf need to be parsed and integrated into the
// doris table creation statement. such as: table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50".
private Map<String, Integer> tableBuckets;
// table:partitionColumn:interval
private Map<String, Tuple2<String, String>> tablePartitions;

// Only for testing
@VisibleForTesting
Expand All @@ -55,6 +59,11 @@ public DorisTableConfig(Map<String, String> tableConfig) {
this.tableBuckets = buildTableBucketMap(tableConfig.get(TABLE_BUCKETS));
tableConfig.remove(TABLE_BUCKETS);
}
if (tableConfig.containsKey(TABLE_PARTITIONS)) {
this.tablePartitions = buildTablePartitionMap(tableConfig.get(TABLE_PARTITIONS));
tableConfig.remove(TABLE_PARTITIONS);
}

tableProperties = tableConfig;
}

Expand All @@ -66,6 +75,10 @@ public Map<String, String> getTableProperties() {
return tableProperties;
}

public Map<String, Tuple2<String, String>> getTablePartitions() {
return tablePartitions;
}

/**
* Build table bucket Map.
*
Expand All @@ -83,4 +96,23 @@ public Map<String, Integer> buildTableBucketMap(String tableBuckets) {
}
return tableBucketsMap;
}

/**
* Build table partition Map.
*
* @param tablePartitions the string of tablePartitions,
* eg:tbl1:dt_column:month,tb2:dt_column:day
* @return The table name and buckets map. The key is table name, the value is partition column
* and interval.
*/
@VisibleForTesting
public Map<String, Tuple2<String, String>> buildTablePartitionMap(String tablePartitions) {
Map<String, Tuple2<String, String>> tablePartitionMap = new LinkedHashMap<>();
String[] tablePartitionArray = tablePartitions.split(",");
for (String tablePartition : tablePartitionArray) {
String[] tp = tablePartition.split(":");
tablePartitionMap.put(tp[0].trim(), Tuple2.of(tp[1].trim(), tp[2].trim()));
}
return tablePartitionMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -95,6 +96,9 @@ protected void submitE2EJob(String jobName, String[] args) {
LOG.info("{} e2e job will submit to start. ", jobName);
CdcTools.setStreamExecutionEnvironmentForTesting(configFlinkEnvironment());
CdcTools.main(args);
if (Arrays.asList(args).contains("--create-table-only")) {
return;
}
jobClient = CdcTools.getJobClient();
if (Objects.isNull(jobClient)) {
LOG.warn("Failed get flink job client. jobName={}", jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -381,6 +384,58 @@ public void testMySQL2DorisEnableDelete() throws Exception {
cancelE2EJob(jobName);
}

@Test
public void testMySQL2DorisCreateTableOnly() throws Exception {
String jobName = "testMySQL2DorisCreateTableOnly";
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql");
startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt");

String createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_uniq");
Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`id`)"));
Assert.assertTrue(createTblSQL.contains("BUCKETS 10"));

createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_dup");
Assert.assertTrue(createTblSQL.contains("DUPLICATE KEY(`id`, `name`)"));
Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO"));

createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_from_uniqindex");
Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`name`)"));
Assert.assertTrue(createTblSQL.contains("BUCKETS 30"));

createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_from_uniqindex2");
Assert.assertTrue(
createTblSQL.contains("UNIQUE KEY(`id`, `name`)")
|| createTblSQL.contains("UNIQUE KEY(`id`, `age`)"));
Assert.assertTrue(createTblSQL.contains("BUCKETS 30"));

createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_from_multiindex");
Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`id`)"));
Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO"));

/*
The auto partition behavior of doris 2.1.0 to 2.1.4 has changed, temporarily skipped
createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_part_uniq");
Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`id`, `create_dtime`)"));
Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO"));
createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_part_dup");
Assert.assertTrue(createTblSQL.contains("DUPLICATE KEY(`id`, `create_dtime`, `name`)"));
Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO"));
*/
}

private String getCreateTableSQL(String database, String table) throws Exception {
Statement statement = getDorisQueryConnection().createStatement();
ResultSet resultSet =
statement.executeQuery(String.format("SHOW CREATE TABLE %s.%s", database, table));
while (resultSet.next()) {
String createTblSql = resultSet.getString(2);
LOG.info("Create table sql: {}", createTblSql.replace("\n", ""));
return createTblSql;
}
throw new RuntimeException("Table not exist " + table);
}

@After
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.flink.tools.cdc;

import org.apache.flink.api.java.tuple.Tuple2;

import org.junit.Before;
import org.junit.Test;

Expand All @@ -43,4 +45,14 @@ public void buildTableBucketMapTest() {
assertEquals(40, tableBucketsMap.get("b.*").intValue());
assertEquals(50, tableBucketsMap.get(".*").intValue());
}

@Test
public void buildTablePartitionMapTest() {
String tablePartitions = "tbl1:dt_col_d:day,tbl2:dt_col_w:week,tbl3:dt_col_m:month";
Map<String, Tuple2<String, String>> tablePartitionMap =
dorisTableConfig.buildTablePartitionMap(tablePartitions);
assertEquals(Tuple2.of("dt_col_d", "day"), tablePartitionMap.get("tbl1"));
assertEquals(Tuple2.of("dt_col_w", "week"), tablePartitionMap.get("tbl2"));
assertEquals(Tuple2.of("dt_col_m", "month"), tablePartitionMap.get("tbl3"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mysql-sync-database
--including-tables "create_tbl_.*"
--create-table-only
--table-conf table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30
--table-conf replication_num=1
Loading

0 comments on commit 35a9a85

Please sign in to comment.