Skip to content

Commit

Permalink
add auto redirect and uniq default open 2pc
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Oct 7, 2023
1 parent 5ad474a commit 89891f5
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public class DorisConnectionOptions implements Serializable {
protected final String password;
protected String jdbcUrl;
protected String benodes;
/**
* Used to enable automatic redirection of fe,
* When it is not enabled, it will actively request the be list, and the polling will initiate a streamload request to be.
*/
protected boolean autoRedirect;

public DorisConnectionOptions(String fenodes, String username, String password) {
this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is empty");
Expand All @@ -45,10 +50,11 @@ public DorisConnectionOptions(String fenodes, String username, String password,
}

public DorisConnectionOptions(String fenodes, String benodes, String username, String password,
String jdbcUrl) {
String jdbcUrl, boolean autoRedirect) {
this(fenodes, username, password);
this.benodes = benodes;
this.jdbcUrl = jdbcUrl;
this.autoRedirect = autoRedirect;
}

public String getFenodes() {
Expand All @@ -71,21 +77,31 @@ public String getJdbcUrl(){
return jdbcUrl;
}

public boolean isAutoRedirect() {
return autoRedirect;
}

/**
* Builder for {@link DorisConnectionOptions}.
*/
public static class DorisConnectionOptionsBuilder {
private String fenodes;
private String benodes;
private String username;
private String password;

private String jdbcUrl;
private boolean autoRedirect;

public DorisConnectionOptionsBuilder withFenodes(String fenodes) {
this.fenodes = fenodes;
return this;
}

public DorisConnectionOptionsBuilder withBenodes(String benodes) {
this.benodes = benodes;
return this;
}

public DorisConnectionOptionsBuilder withUsername(String username) {
this.username = username;
return this;
Expand All @@ -101,8 +117,13 @@ public DorisConnectionOptionsBuilder withJdbcUrl(String jdbcUrl) {
return this;
}

public DorisConnectionOptionsBuilder withAutoRedirect(boolean autoRedirect) {
this.autoRedirect = autoRedirect;
return this;
}

public DorisConnectionOptions build() {
return new DorisConnectionOptions(fenodes, username, password, jdbcUrl);
return new DorisConnectionOptions(fenodes, benodes, username, password, jdbcUrl, autoRedirect);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class DorisExecutionOptions implements Serializable {
*/
private final Properties streamLoadProp;
private final Boolean enableDelete;
private final Boolean enable2PC;
private Boolean enable2PC;
private boolean force2PC;

//batch mode param
private final int flushQueueSize;
Expand All @@ -73,7 +74,8 @@ public DorisExecutionOptions(int checkInterval,
int bufferFlushMaxRows,
int bufferFlushMaxBytes,
long bufferFlushIntervalMs,
boolean ignoreUpdateBefore) {
boolean ignoreUpdateBefore,
boolean force2PC) {
Preconditions.checkArgument(maxRetries >= 0);
this.checkInterval = checkInterval;
this.maxRetries = maxRetries;
Expand All @@ -84,6 +86,7 @@ public DorisExecutionOptions(int checkInterval,
this.streamLoadProp = streamLoadProp;
this.enableDelete = enableDelete;
this.enable2PC = enable2PC;
this.force2PC = force2PC;

this.enableBatchMode = enableBatchMode;
this.flushQueueSize = flushQueueSize;
Expand Down Expand Up @@ -176,6 +179,14 @@ public boolean getIgnoreUpdateBefore(){
return ignoreUpdateBefore;
}

public void setEnable2PC(Boolean enable2PC) {
this.enable2PC = enable2PC;
}

public boolean force2PC() {
return force2PC;
}

/**
* Builder of {@link DorisExecutionOptions}.
*/
Expand All @@ -190,6 +201,9 @@ public static class Builder {
private boolean enableDelete = true;
private boolean enable2PC = true;

//A flag used to determine whether to forcibly open 2pc. By default, the uniq model close 2pc.
private boolean force2PC = false;

private int flushQueueSize = DEFAULT_FLUSH_QUEUE_SIZE;
private int bufferFlushMaxRows = DEFAULT_BUFFER_FLUSH_MAX_ROWS;
private int bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES;
Expand Down Expand Up @@ -244,6 +258,13 @@ public Builder disable2PC() {
return this;
}

public Builder enable2PC() {
this.enable2PC = true;
//Force open 2pc
this.force2PC = true;
return this;
}

public Builder enableBatchMode() {
this.enableBatchMode = true;
return this;
Expand Down Expand Up @@ -278,7 +299,7 @@ public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
public DorisExecutionOptions build() {
return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, useCache,
streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows,
bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore);
bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore, force2PC);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public DorisOptions(String fenodes, String username, String password, String tab
}

public DorisOptions(String fenodes, String beNodes, String username, String password,
String tableIdentifier, String jdbcUrl) {
super(fenodes, beNodes, username, password, jdbcUrl);
String tableIdentifier, String jdbcUrl, boolean redirect) {
super(fenodes, beNodes, username, password, jdbcUrl, redirect);
this.tableIdentifier = tableIdentifier;
}

Expand All @@ -70,6 +70,7 @@ public static class Builder {
private String jdbcUrl;
private String username;
private String password;
private boolean autoRedirect;
private String tableIdentifier;

/**
Expand Down Expand Up @@ -120,10 +121,15 @@ public Builder setJdbcUrl(String jdbcUrl) {
return this;
}

public Builder setAutoRedirect(boolean autoRedirect) {
this.autoRedirect = autoRedirect;
return this;
}

public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl);
return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl, autoRedirect);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

public class ConnectedFailedException extends DorisRuntimeException {
public ConnectedFailedException(String server, Throwable cause) {
super("Connect to " + server + "failed.", cause);
super("Connect to " + server + " failed.", cause);
}

public ConnectedFailedException(String server, int statusCode, Throwable cause) {
super("Connect to " + server + "failed, status code is " + statusCode + ".", cause);
super("Connect to " + server + " failed, status code is " + statusCode + ".", cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,11 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
public static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) {
String feNodes = options.getFenodes();
List<String> feNodeList = allEndpoints(feNodes, logger);

if(options.isAutoRedirect() && !feNodeList.isEmpty()){
return convert(feNodeList);
}

for (String feNode: feNodeList) {
try {
String beUrl = "http://" + feNode + BACKENDS_V2;
Expand All @@ -373,6 +378,21 @@ public static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, D
throw new DorisRuntimeException(errMsg);
}

/**
* When the user turns on redirection,
* there is no need to explicitly obtain the be list, just treat the fe list as the be list.
* @param feNodeList
* @return
*/
private static List<BackendV2.BackendRowV2> convert(List<String> feNodeList){
List<BackendV2.BackendRowV2> nodeList = new ArrayList<>();
for(String node : feNodeList){
String[] split = node.split(":");
nodeList.add(BackendV2.BackendRowV2.of(split[0], Integer.valueOf(split[1]), true));
}
return nodeList;
}

static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) {
ObjectMapper mapper = new ObjectMapper();
BackendV2 backend;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,13 @@ public String toBackendString(){
return ip + ":" + httpPort;
}

public static BackendRowV2 of(String ip, int httpPort, boolean alive){
BackendRowV2 rowV2 = new BackendRowV2();
rowV2.setIp(ip);
rowV2.setHttpPort(httpPort);
rowV2.setAlive(alive);
return rowV2;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.committer.DorisCommitter;
import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.DorisWriter;
Expand All @@ -31,6 +32,8 @@
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
Expand All @@ -43,6 +46,7 @@
*/
public class DorisSink<IN> implements Sink<IN, DorisCommittable, DorisWriterState, DorisCommittable> {

private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
private final DorisExecutionOptions dorisExecutionOptions;
Expand All @@ -56,6 +60,18 @@ public DorisSink(DorisOptions dorisOptions,
this.dorisReadOptions = dorisReadOptions;
this.dorisExecutionOptions = dorisExecutionOptions;
this.serializer = serializer;
checkKeyType();
}

/**
* The uniq model has 2pc close by default unless 2pc is forced open
*/
private void checkKeyType() {
if (dorisExecutionOptions.enabled2PC()
&& !dorisExecutionOptions.force2PC()
&& RestService.isUniqueKeyType(dorisOptions, dorisReadOptions, LOG)){
dorisExecutionOptions.setEnable2PC(false);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public class DorisConfigOptions {
public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the doris table name.");
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the doris user name.");
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the doris password.");

public static final ConfigOption<String> JDBC_URL = ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris jdbc url address.");
public static final ConfigOption<Boolean> AUTO_REDIRECT = ConfigOptions.key("auto-redirect").booleanType().defaultValue(false).withDescription("Use automatic redirection of fe without explicitly obtaining the be list");

// source config options
public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
Expand Down Expand Up @@ -176,7 +176,7 @@ public class DorisConfigOptions {
public static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions
.key("sink.buffer-size")
.intType()
.defaultValue(256 * 1024)
.defaultValue(1024 * 1024)
.withDescription("the buffer size to cache data for stream load.");
public static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions
.key("sink.buffer-count")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;

import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
Expand All @@ -38,6 +36,8 @@
import java.util.Properties;
import java.util.Set;

import static org.apache.doris.flink.table.DorisConfigOptions.AUTO_REDIRECT;
import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
Expand Down Expand Up @@ -179,6 +179,7 @@ private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
.setBenodes(benodes)
.setAutoRedirect(readableConfig.get(AUTO_REDIRECT))
.setJdbcUrl(readableConfig.get(JDBC_URL))
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));

Expand Down Expand Up @@ -214,8 +215,12 @@ private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableCo
builder.setStreamLoadProp(streamLoadProp);
builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE));

if (!readableConfig.get(SINK_ENABLE_2PC)) {
builder.disable2PC();
} else if (readableConfig.getOptional(SINK_ENABLE_2PC).isPresent()){
//force open 2pc
builder.enable2PC();
}

if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void build() throws Exception {

private DorisConnectionOptions getDorisConnectionOptions() {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);
Expand All @@ -145,6 +146,7 @@ private DorisConnectionOptions getDorisConnectionOptions() {
Preconditions.checkNotNull(jdbcUrl, "jdbcurl is empty in sink-conf");
DorisConnectionOptions.DorisConnectionOptionsBuilder builder = new DorisConnectionOptions.DorisConnectionOptionsBuilder()
.withFenodes(fenodes)
.withBenodes(benodes)
.withUsername(user)
.withPassword(passwd)
.withJdbcUrl(jdbcUrl);
Expand All @@ -168,6 +170,7 @@ public DorisSink<String> buildDorisSink(String table) {
.setTableIdentifier(database + "." + table)
.setUsername(user)
.setPassword(passwd);
sinkConfig.getOptional(DorisConfigOptions.AUTO_REDIRECT).ifPresent(dorisBuilder::setAutoRedirect);

Properties pro = new Properties();
//default json data format
Expand Down

0 comments on commit 89891f5

Please sign in to comment.