MAILBOX-296 Enhance CompletableFuture with a chain all method
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0a243a9c Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0a243a9c Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0a243a9c Branch: refs/heads/master Commit: 0a243a9c6ce650fb53a0777eb0c3138432fca766 Parents: eea6903 Author: benwa <[email protected]> Authored: Thu May 18 16:46:25 2017 +0700 Committer: benwa <[email protected]> Committed: Fri May 19 17:30:35 2017 +0700 ---------------------------------------------------------------------- .../james/util/CompletableFutureUtil.java | 22 +++++++ .../james/util/CompletableFutureUtilTest.java | 68 ++++++++++++++++++++ 2 files changed, 90 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/0a243a9c/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java index c18217c..a3c7f51 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java @@ -47,6 +47,28 @@ public class CompletableFutureUtil { .orElse(CompletableFuture.completedFuture(Stream.of())); } + public static <R, T> CompletableFuture<Stream<R>> chainAll(Stream<T> futureStream, + Function<T, CompletableFuture<R>> transformationToChain) { + return futureStream + .map(t -> (Supplier<CompletableFuture<R>>) (() -> transformationToChain.apply(t))) + .reduce(CompletableFuture.<Stream<R>>completedFuture(Stream.of()), + (accumulator, supplier) -> + accumulator.thenCompose( + accumulatedStream -> + supplier.get() + .thenCompose(r -> + CompletableFuture.completedFuture(Stream.<R>concat(accumulatedStream, Stream.of(r)))) + ), + getCompletableFutureBinaryOperator()); + } + + private static <R> BinaryOperator<CompletableFuture<Stream<R>>> getCompletableFutureBinaryOperator() { + return (future1, future2) -> + future1.thenCompose(stream1 -> + future2.<Stream<R>>thenCompose(stream2 -> + CompletableFuture.completedFuture(Stream.concat(stream1, stream2)))); + } + public static <T> CompletableFuture<Stream<T>> performOnAll(CompletableFuture<Stream<T>> futurStream, Function<T, CompletableFuture<Void>> action) { return thenComposeOnAll(futurStream, value -> keepValue(() -> http://git-wip-us.apache.org/repos/asf/james-project/blob/0a243a9c/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java index 8035b50..8b637ea 100644 --- a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java +++ b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java @@ -22,16 +22,34 @@ package org.apache.james.util; import static org.assertj.core.api.Assertions.assertThat; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.testcontainers.shaded.com.google.common.base.Throwables; import com.github.steveash.guavate.Guavate; +import com.google.common.collect.ImmutableList; public class CompletableFutureUtilTest { + private ExecutorService executorService; + + @Before + public void setUp() { + executorService = Executors.newFixedThreadPool(4); + } + + @After + public void tearDown() { + executorService.shutdownNow(); + } @Test public void allOfShouldUnboxEmptyStream() { @@ -43,6 +61,56 @@ public class CompletableFutureUtilTest { } @Test + public void chainAllShouldPreserveExecutionOrder() { + int itemCount = 10; + ImmutableList<Integer> ints = IntStream.range(0, itemCount) + .boxed() + .collect(Guavate.toImmutableList()); + + ConcurrentLinkedDeque<Integer> queue = new ConcurrentLinkedDeque<>(); + + CompletableFutureUtil.chainAll(ints.stream(), + i -> CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(itemCount - i); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + queue.add(i); + return i; + }, executorService)) + .join(); + + assertThat(queue) + .containsExactlyElementsOf(ints); + } + + @Test + public void chainAllShouldNotThrowOnEmptyStream() { + Stream<Integer> result = CompletableFutureUtil.chainAll(Stream.<Integer>of(), + i -> CompletableFuture.supplyAsync(() -> i, executorService)) + .join(); + + assertThat(result.collect(Guavate.toImmutableList())) + .isEmpty(); + } + + @Test + public void chainAllShouldPreserveOrder() { + int itemCount = 10; + ImmutableList<Integer> ints = IntStream.range(0, itemCount) + .boxed() + .collect(Guavate.toImmutableList()); + + Stream<Integer> result = CompletableFutureUtil.chainAll(ints.stream(), + i -> CompletableFuture.supplyAsync(() -> i, executorService)) + .join(); + + assertThat(result.collect(Guavate.toImmutableList())) + .containsExactlyElementsOf(ints); + } + + @Test public void allOfShouldUnboxStream() { long value1 = 18L; long value2 = 19L; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
