Skip to content

Commit

Permalink
fix block
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jan 9, 2025
1 parent df088ba commit 2cc2c36
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ protected boolean isRedirectable(String method) {
return true;
}
})
.setRetryHandler((exception, executionCount, context) -> false)
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
.setDefaultRequestConfig(
RequestConfig.custom()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,18 @@ public DorisStreamLoad(
this.streamLoadProp = executionOptions.getStreamLoadProp();
this.enableDelete = executionOptions.getDeletable();
this.httpClient = httpClient;
String threadName =
String.format(
"stream-load-upload-%s-%s",
labelGenerator.getSubtaskId(), labelGenerator.getTableIdentifier());
this.executorService =
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new ExecutorThreadFactory("stream-load-upload"));
new ExecutorThreadFactory(threadName));
this.recordStream =
new RecordStream(
executionOptions.getBufferSize(),
Expand Down Expand Up @@ -347,11 +351,20 @@ public void startLoad(String label, boolean isResume) throws IOException {
} else {
executeMessage = "table " + table + " start execute load for label " + label;
}
Thread currentThread = Thread.currentThread();
pendingLoadFuture =
executorService.submit(
() -> {
LOG.info(executeMessage);
return httpClient.execute(putBuilder.build());
try {
return httpClient.execute(putBuilder.build());
} catch (Exception e) {
LOG.error("Failed to execute load, cause ", e);
// When an HTTP error occurs, the main thread should be
// interrupted to prevent blocking
currentThread.interrupt();
throw e;
}
});
} catch (Exception e) {
String err;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,12 @@ public String getConcatLabelPrefix() {
String concatPrefix = String.format("%s_%s_%s", labelPrefix, tableIdentifier, subtaskId);
return concatPrefix;
}

public int getSubtaskId() {
return subtaskId;
}

public String getTableIdentifier() {
return tableIdentifier;
}
}

0 comments on commit 2cc2c36

Please sign in to comment.