http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java index 2fd0426..df5dccc 100644 --- a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java +++ b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java @@ -49,15 +49,15 @@ class CassandraActiveScriptDAOTest { @Test void getActiveSctiptInfoShouldReturnEmptyByDefault() { - assertThat(activeScriptDAO.getActiveSctiptInfo(USER).join().isPresent()) + assertThat(activeScriptDAO.getActiveSctiptInfo(USER).blockOptional().isPresent()) .isFalse(); } @Test void getActiveSctiptInfoShouldReturnStoredName() { - activeScriptDAO.activate(USER, SCRIPT_NAME).join(); + activeScriptDAO.activate(USER, SCRIPT_NAME).block(); - Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join(); + Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional(); assertThat(actual.isPresent()).isTrue(); assertThat(actual.get().getName()).isEqualTo(SCRIPT_NAME); @@ -65,30 +65,30 @@ class CassandraActiveScriptDAOTest { @Test void activateShouldAllowRename() { - activeScriptDAO.activate(USER, SCRIPT_NAME).join(); + activeScriptDAO.activate(USER, SCRIPT_NAME).block(); - activeScriptDAO.activate(USER, NEW_SCRIPT_NAME).join(); + activeScriptDAO.activate(USER, NEW_SCRIPT_NAME).block(); - Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join(); + Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional(); assertThat(actual.isPresent()).isTrue(); assertThat(actual.get().getName()).isEqualTo(NEW_SCRIPT_NAME); } @Test void unactivateShouldAllowRemovingActiveScript() { - activeScriptDAO.activate(USER, SCRIPT_NAME).join(); + activeScriptDAO.activate(USER, SCRIPT_NAME).block(); - activeScriptDAO.unactivate(USER).join(); + activeScriptDAO.unactivate(USER).block(); - Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join(); + Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional(); assertThat(actual.isPresent()).isFalse(); } @Test void unactivateShouldWorkWhenNoneStore() { - activeScriptDAO.unactivate(USER).join(); + activeScriptDAO.unactivate(USER).block(); - Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join(); + Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional(); assertThat(actual.isPresent()).isFalse(); } }
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java index 5d38f61..5e3150d 100644 --- a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java +++ b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java @@ -70,54 +70,54 @@ class CassandraSieveDAOTest { @Test void getScriptShouldReturnEmptyByDefault() { - assertThat(sieveDAO.getScript(USER, SCRIPT_NAME).join().isPresent()) - .isFalse(); + assertThat(sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional()) + .isEmpty(); } @Test void getScriptShouldReturnStoredScript() { - sieveDAO.insertScript(USER, SCRIPT).join(); + sieveDAO.insertScript(USER, SCRIPT).block(); - Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join(); + Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional(); assertThat(actual).contains(SCRIPT); } @Test void insertScriptShouldUpdateContent() { - sieveDAO.insertScript(USER, SCRIPT).join(); + sieveDAO.insertScript(USER, SCRIPT).block(); - sieveDAO.insertScript(USER, SCRIPT_NEW_CONTENT).join(); + sieveDAO.insertScript(USER, SCRIPT_NEW_CONTENT).block(); - Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join(); + Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional(); assertThat(actual).contains(SCRIPT_NEW_CONTENT); } @Test void insertScriptShouldUpdateActivate() { - sieveDAO.insertScript(USER, SCRIPT).join(); + sieveDAO.insertScript(USER, SCRIPT).block(); - sieveDAO.insertScript(USER, ACTIVE_SCRIPT).join(); + sieveDAO.insertScript(USER, ACTIVE_SCRIPT).block(); - Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join(); + Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional(); assertThat(actual).contains(ACTIVE_SCRIPT); } @Test void deleteScriptInCassandraShouldWork() { - sieveDAO.insertScript(USER, SCRIPT).join(); + sieveDAO.insertScript(USER, SCRIPT).block(); - sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).join(); + sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).block(); - Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join(); + Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional(); assertThat(actual).isEmpty(); } @Test void deleteScriptInCassandraShouldWorkWhenNoneStore() { - sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).join(); + sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).block(); - Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join(); + Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional(); assertThat(actual).isEmpty(); } @@ -130,8 +130,8 @@ class CassandraSieveDAOTest { @Test void listScriptsShouldReturnSingleStoredValue() { - sieveDAO.insertScript(USER, SCRIPT).join(); - sieveDAO.insertScript(USER, SCRIPT2).join(); + sieveDAO.insertScript(USER, SCRIPT).block(); + sieveDAO.insertScript(USER, SCRIPT2).block(); List<ScriptSummary> scriptSummaryList = sieveDAO.listScripts(USER).join(); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java index adca5b0..7cacb55 100644 --- a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java +++ b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java @@ -116,7 +116,7 @@ class CassandraSieveQuotaDAOTest { void spaceUsedByShouldReturnStoredValue() { long spaceUsed = 18L; - sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join(); + sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block(); assertThat(sieveQuotaDAO.spaceUsedBy(USER).join()).isEqualTo(spaceUsed); } @@ -125,8 +125,8 @@ class CassandraSieveQuotaDAOTest { void updateSpaceUsedShouldBeAdditive() { long spaceUsed = 18L; - sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join(); - sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join(); + sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block(); + sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block(); assertThat(sieveQuotaDAO.spaceUsedBy(USER).join()).isEqualTo(2 * spaceUsed); } @@ -135,8 +135,8 @@ class CassandraSieveQuotaDAOTest { void updateSpaceUsedShouldWorkWithNegativeValues() { long spaceUsed = 18L; - sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join(); - sieveQuotaDAO.updateSpaceUsed(USER, -1 * spaceUsed).join(); + sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block(); + sieveQuotaDAO.updateSpaceUsed(USER, -1 * spaceUsed).block(); assertThat(sieveQuotaDAO.spaceUsedBy(USER).join()).isEqualTo(0L); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index 6655798..6f005e8 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -22,7 +22,6 @@ package org.apache.james.mailrepository.cassandra; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; @@ -33,10 +32,11 @@ import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepository; import org.apache.james.mailrepository.api.MailRepositoryUrl; import org.apache.james.util.CompletableFutureUtil; -import org.apache.james.util.FluentFutureStream; import org.apache.mailet.Mail; import com.github.fge.lambdas.Throwing; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraMailRepository implements MailRepository { @@ -60,28 +60,28 @@ public class CassandraMailRepository implements MailRepository { public MailKey store(Mail mail) throws MessagingException { MailKey mailKey = MailKey.forMail(mail); - mimeMessageStore.save(mail.getMessage()) + Mono.fromFuture(mimeMessageStore.save(mail.getMessage()) .thenCompose(Throwing.function(parts -> mailDAO.store(url, mail, parts.getHeaderBlobId(), - parts.getBodyBlobId()))) - .thenCompose(any -> keysDAO.store(url, mailKey)) - .thenCompose(this::increaseSizeIfStored) - .join(); + parts.getBodyBlobId())))) + .then(keysDAO.store(url, mailKey)) + .flatMap(this::increaseSizeIfStored) + .block(); return mailKey; } - private CompletionStage<Void> increaseSizeIfStored(Boolean isStored) { + private Mono<Void> increaseSizeIfStored(Boolean isStored) { if (isStored) { return countDAO.increment(url); } - return CompletableFuture.completedFuture(null); + return Mono.empty(); } @Override public Iterator<MailKey> list() { return keysDAO.list(url) - .join() + .toIterable() .iterator(); } @@ -108,33 +108,34 @@ public class CassandraMailRepository implements MailRepository { @Override public void remove(Mail mail) { - removeAsync(MailKey.forMail(mail)).join(); + removeAsync(MailKey.forMail(mail)).block(); } @Override public void remove(Collection<Mail> toRemove) { - FluentFutureStream.of(toRemove.stream() + Flux.fromIterable(toRemove) .map(MailKey::forMail) - .map(this::removeAsync)) - .join(); + .flatMap(this::removeAsync) + .then() + .block(); } @Override public void remove(MailKey key) { - removeAsync(key).join(); + removeAsync(key).block(); } - private CompletableFuture<Void> removeAsync(MailKey key) { + private Mono<Void> removeAsync(MailKey key) { return keysDAO.remove(url, key) - .thenCompose(this::decreaseSizeIfDeleted) - .thenCompose(any -> mailDAO.remove(url, key)); + .flatMap(this::decreaseSizeIfDeleted) + .then(mailDAO.remove(url, key)); } - private CompletionStage<Void> decreaseSizeIfDeleted(Boolean isDeleted) { + private Mono<Void> decreaseSizeIfDeleted(Boolean isDeleted) { if (isDeleted) { return countDAO.decrement(url); } - return CompletableFuture.completedFuture(null); + return Mono.empty(); } @Override @@ -145,9 +146,9 @@ public class CassandraMailRepository implements MailRepository { @Override public void removeAll() { keysDAO.list(url) - .thenCompose(stream -> FluentFutureStream.of(stream.map(this::removeAsync)) - .completableFuture()) - .join(); + .flatMap(this::removeAsync) + .then() + .block(); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java index cc2d4ea..540aa1a 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java @@ -40,6 +40,7 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import reactor.core.publisher.Mono; public class CassandraMailRepositoryCountDAO { @@ -75,13 +76,13 @@ public class CassandraMailRepositoryCountDAO { .where(eq(REPOSITORY_NAME, bindMarker(REPOSITORY_NAME)))); } - public CompletableFuture<Void> increment(MailRepositoryUrl url) { - return executor.executeVoid(increment.bind() + public Mono<Void> increment(MailRepositoryUrl url) { + return executor.executeVoidReactor(increment.bind() .setString(REPOSITORY_NAME, url.asString())); } - public CompletableFuture<Void> decrement(MailRepositoryUrl url) { - return executor.executeVoid(decrement.bind() + public Mono<Void> decrement(MailRepositoryUrl url) { + return executor.executeVoidReactor(decrement.bind() .setString(REPOSITORY_NAME, url.asString())); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java index 93bffec..8bd5902 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java @@ -28,9 +28,6 @@ import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.KEYS import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.MAIL_KEY; import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.REPOSITORY_NAME; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; - import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; @@ -40,6 +37,8 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraMailRepositoryKeysDAO { @@ -80,20 +79,20 @@ public class CassandraMailRepositoryKeysDAO { .value(MAIL_KEY, bindMarker(MAIL_KEY))); } - public CompletableFuture<Boolean> store(MailRepositoryUrl url, MailKey key) { + public Mono<Boolean> store(MailRepositoryUrl url, MailKey key) { return executor.executeReturnApplied(insertKey.bind() .setString(REPOSITORY_NAME, url.asString()) .setString(MAIL_KEY, key.asString())); } - public CompletableFuture<Stream<MailKey>> list(MailRepositoryUrl url) { - return executor.execute(listKeys.bind() + public Flux<MailKey> list(MailRepositoryUrl url) { + return executor.executeReactor(listKeys.bind() .setString(REPOSITORY_NAME, url.asString())) - .thenApply(cassandraUtils::convertToStream) - .thenApply(stream -> stream.map(row -> new MailKey(row.getString(MAIL_KEY)))); + .flatMapMany(cassandraUtils::convertToFlux) + .map(row -> new MailKey(row.getString(MAIL_KEY))); } - public CompletableFuture<Boolean> remove(MailRepositoryUrl url, MailKey key) { + public Mono<Boolean> remove(MailRepositoryUrl url, MailKey key) { return executor.executeReturnApplied(deleteKey.bind() .setString(REPOSITORY_NAME, url.asString()) .setString(MAIL_KEY, key.asString())); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java index 7c3596b..6e884b4 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java @@ -84,6 +84,7 @@ import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import reactor.core.publisher.Mono; public class CassandraMailRepositoryMailDAO implements CassandraMailRepositoryMailDaoAPI { @@ -161,8 +162,8 @@ public class CassandraMailRepositoryMailDAO implements CassandraMailRepositoryMa } @Override - public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) { - return executor.executeVoid(deleteMail.bind() + public Mono<Void> remove(MailRepositoryUrl url, MailKey key) { + return executor.executeVoidReactor(deleteMail.bind() .setString(REPOSITORY_NAME, url.asString()) .setString(MAIL_KEY, key.asString())); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java index 435bcf1..bf49097 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java @@ -31,10 +31,12 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl; import org.apache.james.server.core.MailImpl; import org.apache.mailet.Mail; +import reactor.core.publisher.Mono; + public interface CassandraMailRepositoryMailDaoAPI { CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException; - CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key); + Mono<Void> remove(MailRepositoryUrl url, MailKey key); CompletableFuture<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java index 36e8e28..5b4edfe 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java @@ -79,6 +79,7 @@ import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import reactor.core.publisher.Mono; public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepositoryMailDaoAPI { @@ -151,8 +152,9 @@ public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepository ); } - public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) { - return executor.executeVoid(deleteMail.bind() + @Override + public Mono<Void> remove(MailRepositoryUrl url, MailKey key) { + return executor.executeVoidReactor(deleteMail.bind() .setString(REPOSITORY_NAME, url.asString()) .setString(MAIL_KEY, key.asString())); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java index 8b01a38..f83766a 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java @@ -32,6 +32,8 @@ import org.apache.james.util.OptionalUtils; import org.apache.mailet.Mail; import com.google.common.annotations.VisibleForTesting; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class MergingCassandraMailRepositoryMailDao implements CassandraMailRepositoryMailDaoAPI { @@ -51,8 +53,8 @@ public class MergingCassandraMailRepositoryMailDao implements CassandraMailRepos } @Override - public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) { - return CompletableFuture.allOf(v1.remove(url, key), v2.remove(url, key)); + public Mono<Void> remove(MailRepositoryUrl url, MailKey key) { + return Flux.merge(v1.remove(url, key), v2.remove(url, key)).then(); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java index 98b6fa2..32eb13d 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java @@ -50,7 +50,7 @@ class CassandraMailRepositoryCountDAOTest { @Test void getCountShouldReturnOneWhenIncrementedOneTime() { - testee.increment(URL).join(); + testee.increment(URL).block(); assertThat(testee.getCount(URL).join()) .isEqualTo(1L); @@ -58,7 +58,7 @@ class CassandraMailRepositoryCountDAOTest { @Test void incrementShouldNotAffectOtherUrls() { - testee.increment(URL).join(); + testee.increment(URL).block(); assertThat(testee.getCount(URL2).join()) .isEqualTo(0L); @@ -66,8 +66,8 @@ class CassandraMailRepositoryCountDAOTest { @Test void incrementCanBeAppliedSeveralTime() { - testee.increment(URL).join(); - testee.increment(URL).join(); + testee.increment(URL).block(); + testee.increment(URL).block(); assertThat(testee.getCount(URL).join()) .isEqualTo(2L); @@ -75,11 +75,11 @@ class CassandraMailRepositoryCountDAOTest { @Test void decrementShouldDecreaseCount() { - testee.increment(URL).join(); - testee.increment(URL).join(); - testee.increment(URL).join(); + testee.increment(URL).block(); + testee.increment(URL).block(); + testee.increment(URL).block(); - testee.decrement(URL).join(); + testee.decrement(URL).block(); assertThat(testee.getCount(URL).join()) .isEqualTo(2L); @@ -87,7 +87,7 @@ class CassandraMailRepositoryCountDAOTest { @Test void decrementCanLeadToNegativeCount() { - testee.decrement(URL).join(); + testee.decrement(URL).block(); assertThat(testee.getCount(URL).join()) .isEqualTo(-1L); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java index 3c457af..db7f98a 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java @@ -23,7 +23,9 @@ import static org.assertj.core.api.Assertions.assertThat; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepositoryUrl; import org.junit.jupiter.api.BeforeEach; @@ -36,9 +38,11 @@ class CassandraMailRepositoryKeysDAOTest { static final MailKey KEY_1 = new MailKey("key1"); static final MailKey KEY_2 = new MailKey("key2"); static final MailKey KEY_3 = new MailKey("key3"); + static final CassandraModule MODULE = CassandraModule.aggregateModules(CassandraMailRepositoryModule.MODULE, + CassandraSchemaVersionModule.MODULE); @RegisterExtension - static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailRepositoryModule.MODULE); + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE); CassandraMailRepositoryKeysDAO testee; @@ -49,75 +53,75 @@ class CassandraMailRepositoryKeysDAOTest { @Test void listShouldBeEmptyByDefault() { - assertThat(testee.list(URL).join()) + assertThat(testee.list(URL).collectList().block()) .isEmpty(); } @Test void listShouldReturnEmptyByDefault() { - testee.store(URL, KEY_1).join(); + testee.store(URL, KEY_1).block(); - assertThat(testee.list(URL).join()) + assertThat(testee.list(URL).collectList().block()) .containsOnly(KEY_1); } @Test void listShouldNotReturnElementsOfOtherRepositories() { - testee.store(URL, KEY_1).join(); + testee.store(URL, KEY_1).block(); - assertThat(testee.list(URL2).join()) + assertThat(testee.list(URL2).collectList().block()) .isEmpty(); } @Test void listShouldReturnSeveralElements() { - testee.store(URL, KEY_1).join(); - testee.store(URL, KEY_2).join(); - testee.store(URL, KEY_3).join(); + testee.store(URL, KEY_1).block(); + testee.store(URL, KEY_2).block(); + testee.store(URL, KEY_3).block(); - assertThat(testee.list(URL).join()) + assertThat(testee.list(URL).collectList().block()) .containsOnly(KEY_1, KEY_2, KEY_3); } @Test void listShouldNotReturnRemovedElements() { - testee.store(URL, KEY_1).join(); - testee.store(URL, KEY_2).join(); - testee.store(URL, KEY_3).join(); + testee.store(URL, KEY_1).block(); + testee.store(URL, KEY_2).block(); + testee.store(URL, KEY_3).block(); - testee.remove(URL, KEY_2).join(); + testee.remove(URL, KEY_2).block(); - assertThat(testee.list(URL).join()) + assertThat(testee.list(URL).collectList().block()) .containsOnly(KEY_1, KEY_3); } @Test void removeShouldBeIdempotent() { - testee.remove(URL, KEY_2).join(); + testee.remove(URL, KEY_2).block(); } @Test void removeShouldNotAffectOtherRepositories() { - testee.store(URL, KEY_1).join(); + testee.store(URL, KEY_1).block(); - testee.remove(URL2, KEY_2).join(); + testee.remove(URL2, KEY_2).block(); - assertThat(testee.list(URL).join()) + assertThat(testee.list(URL).collectList().block()) .containsOnly(KEY_1); } @Test void removeShouldReturnTrueWhenKeyDeleted() { - testee.store(URL, KEY_1).join(); + testee.store(URL, KEY_1).block(); - boolean isDeleted = testee.remove(URL, KEY_1).join(); + boolean isDeleted = testee.remove(URL, KEY_1).block(); assertThat(isDeleted).isTrue(); } @Test void removeShouldReturnFalseWhenKeyNotDeleted() { - boolean isDeleted = testee.remove(URL2, KEY_2).join(); + boolean isDeleted = testee.remove(URL2, KEY_2).block(); assertThat(isDeleted).isFalse(); } @@ -125,16 +129,16 @@ class CassandraMailRepositoryKeysDAOTest { @Test void storeShouldReturnTrueWhenNotPreviouslyStored() { - boolean isStored = testee.store(URL, KEY_1).join(); + boolean isStored = testee.store(URL, KEY_1).block(); assertThat(isStored).isTrue(); } @Test void storeShouldReturnFalseWhenPreviouslyStored() { - testee.store(URL, KEY_1).join(); + testee.store(URL, KEY_1).block(); - boolean isStored = testee.store(URL, KEY_1).join(); + boolean isStored = testee.store(URL, KEY_1).block(); assertThat(isStored).isFalse(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java index 2377b6b..f597eff 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java @@ -104,7 +104,7 @@ class CassandraMailRepositoryMailDAOTest { blobIdBody) .join(); - testee.remove(URL, KEY_1).join(); + testee.remove(URL, KEY_1).block(); assertThat(testee.read(URL, KEY_1).join()) .isEmpty(); @@ -374,7 +374,7 @@ class CassandraMailRepositoryMailDAOTest { blobIdBody2) .join(); - testee.remove(URL, KEY_1).join(); + testee.remove(URL, KEY_1).block(); Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v1Entry = v1.read(URL, KEY_1).join(); Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v2Entry = v2.read(URL, KEY_1).join(); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java index ca2eb2c..b39ed2d 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java @@ -33,6 +33,7 @@ import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.api.Store; @@ -56,17 +57,19 @@ import org.junit.jupiter.api.extension.ExtensionContext; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Mono; @ExtendWith(CassandraMailRepositoryWithFakeImplementationsTest.MailRepositoryCassandraClusterExtension.class) class CassandraMailRepositoryWithFakeImplementationsTest { - static final MailRepositoryUrl URL = MailRepositoryUrl.from("proto://url"); - static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + private static final MailRepositoryUrl URL = MailRepositoryUrl.from("proto://url"); + private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); static class MailRepositoryCassandraClusterExtension extends CassandraClusterExtension { public MailRepositoryCassandraClusterExtension() { super(CassandraModule.aggregateModules( CassandraMailRepositoryModule.MODULE, - CassandraBlobModule.MODULE)); + CassandraBlobModule.MODULE, + CassandraSchemaVersionModule.MODULE)); } @Override @@ -122,7 +125,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { .isInstanceOf(RuntimeException.class) .hasMessage("java.lang.RuntimeException: Expected failure while saving"); - assertThat(keysDAO.list(URL).join()).isEmpty(); + assertThat(keysDAO.list(URL).collectList().block()).isEmpty(); } } @@ -155,9 +158,9 @@ class CassandraMailRepositoryWithFakeImplementationsTest { } @Override - public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) { - return CompletableFuture.supplyAsync(() -> { - throw new RuntimeException("Expected failure while remeving mail parts"); + public Mono<Void> remove(MailRepositoryUrl url, MailKey key) { + return Mono.fromCallable(() -> { + throw new RuntimeException("Expected failure while removing mail parts"); }); } @@ -186,7 +189,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { .isInstanceOf(RuntimeException.class) .hasMessage("java.lang.RuntimeException: Expected failure while storing mail parts"); - assertThat(keysDAO.list(URL).join()).isEmpty(); + assertThat(keysDAO.list(URL).collectList().block()).isEmpty(); } @Test @@ -234,8 +237,8 @@ class CassandraMailRepositoryWithFakeImplementationsTest { } @Override - public CompletableFuture<Boolean> store(MailRepositoryUrl url, MailKey key) { - return CompletableFuture.supplyAsync(() -> { + public Mono<Boolean> store(MailRepositoryUrl url, MailKey key) { + return Mono.fromCallable(() -> { throw new RuntimeException("Expected failure while storing keys"); }); } @@ -255,7 +258,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { assertThatThrownBy(() -> cassandraMailRepository.store(mail)) .isInstanceOf(RuntimeException.class) - .hasMessage("java.lang.RuntimeException: Expected failure while storing keys"); + .hasMessage("Expected failure while storing keys"); assertThat(countDAO.getCount(URL).join()).isEqualTo(0); } @@ -274,7 +277,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { assertThatThrownBy(() -> cassandraMailRepository.store(mail)) .isInstanceOf(RuntimeException.class) - .hasMessage("java.lang.RuntimeException: Expected failure while storing keys"); + .hasMessage("Expected failure while storing keys"); ResultSet resultSet = cassandra.getConf().execute(select() .from(BlobTable.TABLE_NAME)); @@ -295,7 +298,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { assertThatThrownBy(() -> cassandraMailRepository.store(mail)) .isInstanceOf(RuntimeException.class) - .hasMessage("java.lang.RuntimeException: Expected failure while storing keys"); + .hasMessage("Expected failure while storing keys"); ResultSet resultSet = cassandra.getConf().execute(select() .from(MailRepositoryTable.CONTENT_TABLE_NAME)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
