Repository: james-project Updated Branches: refs/heads/master d04e65506 -> db1c5dac5
JAMES-2623 port Runnables to Reactor to avoid using Common ForkJoin pool Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/db1c5dac Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/db1c5dac Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/db1c5dac Branch: refs/heads/master Commit: db1c5dac54e711f7d09e5ba38ce8824a5b51d015 Parents: d04e655 Author: Matthieu Baechler <[email protected]> Authored: Sat Nov 24 09:01:13 2018 +0100 Committer: Raphael Ouazana <[email protected]> Committed: Tue Dec 18 09:55:37 2018 +0100 ---------------------------------------------------------------------- .../org/apache/james/CleanupTasksPerformer.java | 6 ++- .../apache/james/AggregateJunitExtension.java | 21 ++++----- server/container/util/pom.xml | 4 ++ .../java/org/apache/james/util/Runnables.java | 34 +++++++------- .../org/apache/james/util/RunnablesTest.java | 49 ++++++++++++++++++++ 5 files changed, 83 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java ---------------------------------------------------------------------- diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java b/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java index 0fcbdb9..56772ac 100644 --- a/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java +++ b/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java @@ -26,6 +26,8 @@ import javax.inject.Inject; import org.apache.james.task.Task; import org.apache.james.util.Runnables; +import reactor.core.publisher.Flux; + public class CleanupTasksPerformer { public interface CleanupTask extends Task { @@ -41,8 +43,8 @@ public class CleanupTasksPerformer { public void clean() { Runnables - .runParrallelStream( - cleanupTasks.stream() + .runParallel( + Flux.fromIterable(cleanupTasks) .map(cleanupTask -> cleanupTask::run)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java ---------------------------------------------------------------------- diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java b/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java index a8d867c..8cf131c 100644 --- a/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java +++ b/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.ParameterResolutionException; import com.github.fge.lambdas.Throwing; import com.google.common.collect.Lists; +import reactor.core.publisher.Flux; public class AggregateJunitExtension implements RegistrableExtension { @@ -40,30 +41,26 @@ public class AggregateJunitExtension implements RegistrableExtension { @Override public void beforeAll(ExtensionContext extensionContext) { - Runnables.runParrallelStream(registrableExtensions - .stream() - .map(ext -> Throwing.runnable(() -> ext.beforeAll(extensionContext)))); + Runnables.runParallel(Flux.fromIterable(registrableExtensions) + .map(ext -> Throwing.runnable(() -> ext.beforeAll(extensionContext)))); } @Override public void beforeEach(ExtensionContext extensionContext) { - Runnables.runParrallelStream(registrableExtensions - .stream() - .map(ext -> Throwing.runnable(() -> ext.beforeEach(extensionContext)))); + Runnables.runParallel(Flux.fromIterable(registrableExtensions) + .map(ext -> Throwing.runnable(() -> ext.beforeEach(extensionContext)))); } @Override public void afterEach(ExtensionContext extensionContext) { - Runnables.runParrallelStream(Lists.reverse(registrableExtensions) - .stream() - .map(ext -> Throwing.runnable(() -> ext.afterEach(extensionContext)))); + Runnables.runParallel(Flux.fromIterable(Lists.reverse(registrableExtensions)) + .map(ext -> Throwing.runnable(() -> ext.afterEach(extensionContext)))); } @Override public void afterAll(ExtensionContext extensionContext) { - Runnables.runParrallelStream(Lists.reverse(registrableExtensions) - .stream() - .map(ext -> Throwing.runnable(() -> ext.afterAll(extensionContext)))); + Runnables.runParallel(Flux.fromIterable(Lists.reverse(registrableExtensions)) + .map(ext -> Throwing.runnable(() -> ext.afterAll(extensionContext)))); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/util/pom.xml ---------------------------------------------------------------------- diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml index 47933e0..fa76097 100644 --- a/server/container/util/pom.xml +++ b/server/container/util/pom.xml @@ -71,6 +71,10 @@ <artifactId>commons-io</artifactId> </dependency> <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> + <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/util/src/main/java/org/apache/james/util/Runnables.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/main/java/org/apache/james/util/Runnables.java b/server/container/util/src/main/java/org/apache/james/util/Runnables.java index 0d4bc91..b0f707e 100644 --- a/server/container/util/src/main/java/org/apache/james/util/Runnables.java +++ b/server/container/util/src/main/java/org/apache/james/util/Runnables.java @@ -19,27 +19,27 @@ package org.apache.james.util; -import java.util.Arrays; -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; -import java.util.stream.Stream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class Runnables { public static void runParallel(Runnable... runnables) { - Stream<Runnable> stream = Arrays.stream(runnables); - runParrallelStream(stream); + Flux<Runnable> stream = Flux.just(runnables); + runParallel(stream); } - public static void runParrallelStream(Stream<Runnable> stream) { - FluentFutureStream.of(stream - .map(runnable -> CompletableFuture.supplyAsync(toVoidSupplier(runnable)))) - .join(); - } - - private static Supplier<Void> toVoidSupplier(Runnable runnable) { - return () -> { - runnable.run(); - return null; - }; + public static void runParallel(Flux<Runnable> runnables) { + runnables + .publishOn(Schedulers.elastic()) + .parallel() + .runOn(Schedulers.elastic()) + .flatMap(runnable -> { + runnable.run(); + return Mono.empty(); + }) + .sequential() + .then() + .block(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java b/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java new file mode 100644 index 0000000..e080dcc --- /dev/null +++ b/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.james.util; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.Test; + +import reactor.core.publisher.Flux; + +class RunnablesTest { + + @Test + void shouldActuallyRunThings() { + AtomicBoolean sideEffect = new AtomicBoolean(false); + Runnables.runParallel(() -> sideEffect.set(true)); + assertThat(sideEffect).isTrue(); + } + + @Test + void shouldActuallyRunInParallel() throws InterruptedException { + int parallel = 2; + CountDownLatch countDownLatch = new CountDownLatch(parallel); + Runnable runnable = countDownLatch::countDown; + Runnables.runParallel(Flux.range(0, 2).map(i -> runnable)); + assertThat(countDownLatch.await(2, TimeUnit.MINUTES)).isTrue(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
