Skip to content

Commit

Permalink
simple-react v0.86
Browse files Browse the repository at this point in the history
  • Loading branch information
johnmcclean committed May 21, 2015
1 parent 0ed5d57 commit 1b9a55e
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 25 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ apply plugin: 'jacoco'
apply from: custom('jacoco-version')

sourceCompatibility = 1.8
version = '0.85'
version = '0.86'
jar {
manifest {
attributes 'Implementation-Title': 'Simple React', 'Implementation-Version': version
Expand Down Expand Up @@ -62,7 +62,7 @@ modifyPom {

groupId 'com.aol.simplereact'
artifactId 'simple-react'
version '0.85'
version '0.86'

scm {
url 'scm:[email protected]:aol/simple-react.git'
Expand Down
18 changes: 0 additions & 18 deletions src/main/java/com/aol/simple/react/stream/FutureStreamBuilder.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
Expand Down Expand Up @@ -39,7 +40,6 @@
import com.aol.simple.react.stream.eager.EagerReact;
import com.aol.simple.react.stream.lazy.LazyReact;
import com.aol.simple.react.stream.simple.SimpleReact;
import com.nurkiewicz.asyncretry.RetryExecutor;

/**
*
Expand Down Expand Up @@ -939,6 +939,8 @@ default EagerFutureStream<U> reverse() {
return fromStream(FutureStream.super.reverse());
}



/**
* Shuffle a stream
*
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/com/aol/simple/react/stream/traits/FutureStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
Expand Down Expand Up @@ -1143,6 +1144,42 @@ default CloseableIterator<U> iterator() {
return new CloseableIterator<>(q.stream(getSubscription())
.iterator(), getSubscription(),q);
}



/*
* More efficient reverse implementation than Seq version
*
*
* @return
* @see org.jooq.lambda.Seq#reverse()
*/
default FutureStream<U> reverse() {
return fromStream(Seq.seq(reversedIterator()));
}
default Iterator<U> reversedIterator(){
Queue<U> q = toQueue();
if (getSubscription().closed())
return new CloseableIterator<>(Arrays.<U> asList().iterator(),
getSubscription(),null);
List l= q.stream(getSubscription()).toList();
ListIterator iterator = l.listIterator(l.size());
return new CloseableIterator<>(new Iterator(){

@Override
public boolean hasNext() {
return iterator.hasPrevious();
}

@Override
public Object next() {
return iterator.previous();
}

}, getSubscription(),q);


}

/*
* (non-Javadoc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ default <R> SimpleReactStream<R> then(final Function<U, R> fn) {
Function<Stream<CompletableFuture>,Stream<CompletableFuture>> streamMapper = s ->s.map(ft -> ft.thenApplyAsync(SimpleReactStream.<U,R>handleExceptions(fn),getTaskExecutor()));
return (SimpleReactStream<R>) this.withLastActive(getLastActive().stream(streamMapper));
}

/**
* Can only be used on Eager Streams
*
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/com/aol/simple/react/async/BrownBagTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.aol.simple.react.async;

import java.util.stream.Stream;

import org.junit.Test;

public class BrownBagTest {

@Test
public void presentation(){
Stream.of(6,5,1,2).map(e->e*100)
.filter(e->e<551)
.forEach(System.out::println);
}
}
10 changes: 7 additions & 3 deletions src/test/java/com/aol/simple/react/async/QueueTest.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.aol.simple.react.async;

import static com.aol.simple.react.stream.traits.EagerFutureStream.parallel;
import static com.aol.simple.react.stream.traits.EagerFutureStream.parallelBuilder;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -37,8 +37,12 @@ public void setup() {

private final AtomicInteger found = new AtomicInteger(0);


@Test
public void backPressureTest() {



Queue<Integer> q = new Queue<>(new LinkedBlockingQueue<>(2));
new SimpleReact().react(() -> {
q.offer(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void testBreakoutAllCompletedAndTime() throws InterruptedException,
return it;

}).capture(e -> count++)
.block(status -> status.getAllCompleted() >1 && status.getElapsedMillis()>200);
.block(status -> status.getAllCompleted() >1 && status.getElapsedMillis()>20);

assertThat(result.size(), is(2));
assertThat(count, is(0));
Expand Down

0 comments on commit 1b9a55e

Please sign in to comment.