Skip to content

Commit

Permalink
Updated to avoid using a separate a listener executor that requires b…
Browse files Browse the repository at this point in the history
…eing shutdown (#1980)
  • Loading branch information
ivakegg authored Jun 2, 2023
1 parent 56bda12 commit bcf2b1a
Showing 1 changed file with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@

public class BatchScannerSession extends ScannerSession implements Iterator<Entry<Key,Value>>, FutureCallback<Scan>, SessionArbiter, UncaughtExceptionHandler {

private static final int THIRTY_MINUTES = 108000000;

private static final double RANGE_MULTIPLIER = 5;

private static final double QUEUE_MULTIPLIER = 25;
Expand All @@ -75,8 +73,6 @@ public class BatchScannerSession extends ScannerSession implements Iterator<Entr

protected ExecutorService service = null;

ExecutorService listenerService = null;

protected StringBuilder threadId = new StringBuilder();

protected List<Function<ScannerChunk,ScannerChunk>> visitorFunctions = Lists.newArrayList();
Expand Down Expand Up @@ -166,10 +162,8 @@ public BatchScannerSession(String tableName, Set<Authorizations> auths, Resource
currentBatch = Queues.newLinkedBlockingDeque();

setThreads(1);

listenerService = Executors.newFixedThreadPool(1);

addListener(new BatchScannerListener(), listenerService);

addListener(new BatchScannerListener(), MoreExecutors.newDirectExecutorService());

serverFailureMap = Maps.newConcurrentMap();

Expand Down Expand Up @@ -370,26 +364,26 @@ protected void submitTasks(List<ScannerChunk> newChunks) {

chunk.setQueryId(settings.getId().toString());

scan = new SpeculativeScan(localTableName, localAuths, chunk, delegatorReference, delegatedResourceInitializer, resultQueue, listenerService);
scan = new SpeculativeScan(localTableName, localAuths, chunk, delegatorReference, delegatedResourceInitializer, resultQueue, service);

scan.setVisitors(visitorFunctions);

Scan childScan = new Scan(localTableName, localAuths, new ScannerChunk(chunk), delegatorReference, BatchResource.class,
((SpeculativeScan) scan).getQueue(), listenerService);
((SpeculativeScan) scan).getQueue(), service);

childScan.setVisitors(visitorFunctions);

((SpeculativeScan) scan).addScan(childScan);

childScan = new Scan(localTableName, localAuths, new ScannerChunk(chunk), delegatorReference, delegatedResourceInitializer,
((SpeculativeScan) scan).getQueue(), listenerService);
((SpeculativeScan) scan).getQueue(), service);

childScan.setVisitors(visitorFunctions);

((SpeculativeScan) scan).addScan(childScan);

} else {
scan = new Scan(localTableName, localAuths, chunk, delegatorReference, delegatedResourceInitializer, resultQueue, listenerService);
scan = new Scan(localTableName, localAuths, chunk, delegatorReference, delegatedResourceInitializer, resultQueue, service);
}

if (backoffEnabled) {
Expand Down Expand Up @@ -430,16 +424,16 @@ protected void submitTasks() {
if (log.isTraceEnabled()) {
log.trace("Using speculative execution");
}
scan = new SpeculativeScan(localTableName, localAuths, chunk, delegatorReference, delegatedResourceInitializer, resultQueue, listenerService);
scan = new SpeculativeScan(localTableName, localAuths, chunk, delegatorReference, delegatedResourceInitializer, resultQueue, service);

((SpeculativeScan) scan).addScan(new Scan(localTableName, localAuths, new ScannerChunk(chunk), delegatorReference, BatchResource.class,
((SpeculativeScan) scan).getQueue(), listenerService));
((SpeculativeScan) scan).getQueue(), service));

((SpeculativeScan) scan).addScan(new Scan(localTableName, localAuths, new ScannerChunk(chunk), delegatorReference,
delegatedResourceInitializer, ((SpeculativeScan) scan).getQueue(), listenerService));
delegatedResourceInitializer, ((SpeculativeScan) scan).getQueue(), service));

} else {
scan = new Scan(localTableName, localAuths, chunk, delegatorReference, delegatedResourceInitializer, resultQueue, listenerService);
scan = new Scan(localTableName, localAuths, chunk, delegatorReference, delegatedResourceInitializer, resultQueue, service);
}

if (backoffEnabled) {
Expand Down Expand Up @@ -645,7 +639,6 @@ public void failed(State from, Throwable failure) {
*/
protected void shutdownServices() {
service.shutdownNow();
listenerService.shutdownNow();
int count = 0;
try {
while (!service.awaitTermination(250, TimeUnit.MILLISECONDS) && count < MAX_WAIT) {
Expand All @@ -665,7 +658,6 @@ protected void shutdownServices() {
public void close() {
stopAsync();
service.shutdownNow();
listenerService.shutdownNow();
}

public void addVisitor(Function<ScannerChunk,ScannerChunk> visitorFunction) {
Expand Down

0 comments on commit bcf2b1a

Please sign in to comment.