Skip to content

Commit

Permalink
[improve](cdc) add config for uniq index to primary key with create t…
Browse files Browse the repository at this point in the history
…able (apache#524)
  • Loading branch information
JNSimba authored Dec 6, 2024
1 parent fad3c4c commit b7b5802
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ public void setTableSchemaBuckets(
private void tryCreateTableIfAbsent(
DorisSystem dorisSystem, String targetDb, String dorisTable, SourceSchema schema) {
if (!dorisSystem.tableExists(targetDb, dorisTable)) {
if (dorisTableConfig.isConvertUniqToPk()
&& CollectionUtil.isNullOrEmpty(schema.primaryKeys)
&& !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) {
schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs);
}
TableSchema dorisSchema =
DorisSchemaFactory.createTableSchema(
database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ public class DorisTableConfig implements Serializable {
public static final String REPLICATION_NUM = "replication_num";
public static final String TABLE_BUCKETS = "table-buckets";
public static final String TABLE_PARTITIONS = "table-partitions";
public static final String CONVERT_UNIQ_TO_PK = "convert-uniq-to-pk";

private final Map<String, String> tableProperties;
// The specific parameters extracted from --table-conf need to be parsed and integrated into the
// doris table creation statement. such as: table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50".
private Map<String, Integer> tableBuckets;
// table:partitionColumn:interval
private Map<String, Tuple2<String, String>> tablePartitions;
// uniq index to primary key
private boolean convertUniqToPk = false;

// Only for testing
@VisibleForTesting
Expand All @@ -64,6 +67,11 @@ public DorisTableConfig(Map<String, String> tableConfig) {
tableConfig.remove(TABLE_PARTITIONS);
}

if (tableConfig.containsKey(CONVERT_UNIQ_TO_PK)) {
this.convertUniqToPk = Boolean.parseBoolean(tableConfig.get(CONVERT_UNIQ_TO_PK));
tableConfig.remove(CONVERT_UNIQ_TO_PK);
}

tableProperties = tableConfig;
}

Expand All @@ -79,6 +87,10 @@ public Map<String, Tuple2<String, String>> getTablePartitions() {
return tablePartitions;
}

public boolean isConvertUniqToPk() {
return convertUniqToPk;
}

/**
* Build table bucket Map.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ public JdbcSourceSchema(
super(databaseName, schemaName, tableName, tableComment);
fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, tableName);
if (primaryKeys.isEmpty()) {
List<String> uniqIndex = getUniqIndex(metaData, databaseName, schemaName, tableName);
primaryKeys.addAll(uniqIndex);
}
uniqueIndexs = getUniqIndex(metaData, databaseName, schemaName, tableName);
}

public LinkedHashMap<String, FieldSchema> getColumnInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public abstract class SourceSchema {
protected final String tableComment;
protected LinkedHashMap<String, FieldSchema> fields;
public List<String> primaryKeys;
public List<String> uniqueIndexs;
public DataModel model = DataModel.UNIQUE;

public SourceSchema(
Expand Down Expand Up @@ -64,7 +65,6 @@ public static String getString(String databaseName, String schemaName, String ta
if (!StringUtils.isNullOrWhitespaceOnly(schemaName)) {
identifier.add(schemaName);
}

if (!StringUtils.isNullOrWhitespaceOnly(tableName)) {
identifier.add(tableName);
}
Expand Down Expand Up @@ -115,6 +115,10 @@ public List<String> getPrimaryKeys() {
return primaryKeys;
}

public List<String> getUniqueIndexs() {
return uniqueIndexs;
}

public String getTableComment() {
return tableComment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ mysql-sync-database
--including-tables "create_tbl_.*"
--create-table-only
--table-conf table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30
--table-conf replication_num=1
--table-conf replication_num=1
--table-conf convert-uniq-to-pk=true

0 comments on commit b7b5802

Please sign in to comment.