From 0c4a50f1a0e0c174d61b9c117c05f2e7b07c53af Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 3 Jan 2025 16:05:09 +0800 Subject: [PATCH] improve some log --- .../doris/flink/sink/committer/DorisCommitter.java | 12 ++++++++---- .../doris/flink/sink/writer/DorisStreamLoad.java | 5 ++++- .../apache/doris/flink/sink/writer/DorisWriter.java | 1 - 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index e73d96cde..b1a700592 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -128,12 +128,16 @@ private void commitTransaction(DorisCommittable committable) throws IOException jsonMapper.readValue( loadResult, new TypeReference>() {}); - if (!res.get("status").equals(SUCCESS) - && !ResponseUtil.isCommitted(res.get("msg"))) { + if (res.get("status").equals(SUCCESS)) { + LOG.info("load result {}", loadResult); + } else if (ResponseUtil.isCommitted(res.get("msg"))) { + LOG.info( + "transaction {} has already committed successfully, skipping, load response is {}", + committable.getTxnID(), + res.get("msg")); + } else { throw new DorisRuntimeException( "commit transaction failed " + loadResult); - } else { - LOG.info("load result {}", loadResult); } return; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 02c2df494..f900f7418 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -391,7 +391,10 @@ public void abortTransaction(long txnID) throws Exception { String msg = res.get("msg"); // transaction already aborted if (msg != null && ResponseUtil.isAborted(msg)) { - LOG.warn("Failed to abort transaction, {}", msg); + LOG.info( + "transaction {} may have already been successfully aborted, skipping, abort response is {}", + txnID, + msg); return; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index fdb797f97..8e28213d8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -458,7 +458,6 @@ public void close() throws Exception { if (scheduledExecutorService != null) { scheduledExecutorService.shutdownNow(); } - LOG.info("Try to abort txn before closing."); abortPossibleSuccessfulTransaction(); if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) {