JAMES-2566 Improve completableFutureUtil::chainAll

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/78113b27
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/78113b27
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/78113b27

Branch: refs/heads/master
Commit: 78113b27748d1823bd356bf93087dd7fcdf9a7cf
Parents: eedcc93
Author: Benoit Tellier <btell...@linagora.com>
Authored: Thu Oct 18 09:16:06 2018 +0700
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Wed Oct 31 09:12:08 2018 +0700

----------------------------------------------------------------------
 .../james/util/CompletableFutureUtil.java       | 37 ++++++++++----------
 1 file changed, 18 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/78113b27/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
----------------------------------------------------------------------
diff --git 
a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
 
b/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
index 19c94b4..f51261a 100644
--- 
a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
+++ 
b/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.util;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Optional;
@@ -29,6 +30,8 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableList;
+
 public class CompletableFutureUtil {
 
     public static <T> CompletableFuture<Optional<T>> 
unwrap(CompletableFuture<Optional<CompletableFuture<T>>> base) {
@@ -50,26 +53,22 @@ public class CompletableFutureUtil {
                 .map(CompletableFuture::join));
     }
 
-    public static <R, T> CompletableFuture<Stream<R>> chainAll(Stream<T> 
futureStream,
-        Function<T, CompletableFuture<R>> transformationToChain) {
-        return futureStream
-            .map(t -> (Supplier<CompletableFuture<R>>) (() -> 
transformationToChain.apply(t)))
-            .reduce(CompletableFuture.<Stream<R>>completedFuture(Stream.of()),
-                (accumulator, supplier) ->
-                    accumulator.thenCompose(
-                        accumulatedStream ->
-                            supplier.get()
-                                .thenCompose(r ->
-                                    
CompletableFuture.completedFuture(Stream.<R>concat(accumulatedStream, 
Stream.of(r))))
-                    ),
-                getCompletableFutureBinaryOperator());
-    }
+    @SuppressWarnings("unchecked")
+    public static <R, T> CompletableFuture<Stream<R>> chainAll(Stream<T> 
futureStream, Function<T, CompletableFuture<R>> transformationToChain) {
+        ImmutableList<T> elements = 
futureStream.collect(ImmutableList.toImmutableList());
+        ArrayList<R> results = new ArrayList<>(elements.size());
 
-    private static <R> BinaryOperator<CompletableFuture<Stream<R>>> 
getCompletableFutureBinaryOperator() {
-        return (future1, future2) ->
-            future1.thenCompose(stream1 ->
-                future2.<Stream<R>>thenCompose(stream2 ->
-                    CompletableFuture.completedFuture(Stream.concat(stream1, 
stream2))));
+        CompletableFuture<Void> futureEmptyStream = 
CompletableFuture.completedFuture(null);
+
+        BiFunction<CompletableFuture, Supplier<CompletableFuture<R>>, 
CompletableFuture> accumulator =
+            (future, supplier) -> future.thenCompose(any -> 
supplier.get().thenAccept(results::add));
+
+        BinaryOperator<CompletableFuture> combiner = (f1, f2) -> 
f1.thenCompose(any -> f2);
+
+        return elements.stream()
+            .map(t -> (Supplier<CompletableFuture<R>>) (() -> 
transformationToChain.apply(t)))
+            .reduce(futureEmptyStream, accumulator, combiner)
+            .thenApply(any -> results.stream());
     }
 
     public static <T, U> CompletableFuture<Stream<U>> 
map(CompletableFuture<Stream<T>> futurStream, Function<T, U> action) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to