Skip to content

Commit

Permalink
improve check commit success
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Oct 17, 2023
1 parent 84b9bda commit accf4f8
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

package org.apache.doris.flink.sink.committer;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.sink.Committer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
Expand All @@ -31,6 +29,7 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
Expand All @@ -45,7 +44,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.doris.flink.sink.LoadStatus.FAIL;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;

/**
* The committer to commit transaction.
Expand Down Expand Up @@ -109,7 +108,7 @@ private void commitTransaction(DorisCommittable committable) throws IOException
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res = jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
});
if (res.get("status").equals(FAIL) && !ResponseUtil.isCommitted(res.get("msg"))) {
if (!res.get("status").equals(SUCCESS) && !ResponseUtil.isCommitted(res.get("msg"))) {
throw new DorisRuntimeException("Commit failed " + loadResult);
} else {
LOG.info("load result {}", loadResult);
Expand Down

0 comments on commit accf4f8

Please sign in to comment.