Skip to content

Commit

Permalink
improve commit strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Oct 17, 2023
1 parent a4cfd15 commit 2a42adc
Showing 1 changed file with 45 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.doris.flink.sink.committer;

import org.apache.flink.api.connector.sink.Committer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisOptions;
Expand All @@ -29,7 +27,10 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
Expand All @@ -41,7 +42,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.doris.flink.sink.LoadStatus.FAIL;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;

/**
* The committer to commit transaction.
Expand Down Expand Up @@ -74,49 +75,51 @@ public List<DorisCommittable> commit(List<DorisCommittable> committableList) thr
}

private void commitTransaction(DorisCommittable committable) throws IOException {
int statusCode = -1;
String reasonPhrase = null;
int retry = 0;
//basic params
HttpPutBuilder builder = new HttpPutBuilder()
.addCommonHeader()
.baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword())
.addTxnId(committable.getTxnID())
.commit();

//hostPort
String hostPort = committable.getHostPort();
CloseableHttpResponse response = null;
while (retry++ < maxRetry) {
HttpPutBuilder putBuilder = new HttpPutBuilder();
putBuilder.setUrl(String.format(commitPattern, hostPort, committable.getDb()))
.baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword())
.addCommonHeader()
.addTxnId(committable.getTxnID())
.setEmptyEntity()
.commit();
try {
response = httpClient.execute(putBuilder.build());

LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
int retry = 0;
while (retry++ <= maxRetry) {
//get latest-url
String url = String.format(commitPattern, hostPort, committable.getDb());
HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();

// http execute...
try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
StatusLine statusLine = response.getStatusLine();
if (201 == statusLine.getStatusCode()) {
if (response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res = new ObjectMapper().readValue(loadResult, new TypeReference<HashMap<String, String>>() {
});
if (!res.get("status").equals(SUCCESS) && !ResponseUtil.isCommitted(res.get("msg"))) {
throw new DorisRuntimeException("Commit failed " + loadResult);
} else {
LOG.info("load result {}", loadResult);
}
}
return;
}
String reasonPhrase = statusLine.getReasonPhrase();
LOG.warn("commit failed with {}, reason {}", hostPort, reasonPhrase);
if (retry == maxRetry) {
throw new DorisRuntimeException("stream load error: " + reasonPhrase);
}
hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
} catch (IOException e) {
LOG.error("commit transaction failed: ", e);
if (retry == maxRetry) {
throw new IOException("commit transaction failed: {}", e);
}
hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
continue;
}
statusCode = response.getStatusLine().getStatusCode();
reasonPhrase = response.getStatusLine().getReasonPhrase();
if (statusCode != 200) {
LOG.warn("commit failed with {}, reason {}", hostPort, reasonPhrase);
hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
} else {
break;
}
}

if (statusCode != 200) {
throw new DorisRuntimeException("stream load error: " + reasonPhrase);
}

ObjectMapper mapper = new ObjectMapper();
if (response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res = mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
});
if (res.get("status").equals(FAIL) && !ResponseUtil.isCommitted(res.get("msg"))) {
throw new DorisRuntimeException("Commit failed " + loadResult);
} else {
LOG.info("load result {}", loadResult);
}
}
}
Expand Down

0 comments on commit 2a42adc

Please sign in to comment.