From 1b9a55e64709f27f76613292afc3fdc0fad1deaf Mon Sep 17 00:00:00 2001 From: John McClean Date: Thu, 21 May 2015 17:36:10 +0100 Subject: [PATCH] simple-react v0.86 --- build.gradle | 4 +- .../react/stream/FutureStreamBuilder.java | 18 --------- .../stream/traits/EagerFutureStream.java | 4 +- .../react/stream/traits/FutureStream.java | 37 +++++++++++++++++++ .../stream/traits/SimpleReactStream.java | 1 + .../aol/simple/react/async/BrownBagTest.java | 15 ++++++++ .../com/aol/simple/react/async/QueueTest.java | 10 +++-- .../aol/simple/react/simple/BlockingTest.java | 2 +- 8 files changed, 66 insertions(+), 25 deletions(-) delete mode 100644 src/main/java/com/aol/simple/react/stream/FutureStreamBuilder.java create mode 100644 src/test/java/com/aol/simple/react/async/BrownBagTest.java diff --git a/build.gradle b/build.gradle index 9a88478e0c..1bd7bbbac3 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ apply plugin: 'jacoco' apply from: custom('jacoco-version') sourceCompatibility = 1.8 -version = '0.85' +version = '0.86' jar { manifest { attributes 'Implementation-Title': 'Simple React', 'Implementation-Version': version @@ -62,7 +62,7 @@ modifyPom { groupId 'com.aol.simplereact' artifactId 'simple-react' - version '0.85' + version '0.86' scm { url 'scm:git@github.com:aol/simple-react.git' diff --git a/src/main/java/com/aol/simple/react/stream/FutureStreamBuilder.java b/src/main/java/com/aol/simple/react/stream/FutureStreamBuilder.java deleted file mode 100644 index d7b0b68fce..0000000000 --- a/src/main/java/com/aol/simple/react/stream/FutureStreamBuilder.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.aol.simple.react.stream; - -import java.util.concurrent.ExecutorService; - -import lombok.AllArgsConstructor; -import lombok.experimental.Builder; - -import com.nurkiewicz.asyncretry.RetryExecutor; - -@AllArgsConstructor -@Builder -public class FutureStreamBuilder { - - private final ExecutorService executor; - private final RetryExecutor retrier; - private final Boolean eager; - -} diff --git a/src/main/java/com/aol/simple/react/stream/traits/EagerFutureStream.java b/src/main/java/com/aol/simple/react/stream/traits/EagerFutureStream.java index e70fa98682..25e5fb1fa4 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/EagerFutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/EagerFutureStream.java @@ -8,6 +8,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -39,7 +40,6 @@ import com.aol.simple.react.stream.eager.EagerReact; import com.aol.simple.react.stream.lazy.LazyReact; import com.aol.simple.react.stream.simple.SimpleReact; -import com.nurkiewicz.asyncretry.RetryExecutor; /** * @@ -939,6 +939,8 @@ default EagerFutureStream reverse() { return fromStream(FutureStream.super.reverse()); } + + /** * Shuffle a stream * diff --git a/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java b/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java index c034156040..b7a895baa9 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java @@ -6,6 +6,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -1143,6 +1144,42 @@ default CloseableIterator iterator() { return new CloseableIterator<>(q.stream(getSubscription()) .iterator(), getSubscription(),q); } + + + + /* + * More efficient reverse implementation than Seq version + * + * + * @return + * @see org.jooq.lambda.Seq#reverse() + */ + default FutureStream reverse() { + return fromStream(Seq.seq(reversedIterator())); + } + default Iterator reversedIterator(){ + Queue q = toQueue(); + if (getSubscription().closed()) + return new CloseableIterator<>(Arrays. asList().iterator(), + getSubscription(),null); + List l= q.stream(getSubscription()).toList(); + ListIterator iterator = l.listIterator(l.size()); + return new CloseableIterator<>(new Iterator(){ + + @Override + public boolean hasNext() { + return iterator.hasPrevious(); + } + + @Override + public Object next() { + return iterator.previous(); + } + + }, getSubscription(),q); + + + } /* * (non-Javadoc) diff --git a/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java b/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java index 37c08e8ac6..2bd82b9388 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java @@ -236,6 +236,7 @@ default SimpleReactStream then(final Function fn) { Function,Stream> streamMapper = s ->s.map(ft -> ft.thenApplyAsync(SimpleReactStream.handleExceptions(fn),getTaskExecutor())); return (SimpleReactStream) this.withLastActive(getLastActive().stream(streamMapper)); } + /** * Can only be used on Eager Streams * diff --git a/src/test/java/com/aol/simple/react/async/BrownBagTest.java b/src/test/java/com/aol/simple/react/async/BrownBagTest.java new file mode 100644 index 0000000000..f773c4352f --- /dev/null +++ b/src/test/java/com/aol/simple/react/async/BrownBagTest.java @@ -0,0 +1,15 @@ +package com.aol.simple.react.async; + +import java.util.stream.Stream; + +import org.junit.Test; + +public class BrownBagTest { + + @Test + public void presentation(){ + Stream.of(6,5,1,2).map(e->e*100) + .filter(e->e<551) + .forEach(System.out::println); + } +} diff --git a/src/test/java/com/aol/simple/react/async/QueueTest.java b/src/test/java/com/aol/simple/react/async/QueueTest.java index b6ea23e504..28bb1d4a9c 100644 --- a/src/test/java/com/aol/simple/react/async/QueueTest.java +++ b/src/test/java/com/aol/simple/react/async/QueueTest.java @@ -1,17 +1,17 @@ package com.aol.simple.react.async; import static com.aol.simple.react.stream.traits.EagerFutureStream.parallel; -import static com.aol.simple.react.stream.traits.EagerFutureStream.parallelBuilder; import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.LinkedBlockingQueue; @@ -37,8 +37,12 @@ public void setup() { private final AtomicInteger found = new AtomicInteger(0); + @Test public void backPressureTest() { + + + Queue q = new Queue<>(new LinkedBlockingQueue<>(2)); new SimpleReact().react(() -> { q.offer(1); diff --git a/src/test/java/com/aol/simple/react/simple/BlockingTest.java b/src/test/java/com/aol/simple/react/simple/BlockingTest.java index 5f6f19ed3d..ed7896b91c 100644 --- a/src/test/java/com/aol/simple/react/simple/BlockingTest.java +++ b/src/test/java/com/aol/simple/react/simple/BlockingTest.java @@ -213,7 +213,7 @@ public void testBreakoutAllCompletedAndTime() throws InterruptedException, return it; }).capture(e -> count++) - .block(status -> status.getAllCompleted() >1 && status.getElapsedMillis()>200); + .block(status -> status.getAllCompleted() >1 && status.getElapsedMillis()>20); assertThat(result.size(), is(2)); assertThat(count, is(0));