Skip to content

Commit

Permalink
Implement remaining methods, including reactive lib connections
Browse files Browse the repository at this point in the history
  • Loading branch information
emil-bar committed Jan 2, 2025
1 parent c7fb12d commit 404c0e6
Show file tree
Hide file tree
Showing 7 changed files with 752 additions and 23 deletions.
30 changes: 30 additions & 0 deletions flows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,36 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.10.2</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>channels</artifactId>
Expand Down
60 changes: 37 additions & 23 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -39,7 +40,7 @@
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.CancellableFork;
import com.softwaremill.jox.structured.Fork;
import com.softwaremill.jox.structured.Scopes;
import com.softwaremill.jox.structured.Scope;
import com.softwaremill.jox.structured.UnsupervisedScope;

/**
Expand Down Expand Up @@ -368,7 +369,7 @@ public <S, U> Flow<U> mapStateful(Supplier<S> initializeState, StatefulMapper<T,
*/
public <S, U> Flow<U> mapStatefulConcat(Supplier<S> initializeState, StatefulMapper<T, S, Iterable<U>> f, OnComplete<S, U> onComplete) {
AtomicReference<S> state = new AtomicReference<>(initializeState.get());
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
last.run(t -> {
Map.Entry<S, Iterable<U>> result = f.apply(state.get(), t);
for (U u : result.getValue()) {
Expand Down Expand Up @@ -453,7 +454,7 @@ public <U> Flow<U> flatMap(Function<T, Flow<U>> mappingFunction) {
* completes as well.
*/
public Flow<T> take(int n) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
AtomicInteger taken = new AtomicInteger(0);
try {
last.run(t -> {
Expand Down Expand Up @@ -550,8 +551,8 @@ public Flow<List<T>> groupedWeightedWithin(long minWeight, Duration duration, Fu
if (minWeight <= 0) throw new IllegalArgumentException("requirement failed: minWeight must be > 0");
if (duration.toMillis() <= 0) throw new IllegalArgumentException("requirement failed: duration must be > 0");

return Flows.usingEmit(emit -> {
Scopes.unsupervised(scope -> {
return usingEmit(emit -> {
unsupervised(scope -> {
Source<T> flowSource = runToChannel(scope);
Channel<List<T>> outputChannel = Channel.withScopedBufferSize();
Channel<GroupingTimeout> timerChannel = Channel.withScopedBufferSize();
Expand Down Expand Up @@ -620,7 +621,7 @@ case ChannelError(Throwable cause):

private CancellableFork<GroupingTimeout> forkTimeout(UnsupervisedScope scope, Channel<GroupingTimeout> timerChannel, Duration duration) {
return scope.forkCancellable(() -> {
Thread.sleep(duration);
sleep(duration);
timerChannel.sendOrClosed(GroupingTimeout.INSTANCE);
return null;
});
Expand Down Expand Up @@ -651,7 +652,7 @@ public Flow<List<T>> groupedWeighted(long minWeight, Function<T, Long> costFn) {
throw new IllegalArgumentException("minWeight must be > 0");
}

return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
List<T> buffer = new ArrayList<>();
AtomicLong accumulatedCost = new AtomicLong(0L);
last.run(t -> {
Expand All @@ -674,14 +675,14 @@ public Flow<List<T>> groupedWeighted(long minWeight, Function<T, Long> costFn) {
* Discard all elements emitted by this flow. The returned flow completes only when this flow completes (successfully or with an error).
*/
public Flow<Void> drain() {
return Flows.usingEmit(_ -> last.run(_ -> {}));
return usingEmit(_ -> last.run(_ -> {}));
}

/**
* Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error.
*/
public Flow<T> onComplete(Runnable f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(emit);
} finally {
Expand All @@ -694,7 +695,7 @@ public Flow<T> onComplete(Runnable f) {
* Runs `f` after the flow completes successfully, that is when all elements are emitted.
*/
public Flow<T> onDone(Runnable f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
last.run(emit);
f.run();
});
Expand All @@ -704,7 +705,7 @@ public Flow<T> onDone(Runnable f) {
* Runs `f` after the flow completes with an error. The error can't be recovered.
*/
public Flow<T> onError(Consumer<Throwable> f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(emit);
} catch (Throwable e) {
Expand Down Expand Up @@ -734,7 +735,7 @@ public Flow<T> intersperse(T start, T inject, T end) {
}

private Flow<T> intersperse(Optional<T> start, T inject, Optional<T> end) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
if (start.isPresent()) {
emit.apply(start.get());
}
Expand Down Expand Up @@ -790,7 +791,7 @@ public Flow<T> throttle(int elements, Duration per) {
* Whether the flow should also emit the first element that failed the predicate (`false` by default).
*/
public Flow<T> takeWhile(Predicate<T> f, boolean includeFirstFailing) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(t -> {
if (f.test(t)) {
Expand Down Expand Up @@ -826,7 +827,7 @@ public Flow<T> concat(Flow<T> other) {
* Number of elements to be dropped.
*/
public Flow<T> drop(int n) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
AtomicInteger dropped = new AtomicInteger(0);
last.run(t -> {
if (dropped.get() < n) {
Expand Down Expand Up @@ -854,7 +855,7 @@ public Flow<T> drop(int n) {
* Should the resulting flow complete when the right flow (`outer`) completes, before `this` flow.
*/
public Flow<T> merge(Flow<T> other, boolean propagateDoneLeft, boolean propagateDoneRight) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
unsupervised(scope -> {
Source<T> c1 = this.runToChannel(scope);
Source<T> c2 = other.runToChannel(scope);
Expand Down Expand Up @@ -895,7 +896,7 @@ public Flow<T> prepend(Flow<T> other) {
* An alternative flow to be used when this flow is empty.
*/
public Flow<T> orElse(Flow<T> alternative) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
AtomicBoolean receivedAtLeastOneElement = new AtomicBoolean(false);
last.run(t -> {
emit.apply(t);
Expand Down Expand Up @@ -938,7 +939,7 @@ public <U> Flow<U> interleave(Flow<U> other, int segmentSize, boolean eagerCompl
* returned flow. If the result of `f` is empty, nothing is emitted by the returned channel.
*/
public <U> Flow<U> mapConcat(Function<T, Iterable<U>> f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
last.run(t -> {
for (U u : f.apply(t)) {
emit.apply(u);
Expand All @@ -961,15 +962,15 @@ public <U> Flow<U> mapConcat(Function<T, Iterable<U>> f) {
* The mapping function.
*/
public <U> Flow<U> mapPar(int parallelism, Function<T, U> f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
Semaphore semaphore = new Semaphore(parallelism);
Channel<Fork<Optional<U>>> inProgress = new Channel<>(parallelism);
Channel<U> results = Channel.withScopedBufferSize();

// creating a nested scope, so that in case of errors, we can clean up any mapping forks in a "local" fashion,
// that is without closing the main scope; any error management must be done in the forks, as the scope is
// unsupervised
Scopes.unsupervised(scope -> {
unsupervised(scope -> {
// a fork which runs the `last` pipeline, and for each emitted element creates a fork
// notifying only the `results` channels, as it will cause the scope to end, and any other forks to be
// interrupted, including the inProgress-fork, which might be waiting on a join()
Expand Down Expand Up @@ -1029,7 +1030,7 @@ public <U> Flow<U> mapPar(int parallelism, Function<T, U> f) {
* The mapping function.
*/
public <U> Flow<U> mapParUnordered(int parallelism, Function<T, U> f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
Channel<U> results = Channel.withScopedBufferSize();
Semaphore s = new Semaphore(parallelism);
unsupervised(unsupervisedScope -> { // the outer scope, used for the fork which runs the `last` pipeline
Expand Down Expand Up @@ -1075,7 +1076,7 @@ public Flow<List<T>> sliding(int n, int step) {
if (n <= 0) throw new IllegalArgumentException("n must be > 0");
if (step <= 0) throw new IllegalArgumentException("step must be > 0");

return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
final AtomicReference<List<T>> buf = new AtomicReference<>(new ArrayList<>());
last.run(t -> {
var buffer = buf.get();
Expand Down Expand Up @@ -1113,7 +1114,7 @@ public Flow<List<T>> sliding(int n, int step) {
* @see #alsoToTap for a version that drops elements when the `other` sink is not available for receive.
*/
public Flow<T> alsoTo(Sink<T> other) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(t -> {
try {
Expand Down Expand Up @@ -1144,7 +1145,7 @@ public Flow<T> alsoTo(Sink<T> other) {
* @see #alsoTo for a version that ensures that elements are emitted both by the returned flow and sent to the `other` sink.
*/
public Flow<T> alsoToTap(Sink<T> other) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(t -> {
try {
Expand All @@ -1161,6 +1162,19 @@ public Flow<T> alsoToTap(Sink<T> other) {
});
}

/** Converts this {@link Flow} into a {@link Publisher}. The flow is run every time the publisher is subscribed to.
* <p>
* Must be run within a concurrency scope, as upon subscribing, a fork is created to run the publishing process. Hence, the scope should
* remain active as long as the publisher is used.
* <p>
* Elements emitted by the flow are buffered, using a buffer of capacity given by the {@link Channel#BUFFER_SIZE} in scope or default value {@link Channel#DEFAULT_BUFFER_SIZE} is used.
* <p>
* The returned publisher implements the JDK 9+ {@code Flow.Publisher} API.
*/
public Publisher<T> toPublisher(Scope scope) {
return new FromFlowPublisher<>(scope, last);
}

// endregion

private void forkPropagate(UnsupervisedScope unsupervisedScope, Sink<?> propagateExceptionsTo, Callable<Void> runnable) {
Expand Down
Loading

0 comments on commit 404c0e6

Please sign in to comment.