Repository: james-project
Updated Branches:
  refs/heads/master 91c204301 -> d1d75d16f
JAMES-2628 Migrate CassandraTableManager to Reactor

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/bf2dced2
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/bf2dced2
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/bf2dced2

Branch: refs/heads/master
Commit: bf2dced2e48d978c3f0266e9c2ef48896f564db5
Parents: 5b46ce3
Author: Gautier DI FOLCO <gdifo...@linagora.com>
Authored: Tue Dec 11 17:25:36 2018 +0100
Committer: Gautier DI FOLCO <gdifo...@linagora.com>
Committed: Wed Dec 12 11:58:06 2018 +0100

----------------------------------------------------------------------
 backends-common/cassandra/pom.xml               |  4 ++
 .../cassandra/init/CassandraTableManager.java   | 39 ++++++++------------
 2 files changed, 20 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/james-project/blob/bf2dced2/backends-common/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/pom.xml b/backends-common/cassandra/pom.xml
index 78375ad..9b8d1d8 100644
--- a/backends-common/cassandra/pom.xml
+++ b/backends-common/cassandra/pom.xml
@@ -80,6 +80,10 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
         <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>

http://git-wip-us.apache.org/repos/asf/james-project/blob/bf2dced2/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java index 0a9129c..f406b5c 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java @@ -19,20 +19,18 @@ package org.apache.james.backends.cassandra.init; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; - import javax.inject.Inject; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.components.CassandraTable; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; -import org.apache.james.util.FluentFutureStream; import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class CassandraTableManager { @@ -60,27 +58,22 @@ public class CassandraTableManager { public void clearAllTables() { CassandraAsyncExecutor executor = new CassandraAsyncExecutor(session); - FluentFutureStream.of( - module.moduleTables() - .stream() + Flux.fromIterable(module.moduleTables()) + .publishOn(Schedulers.elastic()) .map(CassandraTable::getName) - .map(name -> truncate(executor, name))) - .join(); + .flatMap(name -> truncate(executor, name)) + .then() + .block(); } - private CompletableFuture<?> truncate(CassandraAsyncExecutor executor, String name) { - return executor.execute( - QueryBuilder.select() - .from(name) - .limit(1) - .setFetchSize(1)) - .thenCompose(resultSet -> truncateIfNeeded(executor, name, resultSet)); + private Mono<?> truncate(CassandraAsyncExecutor executor, String name) { + return 
Mono.fromFuture(executor.execute( + QueryBuilder.select() + .from(name) + .limit(1) + .setFetchSize(1))) + .filter(resultSet -> !resultSet.isExhausted()) + .flatMap(ignored -> Mono.fromFuture(executor.execute(QueryBuilder.truncate(name)))); } - private CompletionStage<ResultSet> truncateIfNeeded(CassandraAsyncExecutor executor, String name, ResultSet resultSet) { - if (resultSet.isExhausted()) { - return CompletableFuture.completedFuture(null); - } - return executor.execute(QueryBuilder.truncate(name)); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org