MAILBOX-296 Enhance CompletableFuture with a chain all method

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0a243a9c
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0a243a9c
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0a243a9c

Branch: refs/heads/master
Commit: 0a243a9c6ce650fb53a0777eb0c3138432fca766
Parents: eea6903
Author: benwa <[email protected]>
Authored: Thu May 18 16:46:25 2017 +0700
Committer: benwa <[email protected]>
Committed: Fri May 19 17:30:35 2017 +0700

----------------------------------------------------------------------
 .../james/util/CompletableFutureUtil.java       | 22 +++++++
 .../james/util/CompletableFutureUtilTest.java   | 68 ++++++++++++++++++++
 2 files changed, 90 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0a243a9c/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
----------------------------------------------------------------------
diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
index c18217c..a3c7f51 100644
--- a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
+++ b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
@@ -47,6 +47,28 @@ public class CompletableFutureUtil {
             .orElse(CompletableFuture.completedFuture(Stream.of()));
     }
 
+    public static <R, T> CompletableFuture<Stream<R>> chainAll(Stream<T> futureStream,
+        Function<T, CompletableFuture<R>> transformationToChain) {
+        return futureStream
+            .map(t -> (Supplier<CompletableFuture<R>>) (() -> transformationToChain.apply(t)))
+            .reduce(CompletableFuture.<Stream<R>>completedFuture(Stream.of()),
+                (accumulator, supplier) ->
+                    accumulator.thenCompose(
+                        accumulatedStream ->
+                            supplier.get()
+                                .thenCompose(r ->
+                                    CompletableFuture.completedFuture(Stream.<R>concat(accumulatedStream, Stream.of(r))))
+                    ),
+                getCompletableFutureBinaryOperator());
+    }
+
+    private static <R> BinaryOperator<CompletableFuture<Stream<R>>> getCompletableFutureBinaryOperator() {
+        return (future1, future2) ->
+            future1.thenCompose(stream1 ->
+                future2.<Stream<R>>thenCompose(stream2 ->
+                    CompletableFuture.completedFuture(Stream.concat(stream1, stream2))));
+    }
+
     public static <T> CompletableFuture<Stream<T>> performOnAll(CompletableFuture<Stream<T>> futurStream, Function<T, CompletableFuture<Void>> action) {
         return thenComposeOnAll(futurStream, value ->
             keepValue(() ->

http://git-wip-us.apache.org/repos/asf/james-project/blob/0a243a9c/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
----------------------------------------------------------------------
diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
index 8035b50..8b637ea 100644
--- a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
+++ b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
@@ -22,16 +22,34 @@ package org.apache.james.util;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.base.Throwables;
 
 import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
 
 public class CompletableFutureUtilTest {
+    private ExecutorService executorService;
+
+    @Before
+    public void setUp() {
+        executorService = Executors.newFixedThreadPool(4);
+    }
+
+    @After
+    public void tearDown() {
+        executorService.shutdownNow();
+    }
 
     @Test
     public void allOfShouldUnboxEmptyStream() {
@@ -43,6 +61,56 @@ public class CompletableFutureUtilTest {
     }
 
     @Test
+    public void chainAllShouldPreserveExecutionOrder() {
+        int itemCount = 10;
+        ImmutableList<Integer> ints = IntStream.range(0, itemCount)
+            .boxed()
+            .collect(Guavate.toImmutableList());
+
+        ConcurrentLinkedDeque<Integer> queue = new ConcurrentLinkedDeque<>();
+
+        CompletableFutureUtil.chainAll(ints.stream(),
+            i -> CompletableFuture.supplyAsync(() -> {
+                try {
+                    Thread.sleep(itemCount - i);
+                } catch (InterruptedException e) {
+                    throw Throwables.propagate(e);
+                }
+                queue.add(i);
+                return i;
+            }, executorService))
+            .join();
+
+        assertThat(queue)
+            .containsExactlyElementsOf(ints);
+    }
+
+    @Test
+    public void chainAllShouldNotThrowOnEmptyStream() {
+        Stream<Integer> result = CompletableFutureUtil.chainAll(Stream.<Integer>of(),
+            i -> CompletableFuture.supplyAsync(() -> i, executorService))
+            .join();
+
+        assertThat(result.collect(Guavate.toImmutableList()))
+            .isEmpty();
+    }
+
+    @Test
+    public void chainAllShouldPreserveOrder() {
+        int itemCount = 10;
+        ImmutableList<Integer> ints = IntStream.range(0, itemCount)
+            .boxed()
+            .collect(Guavate.toImmutableList());
+
+        Stream<Integer> result = CompletableFutureUtil.chainAll(ints.stream(),
+            i -> CompletableFuture.supplyAsync(() -> i, executorService))
+            .join();
+
+        assertThat(result.collect(Guavate.toImmutableList()))
+            .containsExactlyElementsOf(ints);
+    }
+
+    @Test
     public void allOfShouldUnboxStream() {
         long value1 = 18L;
         long value2 = 19L;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to