From 2cc2c36c6d0cfa02bf8b5cc962ab48700a217b43 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 9 Jan 2025 10:42:21 +0800 Subject: [PATCH] fix block --- .../org/apache/doris/flink/sink/HttpUtil.java | 1 + .../flink/sink/writer/DorisStreamLoad.java | 17 +++++++++++++++-- .../doris/flink/sink/writer/LabelGenerator.java | 8 ++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java index 53d3ce13b..d1600de69 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java @@ -60,6 +60,7 @@ protected boolean isRedirectable(String method) { return true; } }) + .setRetryHandler((exception, executionCount, context) -> false) .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE) .setDefaultRequestConfig( RequestConfig.custom() diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index f900f7418..d23434e83 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -115,6 +115,10 @@ public DorisStreamLoad( this.streamLoadProp = executionOptions.getStreamLoadProp(); this.enableDelete = executionOptions.getDeletable(); this.httpClient = httpClient; + String threadName = + String.format( + "stream-load-upload-%s-%s", + labelGenerator.getSubtaskId(), labelGenerator.getTableIdentifier()); this.executorService = new ThreadPoolExecutor( 1, @@ -122,7 +126,7 @@ public DorisStreamLoad( 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), - new ExecutorThreadFactory("stream-load-upload")); + new ExecutorThreadFactory(threadName)); this.recordStream = new RecordStream( executionOptions.getBufferSize(), @@ -347,11 +351,20 @@ public void startLoad(String label, boolean isResume) throws IOException { } else { executeMessage = "table " + table + " start execute load for label " + label; } + Thread currentThread = Thread.currentThread(); pendingLoadFuture = executorService.submit( () -> { LOG.info(executeMessage); - return httpClient.execute(putBuilder.build()); + try { + return httpClient.execute(putBuilder.build()); + } catch (Exception e) { + LOG.error("Failed to execute load, cause ", e); + // When an HTTP error occurs, the main thread should be + // interrupted to prevent blocking + currentThread.interrupt(); + throw e; + } }); } catch (Exception e) { String err; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java index d80315f55..84c14b793 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java @@ -91,4 +91,12 @@ public String getConcatLabelPrefix() { String concatPrefix = String.format("%s_%s_%s", labelPrefix, tableIdentifier, subtaskId); return concatPrefix; } + + public int getSubtaskId() { + return subtaskId; + } + + public String getTableIdentifier() { + return tableIdentifier; + } }