Skip to content

Commit

Permalink
OF-2505: BOSH: add back-pressure to network IO when processing can't …
Browse files Browse the repository at this point in the history
…keep up

For BOSH and websocket connections, this commit introduces a new feature (disabled by default) that allows Openfire to be configured to apply 'back pressure' that could be useful in a scenario where the network IO thread pool is outpacing the processing thread pool.

By enabling the feature using `xmpp.httpbind.worker.backpressure-enabled` (and setting the queue to a limited value with `xmpp.httpbind.worker.queue-capacity`, the queuing of new tasks by the network IO thread pool blocks when the queue is at capacity.
  • Loading branch information
guusdk committed Sep 13, 2022
1 parent c16aa2b commit e272048
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import org.jivesoftware.openfire.mbean.ThreadPoolExecutorDelegate;
import org.jivesoftware.openfire.mbean.ThreadPoolExecutorDelegateMBean;
import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.NamedThreadFactory;
import org.jivesoftware.util.SystemProperty;
import org.jivesoftware.util.TaskEngine;
import org.jivesoftware.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,10 +41,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

/**
* Manages sessions for all users connecting to Openfire using the HTTP binding protocol,
Expand Down Expand Up @@ -95,9 +89,17 @@ public void start() {

this.sessionManager = SessionManager.getInstance();

final BlockingQueue<Runnable> queue;
if (BACKPRESSURE_ENABLED.getValue()) {
// Do _not_ use ThreadPoolExecutor.CallerRunsPolicy, as that can mess up the order in which stanzas are processed!
queue = new BlocksOnOfferQueue<>(QUEUE_CAPACITY.getValue());
}
else {
queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY.getValue());
}

stanzaWorkerPool = new ThreadPoolExecutor(MIN_POOL_SIZE.getValue(), MAX_POOL_SIZE.getValue(), POOL_KEEP_ALIVE.getValue().getSeconds(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), // unbounded task queue
new NamedThreadFactory( "httpbind-worker-", true, null, Thread.currentThread().getThreadGroup(), null )
queue, new NamedThreadFactory("httpbind-worker-", true, null, Thread.currentThread().getThreadGroup(), null)
);

if (JMXManager.isEnabled()) {
Expand Down Expand Up @@ -141,6 +143,27 @@ public void start() {
.setDynamic(false)
.build();

/**
* Maximum amount of tasks (roughly: stanzas) that can be queued for processing, before the processing executor
* will start to invoke the rejection policy.
*/
public static SystemProperty<Integer> QUEUE_CAPACITY = SystemProperty.Builder.ofType(Integer.class)
.setKey("xmpp.httpbind.worker.queue-capacity")
.setDynamic(false)
.setDefaultValue(Integer.MAX_VALUE)
.setMinValue(1)
.build();

/**
* Backpressure slows down the provider of tasks when the queue is full, by making the threads invoking
* {@link #execute(Runnable)} block until there is room again in the queue.
*/
public static SystemProperty<Boolean> BACKPRESSURE_ENABLED = SystemProperty.Builder.ofType(Boolean.class)
.setKey("xmpp.httpbind.worker.backpressure-enabled")
.setDynamic(false)
.setDefaultValue(false)
.build();

/**
* Interval in which a check is executed that will cleanup unused/inactive BOSH sessions.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.jivesoftware.openfire.http;

/**
* Enumerates frequently used RejectedExecutionHandler implementations.
*
* @author Guus der Kinderen, [email protected]
*/
public enum ThreadPoolExecutorRejectionPolicy
{
/**
* Indicates desired usage of {@link java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy}.
*/
DiscardOldestPolicy,

/**
* Indicates desired usage of {@link java.util.concurrent.ThreadPoolExecutor.AbortPolicy}.
*/
AbortPolicy,

/**
* Indicates desired usage of {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy}.
*/
CallerRunsPolicy,

/**
* Indicates desired usage of {@link java.util.concurrent.ThreadPoolExecutor.DiscardPolicy}.
*/
DiscardPolicy
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) 2022 Ignite Realtime Foundation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.util;

import javax.annotation.Nonnull;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* A LinkedBlockingQueue of which the {@link #offer(Object)} method blocks, instead of immediately returning.
*
* This class is designed to be used as a queue for ThreadPoolExecutors that wish to slow down the producing threads
* when the queue is reaching capacity, and where {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy}
* cannot be used (for example, because it would cause rejected tasks to be executed _before_ already queued tasks).
*
* Note that the lock used to guard access in {@link LinkedBlockingQueue#offer(Object, long, TimeUnit)}, which is used
* by this implementation of {@link #offer(Object)}, uses an unfair lock. No strict ordering of execution of produced
* tasks can be guaranteed.
*
* @param <E>
* @author Guus der Kinderen, [email protected]
*/
public class BlocksOnOfferQueue<E> extends LinkedBlockingQueue<E>
{
public BlocksOnOfferQueue(int capacity) {
super(capacity);
}

@Override
public boolean offer(@Nonnull E e) {
try {
return super.offer(e, 999, TimeUnit.DAYS); // 'indefinitely'.
} catch (InterruptedException ex) {
return false;
}
}
}

0 comments on commit e272048

Please sign in to comment.