ReactiveStreams : QueueBasedSubscriber
QueueBasedSubscriber is a more advanced (or more complicated, depending on your point of view) ReactiveStreams subscriber. It allows multiple subscriptions to be backed by a single cyclops-react async.Queue (or Topic), with demand signalled concurrently across those subscriptions at a configurable level. By default errors are not propagated, but users can plug in their own error handler. Connected streams remain connected until the Queue is closed.
For most use cases, please use SeqSubscriber instead.
For this simple use case the connected Stream will print out 1,2,3 but remain connected to the QueueBasedSubscriber (as we have to close the Queue to close the connection).
QueueBasedSubscriber<Integer> sub = QueueBasedSubscriber.subscriber(QueueFactories.boundedNonBlockingQueue(1000), new Counter(), 10);
ReactiveSeq.of(1, 2, 3)
           .subscribe(sub);
sub.reactiveSeq()
   .forEachWithError(System.out::println, System.err::println);
The input parameters to QueueBasedSubscriber are:
- The Queue to back the subscriptions with
- A Counter, which should be set to the number of connected subscriptions
- The maximum concurrency across subscriptions
Allowing multiple subscriptions for a single subscriber would break the reactive-streams TCK, so with QueueBasedSubscriber we create multiple subscribers backed by the same async.Queue.
final Counter c = new Counter();
c.active.set(publishers.size() + 1); // set the number of publishers we will connect to
final QueueBasedSubscriber<T> mainSub = QueueBasedSubscriber.subscriber(factory, c, publishers.size());
// define an initializing Supplier
final Supplier<Continuation> sp = () -> {
    subscribe(mainSub); // when the supplier is run, subscribe
    for (final Publisher<T> next : publishers) { // for each Publisher, generate a new Subscriber backed by the same Queue
        next.subscribe(QueueBasedSubscriber.subscriber(mainSub.getQueue(), c, publishers.size()));
    }
    init.close(); // finish the initialization of the QueueBasedSubscriber
    return Continuation.empty();
};
final Continuation continuation = new Continuation(sp);
init.addContinuation(continuation); // configure the Queue to initialize
return ReactiveSeq.fromStream(init.jdkStream());
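The pattern above (one subscriber per publisher, all backed by the same queue, with a shared counter tracking how many subscriptions are still active) can be sketched without cyclops-react, using only the JDK's java.util.concurrent.Flow API. This is an illustration under stated assumptions, not the cyclops-react implementation: the names SharedQueueSketch, QueueBackedSubscriber, merge and the CLOSED sentinel are hypothetical, a plain LinkedBlockingQueue stands in for async.Queue, an AtomicInteger stands in for Counter, and the batch size of 2 is arbitrary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: these types are NOT part of cyclops-react.
class SharedQueueSketch {

    // Sentinel placed on the queue once every subscription has finished.
    private static final Object CLOSED = new Object();

    // One instance per publisher; every instance writes to the same queue.
    static final class QueueBackedSubscriber<T> implements Flow.Subscriber<T> {
        private final BlockingQueue<Object> queue;
        private final AtomicInteger active; // subscriptions still open
        private final int batch;            // demand signalled per request
        private Flow.Subscription subscription;
        private int outstanding;

        QueueBackedSubscriber(BlockingQueue<Object> queue, AtomicInteger active, int batch) {
            this.queue = queue;
            this.active = active;
            this.batch = batch;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            outstanding = batch;
            s.request(batch); // initial demand
        }

        @Override public void onNext(T item) {
            queue.add(item);
            if (--outstanding == 0) { // batch consumed, signal more demand
                outstanding = batch;
                subscription.request(batch);
            }
        }

        // In this sketch errors are not propagated; the subscription just ends.
        @Override public void onError(Throwable t) { finish(); }

        @Override public void onComplete() { finish(); }

        private void finish() {
            // The last subscription to finish closes the shared queue.
            if (active.decrementAndGet() == 0) {
                queue.add(CLOSED);
            }
        }
    }

    // Publishes each source list on its own publisher and drains the shared queue.
    @SuppressWarnings("unchecked")
    static <T> List<T> merge(List<List<T>> sources) {
        List<T> out = new ArrayList<>();
        if (sources.isEmpty()) {
            return out; // nothing would ever close the queue
        }
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        AtomicInteger active = new AtomicInteger(sources.size());
        for (List<T> source : sources) {
            SubmissionPublisher<T> publisher = new SubmissionPublisher<>();
            publisher.subscribe(new QueueBackedSubscriber<>(queue, active, 2));
            source.forEach(publisher::submit);
            publisher.close(); // triggers onComplete after buffered items are delivered
        }
        try {
            for (Object next = queue.take(); next != CLOSED; next = queue.take()) {
                out.add((T) next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> sources = List.of(List.of(1, 2, 3), List.of(4, 5, 6));
        List<Integer> merged = merge(sources);
        Collections.sort(merged); // interleaving across publishers is not deterministic
        System.out.println(merged);
    }
}
```

Each publisher sees exactly one subscription, which keeps every subscriber within the reactive-streams rules, while the consumer only reads from the shared queue. The consumer stays connected until the counter reaches zero and the queue is closed, which mirrors the behaviour described for QueueBasedSubscriber.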