http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java index f7d7b5d..0b87025 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java @@ -29,12 +29,13 @@ import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule; import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.store.mail.model.Username; -import org.apache.james.util.FluentFutureStream; import org.apache.james.util.streams.JamesCollectors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Flux; + class CassandraAttachmentOwnerDAOTest { private static final AttachmentId ATTACHMENT_ID = AttachmentId.from("id1"); private static final Username OWNER_1 = Username.fromRawValue("owner1"); @@ -59,7 +60,7 @@ class CassandraAttachmentOwnerDAOTest { @Test void retrieveOwnersShouldReturnAddedOwner() { - testee.addOwner(ATTACHMENT_ID, OWNER_1).join(); + testee.addOwner(ATTACHMENT_ID, OWNER_1).block(); assertThat(testee.retrieveOwners(ATTACHMENT_ID).join()) .containsOnly(OWNER_1); @@ -67,8 +68,8 @@ class CassandraAttachmentOwnerDAOTest { @Test void retrieveOwnersShouldReturnAddedOwners() { - testee.addOwner(ATTACHMENT_ID, OWNER_1).join(); - testee.addOwner(ATTACHMENT_ID, OWNER_2).join(); + testee.addOwner(ATTACHMENT_ID, OWNER_1).block(); + testee.addOwner(ATTACHMENT_ID, OWNER_2).block(); assertThat(testee.retrieveOwners(ATTACHMENT_ID).join()) .containsOnly(OWNER_1, OWNER_2); @@ -81,10 +82,10 @@ class CassandraAttachmentOwnerDAOTest { IntStream.range(0, referenceCountExceedingPaging) .boxed() .collect(JamesCollectors.chunker(128)) - .forEach(chunk -> FluentFutureStream.of( - chunk.stream() - .map(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i)))) - .join()); + .forEach(chunk -> Flux.fromIterable(chunk) + .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i))) + .then() + .block()); assertThat(testee.retrieveOwners(ATTACHMENT_ID).join()) .hasSize(referenceCountExceedingPaging);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java index 09dc1ee..4af1bdc 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java @@ -36,8 +36,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import com.github.steveash.guavate.Guavate; - class CassandraMailboxDAOTest { private static final int UID_VALIDITY_1 = 145; private static final int UID_VALIDITY_2 = 147; @@ -70,30 +68,30 @@ class CassandraMailboxDAOTest { @Test void retrieveMailboxShouldReturnEmptyWhenNone() { - assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).join()) + assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).blockOptional()) .isEmpty(); } @Test void saveShouldAddAMailbox() { - testee.save(mailbox1).join(); + testee.save(mailbox1).block(); Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1) - .join(); + .blockOptional(); assertThat(readMailbox.isPresent()).isTrue(); assertThat(readMailbox.get()).isEqualToComparingFieldByField(mailbox1); } @Test void saveShouldOverride() { - testee.save(mailbox1).join(); + testee.save(mailbox1).block(); mailbox2.setMailboxId(CASSANDRA_ID_1); - testee.save(mailbox2).join(); + testee.save(mailbox2).block(); Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1) - .join(); + .blockOptional(); assertThat(readMailbox.isPresent()).isTrue(); assertThat(readMailbox.get()).isEqualToComparingFieldByField(mailbox2); } @@ -101,47 +99,47 @@ class CassandraMailboxDAOTest { @Test void retrieveAllMailboxesShouldBeEmptyByDefault() { List<SimpleMailbox> mailboxes = testee.retrieveAllMailboxes() - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(mailboxes).isEmpty(); } @Test void retrieveAllMailboxesShouldReturnSingleMailbox() { - testee.save(mailbox1).join(); + testee.save(mailbox1).block(); List<SimpleMailbox> mailboxes = testee.retrieveAllMailboxes() - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(mailboxes).containsOnly(mailbox1); } @Test void retrieveAllMailboxesShouldReturnMultiMailboxes() { - testee.save(mailbox1).join(); - testee.save(mailbox2).join(); + testee.save(mailbox1).block(); + testee.save(mailbox2).block(); List<SimpleMailbox> mailboxes = testee.retrieveAllMailboxes() - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(mailboxes).containsOnly(mailbox1, mailbox2); } @Test void deleteShouldNotFailWhenMailboxIsAbsent() { - testee.delete(CASSANDRA_ID_1).join(); + testee.delete(CASSANDRA_ID_1).block(); } @Test void deleteShouldRemoveExistingMailbox() { - testee.save(mailbox1).join(); + testee.save(mailbox1).block(); - testee.delete(CASSANDRA_ID_1).join(); + testee.delete(CASSANDRA_ID_1).block(); - assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).join()) + assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).blockOptional()) .isEmpty(); } @@ -152,14 +150,14 @@ class CassandraMailboxDAOTest { @Test void updateShouldChangeMailboxPath() { - testee.save(mailbox1).join(); + testee.save(mailbox1).block(); testee.updatePath(CASSANDRA_ID_1, NEW_MAILBOX_PATH).join(); mailbox1.setNamespace(NEW_MAILBOX_PATH.getNamespace()); mailbox1.setUser(NEW_MAILBOX_PATH.getUser()); mailbox1.setName(NEW_MAILBOX_PATH.getName()); - Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1).join(); + Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1).blockOptional(); assertThat(readMailbox.isPresent()).isTrue(); assertThat(readMailbox.get()).isEqualToComparingFieldByField(mailbox1); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java index 1c3fb37..4ffa3dd 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java @@ -113,7 +113,7 @@ public class CassandraMailboxMapperTest { testee.save(newMailbox)) .isInstanceOf(TooLongMailboxNameException.class); - assertThat(mailboxPathV2DAO.retrieveId(MAILBOX_PATH).join()) + assertThat(mailboxPathV2DAO.retrieveId(MAILBOX_PATH).blockOptional()) .isPresent(); } @@ -124,9 +124,9 @@ public class CassandraMailboxMapperTest { @Test public void deleteShouldDeleteMailboxAndMailboxPathFromV1Table() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); testee.delete(MAILBOX); @@ -137,9 +137,9 @@ public class CassandraMailboxMapperTest { @Test public void deleteShouldDeleteMailboxAndMailboxPathFromV2Table() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); testee.delete(MAILBOX); @@ -150,9 +150,9 @@ public class CassandraMailboxMapperTest { @Test public void findMailboxByPathShouldReturnMailboxWhenExistsInV1Table() throws Exception { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); Mailbox mailbox = testee.findMailboxByPath(MAILBOX_PATH); @@ -162,9 +162,9 @@ public class CassandraMailboxMapperTest { @Test public void findMailboxByPathShouldReturnMailboxWhenExistsInV2Table() throws Exception { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); Mailbox mailbox = testee.findMailboxByPath(MAILBOX_PATH); @@ -174,11 +174,11 @@ public class CassandraMailboxMapperTest { @Test public void findMailboxByPathShouldReturnMailboxWhenExistsInBothTables() throws Exception { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); Mailbox mailbox = testee.findMailboxByPath(MAILBOX_PATH); @@ -188,11 +188,11 @@ public class CassandraMailboxMapperTest { @Test public void deleteShouldRemoveMailboxWhenInBothTables() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); testee.delete(MAILBOX); @@ -203,9 +203,9 @@ public class CassandraMailboxMapperTest { @Test public void deleteShouldRemoveMailboxWhenInV1Tables() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); testee.delete(MAILBOX); @@ -216,9 +216,9 @@ public class CassandraMailboxMapperTest { @Test public void deleteShouldRemoveMailboxWhenInV2Table() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); testee.delete(MAILBOX); @@ -229,7 +229,7 @@ public class CassandraMailboxMapperTest { @Test public void findMailboxByPathShouldThrowWhenDoesntExistInBothTables() { mailboxDAO.save(MAILBOX) - .join(); + .block(); assertThatThrownBy(() -> testee.findMailboxByPath(MAILBOX_PATH)) .isInstanceOf(MailboxNotFoundException.class); @@ -238,9 +238,9 @@ public class CassandraMailboxMapperTest { @Test public void findMailboxWithPathLikeShouldReturnMailboxesWhenExistsInV1Table() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); List<Mailbox> mailboxes = testee.findMailboxWithPathLike(new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD)); @@ -250,11 +250,11 @@ public class CassandraMailboxMapperTest { @Test public void findMailboxWithPathLikeShouldReturnMailboxesWhenExistsInBothTables() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); List<Mailbox> mailboxes = testee.findMailboxWithPathLike(new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD)); @@ -264,9 +264,9 @@ public class CassandraMailboxMapperTest { @Test public void findMailboxWithPathLikeShouldReturnMailboxesWhenExistsInV2Table() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); List<Mailbox> mailboxes = testee.findMailboxWithPathLike(new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD)); @@ -276,16 +276,16 @@ public class CassandraMailboxMapperTest { @Test public void hasChildrenShouldReturnChildWhenExistsInV1Table() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); CassandraId childMailboxId = CassandraId.timeBased(); MailboxPath childMailboxPath = MailboxPath.forUser(USER, "name.child"); Mailbox childMailbox = new SimpleMailbox(childMailboxPath, UID_VALIDITY, childMailboxId); mailboxDAO.save(childMailbox) - .join(); + .block(); mailboxPathDAO.save(childMailboxPath, childMailboxId) - .join(); + .block(); boolean hasChildren = testee.hasChildren(MAILBOX, '.'); @@ -295,18 +295,18 @@ public class CassandraMailboxMapperTest { @Test public void hasChildrenShouldReturnChildWhenExistsInBothTables() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); CassandraId childMailboxId = CassandraId.timeBased(); MailboxPath childMailboxPath = MailboxPath.forUser(USER, "name.child"); Mailbox childMailbox = new SimpleMailbox(childMailboxPath, UID_VALIDITY, childMailboxId); mailboxDAO.save(childMailbox) - .join(); + .block(); mailboxPathDAO.save(childMailboxPath, childMailboxId) - .join(); + .block(); boolean hasChildren = testee.hasChildren(MAILBOX, '.'); @@ -316,16 +316,16 @@ public class CassandraMailboxMapperTest { @Test public void hasChildrenShouldReturnChildWhenExistsInV2Table() { mailboxDAO.save(MAILBOX) - .join(); + .block(); mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID) - .join(); + .block(); CassandraId childMailboxId = CassandraId.timeBased(); MailboxPath childMailboxPath = MailboxPath.forUser(USER, "name.child"); Mailbox childMailbox = new SimpleMailbox(childMailboxPath, UID_VALIDITY, childMailboxId); mailboxDAO.save(childMailbox) - .join(); + .block(); mailboxPathV2DAO.save(childMailboxPath, childMailboxId) - .join(); + .block(); boolean hasChildren = testee.hasChildren(MAILBOX, '.'); @@ -334,11 +334,11 @@ public class CassandraMailboxMapperTest { @Test public void findMailboxWithPathLikeShouldRemoveDuplicatesAndKeepV2() { - mailboxDAO.save(MAILBOX).join(); - mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID).join(); + mailboxDAO.save(MAILBOX).block(); + mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID).block(); - mailboxDAO.save(MAILBOX_BIS).join(); - mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID_2).join(); + mailboxDAO.save(MAILBOX_BIS).block(); + mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID_2).block(); assertThat(testee.findMailboxWithPathLike( new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD))) http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java index 429b8af..dbf64f2 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java @@ -35,9 +35,9 @@ class CassandraMailboxPathDAOImplTest extends CassandraMailboxPathDAOTest { @Test void countAllShouldReturnEntryCount() { - testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join(); - testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join(); - testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join(); + testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block(); + testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).block(); + testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).block(); CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee; @@ -55,9 +55,9 @@ class CassandraMailboxPathDAOImplTest extends CassandraMailboxPathDAOTest { @Test void readAllShouldReturnAllStoredData() { - testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join(); - testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join(); - testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join(); + testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block(); + testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).block(); + testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).block(); CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee; http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java index 43592ae..acd6245 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java @@ -34,8 +34,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import com.github.steveash.guavate.Guavate; - import nl.jqno.equalsverifier.EqualsVerifier; public abstract class CassandraMailboxPathDAOTest { @@ -70,43 +68,42 @@ public abstract class CassandraMailboxPathDAOTest { @Test void saveShouldInsertNewEntry() { - assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join()).isTrue(); + assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block()).isTrue(); - assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join()) + assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional()) .contains(INBOX_ID_AND_PATH); } @Test void saveOnSecondShouldBeFalse() { - assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join()).isTrue(); - assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join()).isFalse(); + assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block()).isTrue(); + assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block()).isFalse(); } @Test void retrieveIdShouldReturnEmptyWhenEmptyData() { - assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join() - .isPresent()) - .isFalse(); + assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional()) + .isEmpty(); } @Test void retrieveIdShouldReturnStoredData() { - testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join(); + testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block(); - assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join()) + assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional()) .contains(INBOX_ID_AND_PATH); } @Test void getUserMailboxesShouldReturnAllMailboxesOfUser() { - testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join(); - testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join(); - testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join(); + testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block(); + testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).block(); + testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).block(); List<CassandraIdAndPath> cassandraIds = testee .listUserMailboxes(USER_INBOX_MAILBOXPATH.getNamespace(), USER_INBOX_MAILBOXPATH.getUser()) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(cassandraIds) .hasSize(2) @@ -115,16 +112,16 @@ public abstract class CassandraMailboxPathDAOTest { @Test void deleteShouldNotThrowWhenEmpty() { - testee.delete(USER_INBOX_MAILBOXPATH).join(); + testee.delete(USER_INBOX_MAILBOXPATH).block(); } @Test void deleteShouldDeleteTheExistingMailboxId() { - testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join(); + testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block(); - testee.delete(USER_INBOX_MAILBOXPATH).join(); + testee.delete(USER_INBOX_MAILBOXPATH).block(); - assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join()) + assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional()) .isEmpty(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java index 95df7fb..914d1e9 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java @@ -21,6 +21,7 @@ package org.apache.james.mailbox.cassandra.mail; import java.util.List; import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MailboxSessionUtil; import org.apache.james.mailbox.MessageUid; @@ -55,7 +56,7 @@ public class CassandraMapperProvider implements MapperProvider { public CassandraMapperProvider(CassandraCluster cassandra) { this.cassandra = cassandra; messageUidProvider = new MessageUidProvider(); - cassandraModSeqProvider = new CassandraModSeqProvider(this.cassandra.getConf()); + cassandraModSeqProvider = new CassandraModSeqProvider(this.cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION); mapperFactory = createMapperFactory(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java index da90b78..4eabd41 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java @@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Date; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import javax.mail.Flags; @@ -64,6 +63,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Bytes; import nl.jqno.equalsverifier.EqualsVerifier; +import reactor.core.publisher.Flux; class CassandraMessageDAOTest { private static final int BODY_START = 16; @@ -186,8 +186,8 @@ class CassandraMessageDAOTest { .build(); } - private MessageWithoutAttachment toMessage(CompletableFuture<Stream<CassandraMessageDAO.MessageResult>> readOptional) throws InterruptedException, java.util.concurrent.ExecutionException { - return readOptional.join() + private MessageWithoutAttachment toMessage(Flux<CassandraMessageDAO.MessageResult> read) throws InterruptedException, java.util.concurrent.ExecutionException { + return read.toStream() .map(CassandraMessageDAO.MessageResult::message) .map(Pair::getLeft) .findAny() http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java index 26947ab..67b9c90 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java @@ -57,7 +57,7 @@ class CassandraMessageIdDAOTest { @Test void deleteShouldNotThrowWhenRowDoesntExist() { testee.delete(CassandraId.timeBased(), MessageUid.of(1)) - .join(); + .block(); } @Test @@ -72,7 +72,7 @@ class CassandraMessageIdDAOTest { .build()) .join(); - testee.delete(mailboxId, messageUid).join(); + testee.delete(mailboxId, messageUid).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.isPresent()).isFalse(); @@ -98,7 +98,7 @@ class CassandraMessageIdDAOTest { .build())) .join(); - testee.delete(mailboxId, messageUid).join(); + testee.delete(mailboxId, messageUid).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.isPresent()).isFalse(); @@ -142,7 +142,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId).join(); + testee.updateMetadata(expectedComposedMessageId).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(expectedComposedMessageId); @@ -167,7 +167,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags(Flag.ANSWERED)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId).join(); + testee.updateMetadata(expectedComposedMessageId).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(expectedComposedMessageId); @@ -192,7 +192,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags(Flag.DELETED)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId).join(); + testee.updateMetadata(expectedComposedMessageId).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(expectedComposedMessageId); @@ -217,7 +217,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags(Flag.DRAFT)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId).join(); + testee.updateMetadata(expectedComposedMessageId).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(expectedComposedMessageId); @@ -242,7 +242,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags(Flag.FLAGGED)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId).join(); + testee.updateMetadata(expectedComposedMessageId).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(expectedComposedMessageId); @@ -267,7 +267,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags(Flag.RECENT)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId).join(); + testee.updateMetadata(expectedComposedMessageId).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(expectedComposedMessageId); @@ -292,7 +292,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags(Flag.SEEN)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId).join(); + testee.updateMetadata(expectedComposedMessageId).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(expectedComposedMessageId); @@ -317,7 +317,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags(Flag.USER)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId).join(); + testee.updateMetadata(expectedComposedMessageId).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(expectedComposedMessageId); @@ -344,7 +344,7 @@ class CassandraMessageIdDAOTest { .flags(flags) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId).join(); + testee.updateMetadata(expectedComposedMessageId).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(expectedComposedMessageId); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java index a200680..f563e5f 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java @@ -20,9 +20,9 @@ package org.apache.james.mailbox.cassandra.mail; import static org.assertj.core.api.Assertions.assertThat; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -42,12 +42,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import com.datastax.driver.core.utils.UUIDs; -import com.github.steveash.guavate.Guavate; class CassandraMessageIdToImapUidDAOTest { + public static final CassandraModule MODULE = CassandraModule.aggregateModules( + CassandraSchemaVersionModule.MODULE, + CassandraMessageModule.MODULE); + @RegisterExtension - static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules( - CassandraMessageModule.MODULE, CassandraSchemaVersionModule.MODULE)); + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE); private CassandraMessageId.Factory messageIdFactory; @@ -62,7 +64,7 @@ class CassandraMessageIdToImapUidDAOTest { @Test void deleteShouldNotThrowWhenRowDoesntExist() { testee.delete(messageIdFactory.of(UUIDs.timeBased()), CassandraId.timeBased()) - .join(); + .block(); } @Test @@ -77,10 +79,10 @@ class CassandraMessageIdToImapUidDAOTest { .build()) .join(); - testee.delete(messageId, mailboxId).join(); + testee.delete(messageId, mailboxId).block(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).isEmpty(); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).isEmpty(); } @Test @@ -103,15 +105,15 @@ class CassandraMessageIdToImapUidDAOTest { .build())) .join(); - testee.delete(messageId, mailboxId).join(); + testee.delete(messageId, mailboxId).block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId2, messageId, messageUid2)) .flags(new Flags()) .modSeq(1) .build(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -132,8 +134,8 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -149,7 +151,7 @@ class CassandraMessageIdToImapUidDAOTest { .build(); testee.insert(composedMessageIdWithFlags).join(); - Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 1).join(); + Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 1).block(); assertThat(result).isTrue(); } @@ -167,7 +169,7 @@ class CassandraMessageIdToImapUidDAOTest { .build(); testee.insert(composedMessageIdWithFlags).join(); - Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 3).join(); + Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 3).block(); assertThat(result).isFalse(); } @@ -191,10 +193,10 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId, 1).join(); + testee.updateMetadata(expectedComposedMessageId, 1).block(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -216,10 +218,10 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags(Flag.ANSWERED)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId, 1).join(); + testee.updateMetadata(expectedComposedMessageId, 1).block(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -241,10 +243,10 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags(Flag.DELETED)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId, 1).join(); + testee.updateMetadata(expectedComposedMessageId, 1).block(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -266,10 +268,10 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags(Flag.DRAFT)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId, 1).join(); + testee.updateMetadata(expectedComposedMessageId, 1).block(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -291,10 +293,10 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags(Flag.FLAGGED)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId, 1).join(); + testee.updateMetadata(expectedComposedMessageId, 1).block(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -316,10 +318,10 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags(Flag.RECENT)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId, 1).join(); + testee.updateMetadata(expectedComposedMessageId, 1).block(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -341,10 +343,11 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags(Flag.SEEN)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId, 1).join(); + assertThat(testee.updateMetadata(expectedComposedMessageId, 1).block()) + .isTrue(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -366,10 +369,10 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags(Flag.USER)) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId, 1).join(); + testee.updateMetadata(expectedComposedMessageId, 1).block(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -393,10 +396,10 @@ class CassandraMessageIdToImapUidDAOTest { .flags(flags) .modSeq(2) .build(); - testee.updateMetadata(expectedComposedMessageId, 1).join(); + testee.updateMetadata(expectedComposedMessageId, 1).block(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -416,9 +419,9 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join(); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId); + assertThat(messages).containsOnly(expectedComposedMessageId); } @Test @@ -451,8 +454,8 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build(); - Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).join(); + List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).collectList().block(); - assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId, expectedComposedMessageId2); + assertThat(messages).containsOnly(expectedComposedMessageId, expectedComposedMessageId2); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java index a0b691d..a8a2722 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java @@ -20,14 +20,19 @@ package org.apache.james.mailbox.cassandra.mail; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Duration; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; import java.util.stream.LongStream; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.modules.CassandraModSeqModule; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox; +import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -46,7 +51,7 @@ class CassandraModSeqProviderTest { @BeforeEach void setUp(CassandraCluster cassandra) { - modSeqProvider = new CassandraModSeqProvider(cassandra.getConf()); + modSeqProvider = new CassandraModSeqProvider(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION); MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash"); mailbox = new SimpleMailbox(path, 1234); mailbox.setMailboxId(CASSANDRA_ID); @@ -77,13 +82,17 @@ class CassandraModSeqProviderTest { } @Test - void nextModSeqShouldGenerateUniqueValuesWhenParallelCalls() { - int nbEntries = 100; - long nbValues = LongStream.range(0, nbEntries) - .parallel() - .map(Throwing.longUnaryOperator(x -> modSeqProvider.nextModSeq(null, mailbox))) - .distinct() - .count(); - assertThat(nbValues).isEqualTo(nbEntries); + void nextModSeqShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException { + int nbEntries = 10; + + ConcurrentSkipListSet<Long> modSeqs = new ConcurrentSkipListSet<>(); + ConcurrentTestRunner.builder() + .operation( + (threadNumber, step) -> modSeqs.add(modSeqProvider.nextModSeq(null, mailbox))) + .threadCount(10) + .operationCount(nbEntries) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + assertThat(modSeqs).hasSize(100); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java index 619ced7..5a3fe69 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java @@ -20,16 +20,21 @@ package org.apache.james.mailbox.cassandra.mail; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; import java.util.stream.LongStream; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.modules.CassandraUidModule; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox; +import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -47,7 +52,7 @@ class CassandraUidProviderTest { @BeforeEach void setUp(CassandraCluster cassandra) { - uidProvider = new CassandraUidProvider(cassandra.getConf()); + uidProvider = new CassandraUidProvider(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION); MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash"); mailbox = new SimpleMailbox(path, 1234); mailbox.setMailboxId(CASSANDRA_ID); @@ -77,14 +82,17 @@ class CassandraUidProviderTest { } @Test - void nextUidShouldGenerateUniqueValuesWhenParallelCalls() { + void nextUidShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException { + int threadCount = 10; int nbEntries = 100; - long nbValues = LongStream.range(0, nbEntries) - .parallel() - .boxed() - .map(Throwing.function(x -> uidProvider.nextUid(null, mailbox))) - .distinct() - .count(); - assertThat(nbValues).isEqualTo(nbEntries); + + ConcurrentSkipListSet<MessageUid> messageUids = new ConcurrentSkipListSet<>(); + ConcurrentTestRunner.builder() + .operation((threadNumber, step) -> messageUids.add(uidProvider.nextUid(null, mailbox))) + .threadCount(threadCount) + .operationCount(nbEntries / threadCount) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + assertThat(messageUids).hasSize(nbEntries); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java index 9730bda..1596e1c 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java @@ -57,7 +57,7 @@ class CassandraUserMailboxRightsDAOTest { testee.update(MAILBOX_ID, ACLDiff.computeDiff( MailboxACL.EMPTY, new MailboxACL(new Entry(ENTRY_KEY, RIGHTS)))) - .join(); + .block(); assertThat(testee.retrieve(USER_NAME, MAILBOX_ID).join()) .contains(RIGHTS); @@ -68,12 +68,12 @@ class CassandraUserMailboxRightsDAOTest { testee.update(MAILBOX_ID, ACLDiff.computeDiff( MailboxACL.EMPTY, new MailboxACL(new Entry(ENTRY_KEY, RIGHTS)))) - .join(); + .block(); testee.update(MAILBOX_ID, ACLDiff.computeDiff( new MailboxACL(new Entry(ENTRY_KEY, RIGHTS)), new MailboxACL(new Entry(ENTRY_KEY, OTHER_RIGHTS)))) - .join(); + .block(); assertThat(testee.retrieve(USER_NAME, MAILBOX_ID).join()) .contains(OTHER_RIGHTS); @@ -81,7 +81,7 @@ class CassandraUserMailboxRightsDAOTest { @Test void listRightsForUserShouldReturnEmptyWhenEmptyData() { - assertThat(testee.listRightsForUser(USER_NAME).join()) + assertThat(testee.listRightsForUser(USER_NAME).collectList().block()) .isEmpty(); } @@ -90,13 +90,13 @@ class CassandraUserMailboxRightsDAOTest { testee.update(MAILBOX_ID, ACLDiff.computeDiff( MailboxACL.EMPTY, new MailboxACL(new Entry(ENTRY_KEY, RIGHTS)))) - .join(); + .block(); testee.update(MAILBOX_ID, ACLDiff.computeDiff( new MailboxACL(new Entry(ENTRY_KEY, RIGHTS)), MailboxACL.EMPTY)) - .join(); + .block(); assertThat(testee.retrieve(USER_NAME, MAILBOX_ID).join()) .isEmpty(); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java index a298106..85616b1 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java @@ -46,6 +46,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Mono; + class AttachmentV2MigrationTest { private static final AttachmentId ATTACHMENT_ID = AttachmentId.from("id1"); private static final AttachmentId ATTACHMENT_ID_2 = AttachmentId.from("id2"); @@ -107,9 +109,9 @@ class AttachmentV2MigrationTest { migration.run(); - assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID).join()) + assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID).blockOptional()) .contains(CassandraAttachmentDAOV2.from(attachment1, BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); - assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).join()) + assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).blockOptional()) .contains(CassandraAttachmentDAOV2.from(attachment2, BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join()) .isEqualTo(attachment1.getBytes()); @@ -124,9 +126,9 @@ class AttachmentV2MigrationTest { migration.run(); - assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID).join()) + assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID).blockOptional()) .isEmpty(); - assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID_2).join()) + assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID_2).blockOptional()) .isEmpty(); } @@ -190,7 +192,7 @@ class AttachmentV2MigrationTest { .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); when(blobsDAO.save(attachment2.getBytes())) .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); - when(attachmentDAOV2.storeAttachment(any())).thenReturn(CompletableFuture.completedFuture(null)); + when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty()); when(attachmentDAO.deleteAttachment(any())).thenThrow(new RuntimeException()); assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); @@ -210,7 +212,7 @@ class AttachmentV2MigrationTest { .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); when(blobsDAO.save(attachment2.getBytes())) .thenThrow(new RuntimeException()); - when(attachmentDAOV2.storeAttachment(any())).thenReturn(CompletableFuture.completedFuture(null)); + when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty()); when(attachmentDAO.deleteAttachment(any())).thenReturn(CompletableFuture.completedFuture(null)); assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java index cf19f73..f17ab80 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java @@ -91,7 +91,7 @@ class MailboxPathV2MigrationTest { void newValuesShouldBeSavedInMostRecentDAO() throws Exception { mailboxMapper.save(MAILBOX_1); - assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join()) + assertThat(daoV2.retrieveId(MAILBOX_PATH_1).blockOptional()) .contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1)); } @@ -99,33 +99,33 @@ class MailboxPathV2MigrationTest { void newValuesShouldNotBeSavedInOldDAO() throws Exception { mailboxMapper.save(MAILBOX_1); - assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join()) + assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional()) .isEmpty(); } @Test void readingOldValuesShouldMigrateThem() throws Exception { - daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).join(); - mailboxDAO.save(MAILBOX_1).join(); + daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).block(); + mailboxDAO.save(MAILBOX_1).block(); mailboxMapper.findMailboxByPath(MAILBOX_PATH_1); SoftAssertions softly = new SoftAssertions(); - softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join()).isEmpty(); - softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join()) + softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional()).isEmpty(); + softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).blockOptional()) .contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1)); softly.assertAll(); } @Test void migrationTaskShouldMoveDataToMostRecentDao() { - daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).join(); + daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).block(); new MailboxPathV2Migration(daoV1, daoV2).run(); SoftAssertions softly = new SoftAssertions(); - softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join()).isEmpty(); - softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join()) + softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional()).isEmpty(); + softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).blockOptional()) .contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1)); softly.assertAll(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java index 637cc69..9c43818 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java @@ -26,11 +26,16 @@ import java.io.InputStream; import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.stream.Stream; +import com.github.fge.lambdas.Throwing; +import com.google.common.collect.ImmutableList; +import org.apache.commons.io.IOUtils; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MailboxSessionUtil; import org.apache.james.mailbox.MessageManager; +import org.apache.james.mailbox.model.Attachment; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.MessageAttachment; import org.apache.james.mailbox.model.MessageRange; @@ -149,10 +154,20 @@ public abstract class AbstractMailboxManagerAttachmentTest { assertThat(messages.hasNext()).isTrue(); List<MessageAttachment> attachments = messages.next().getAttachments(); assertThat(attachments).hasSize(2); - assertThat(attachmentMapper.getAttachment(attachments.get(0).getAttachmentId()).getStream()) - .hasSameContentAs(ClassLoader.getSystemResourceAsStream("eml/4037_014.jpg")); - assertThat(attachmentMapper.getAttachment(attachments.get(1).getAttachmentId()).getStream()) - .hasSameContentAs(ClassLoader.getSystemResourceAsStream("eml/4037_015.jpg")); + ImmutableList<byte[]> attachmentContents = attachments + .stream() + .map(MessageAttachment::getAttachmentId) + .map(Throwing.function(attachmentMapper::getAttachment)) + .map(Attachment::getBytes) + .collect(ImmutableList.toImmutableList()); + + ImmutableList<byte[]> files = Stream.of("eml/4037_014.jpg", "eml/4037_015.jpg") + .map(ClassLoader::getSystemResourceAsStream) + .map(Throwing.function(IOUtils::toByteArray)) + .collect(ImmutableList.toImmutableList()); + + assertThat(attachmentContents) + .containsExactlyInAnyOrder(files.get(0), files.get(1)); } @Test http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/container/util/pom.xml ---------------------------------------------------------------------- diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml index fa76097..85c3a6e 100644 --- a/server/container/util/pom.xml +++ b/server/container/util/pom.xml @@ -116,6 +116,12 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>backport-util-concurrent</groupId> + <artifactId>backport-util-concurrent</artifactId> + <version>3.1</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java index bd1d177..830b01c 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java @@ -112,7 +112,7 @@ public class CassandraDomainList extends AbstractDomainList { public void addDomain(Domain domain) throws DomainListException { boolean executed = executor.executeReturnApplied(insertStatement.bind() .setString(DOMAIN, domain.asString())) - .join(); + .block(); if (!executed) { throw new DomainListException(domain.name() + " already exists."); } @@ -122,7 +122,7 @@ public class CassandraDomainList extends AbstractDomainList { public void removeDomain(Domain domain) throws DomainListException { boolean executed = executor.executeReturnApplied(removeStatement.bind() .setString(DOMAIN, domain.asString())) - .join(); + .block(); if (!executed) { throw new DomainListException(domain.name() + " was not found"); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java index 47c598d..10049e1 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java @@ -32,8 +32,6 @@ import static org.apache.james.sieve.cassandra.tables.CassandraSieveActiveTable. import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.Date; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; @@ -44,6 +42,7 @@ import org.apache.james.sieverepository.api.ScriptName; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import reactor.core.publisher.Mono; public class CassandraActiveScriptDAO { private final CassandraAsyncExecutor cassandraAsyncExecutor; @@ -66,23 +65,23 @@ public class CassandraActiveScriptDAO { .where(eq(USER_NAME, bindMarker(USER_NAME)))); } - public CompletableFuture<Optional<ActiveScriptInfo>> getActiveSctiptInfo(User user) { - return cassandraAsyncExecutor.executeSingleRow( + public Mono<ActiveScriptInfo> getActiveSctiptInfo(User user) { + return cassandraAsyncExecutor.executeSingleRowReactor( selectActiveName.bind() .setString(USER_NAME, user.asString())) - .thenApply(rowOptional -> rowOptional.map(row -> new ActiveScriptInfo( + .map(row -> new ActiveScriptInfo( new ScriptName(row.getString(SCRIPT_NAME)), - ZonedDateTime.ofInstant(row.getTimestamp(DATE).toInstant(), ZoneOffset.UTC)))); + ZonedDateTime.ofInstant(row.getTimestamp(DATE).toInstant(), ZoneOffset.UTC))); } - public CompletableFuture<Void> unactivate(User user) { - return cassandraAsyncExecutor.executeVoid( + public Mono<Void> unactivate(User user) { + return cassandraAsyncExecutor.executeVoidReactor( deleteActive.bind() .setString(USER_NAME, user.asString())); } - public CompletableFuture<Void> activate(User user, ScriptName scriptName) { - return cassandraAsyncExecutor.executeVoid( + public Mono<Void> activate(User user, ScriptName scriptName) { + return cassandraAsyncExecutor.executeVoidReactor( insertActive.bind() .setString(USER_NAME, user.asString()) .setString(SCRIPT_NAME, scriptName.getValue()) http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java index 429ca4b..d3cf741 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java @@ -34,7 +34,6 @@ import static org.apache.james.sieve.cassandra.tables.CassandraSieveTable.TABLE_ import static org.apache.james.sieve.cassandra.tables.CassandraSieveTable.USER_NAME; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.inject.Inject; @@ -50,6 +49,7 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.Select; import com.github.steveash.guavate.Guavate; +import reactor.core.publisher.Mono; public class CassandraSieveDAO { @@ -98,8 +98,8 @@ public class CassandraSieveDAO { .where(eq(USER_NAME, bindMarker(USER_NAME))); } - public CompletableFuture<Void> insertScript(User user, Script script) { - return cassandraAsyncExecutor.executeVoid( + public Mono<Void> insertScript(User user, Script script) { + return cassandraAsyncExecutor.executeVoidReactor( insertScriptStatement.bind() .setString(USER_NAME, user.asString()) .setString(SCRIPT_NAME, script.getName().getValue()) @@ -120,7 +120,7 @@ public class CassandraSieveDAO { .collect(Guavate.toImmutableList())); } - public CompletableFuture<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) { + public Mono<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) { return cassandraAsyncExecutor.executeReturnApplied( updateScriptActivationStatement.bind() .setString(USER_NAME, user.asString()) @@ -128,24 +128,24 @@ public class CassandraSieveDAO { .setBool(IS_ACTIVE, active)); } - public CompletableFuture<Optional<Script>> getScript(User user, ScriptName name) { - return getScriptRow(user, name).thenApply(opt -> opt.map(row -> Script.builder() + public Mono<Script> getScript(User user, ScriptName name) { + return getScriptRow(user, name).map(row -> Script.builder() .content(row.getString(SCRIPT_CONTENT)) .isActive(row.getBool(IS_ACTIVE)) .name(name) .size(row.getLong(SIZE)) - .build())); + .build()); } - public CompletableFuture<Boolean> deleteScriptInCassandra(User user, ScriptName name) { + public Mono<Boolean> deleteScriptInCassandra(User user, ScriptName name) { return cassandraAsyncExecutor.executeReturnApplied( deleteScriptStatement.bind() .setString(USER_NAME, user.asString()) .setString(SCRIPT_NAME, name.getValue())); } - private CompletableFuture<Optional<Row>> getScriptRow(User user, ScriptName name) { - return cassandraAsyncExecutor.executeSingleRow( + private Mono<Row> getScriptRow(User user, ScriptName name) { + return cassandraAsyncExecutor.executeSingleRowReactor( selectScriptStatement.bind() .setString(USER_NAME, user.asString()) .setString(SCRIPT_NAME, name.getValue())); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java index b20ea30..1e99f1a 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java @@ -41,6 +41,7 @@ import org.apache.james.sieve.cassandra.tables.CassandraSieveSpaceTable; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import reactor.core.publisher.Mono; public class CassandraSieveQuotaDAO { @@ -107,8 +108,8 @@ public class CassandraSieveQuotaDAO { .orElse(0L)); } - public CompletableFuture<Void> updateSpaceUsed(User user, long spaceUsed) { - return cassandraAsyncExecutor.executeVoid( + public Mono<Void> updateSpaceUsed(User user, long spaceUsed) { + return cassandraAsyncExecutor.executeVoidReactor( updateSpaceUsedStatement.bind() .setLong(CassandraSieveSpaceTable.SPACE_USED, spaceUsed) .setString(CassandraSieveSpaceTable.USER_NAME, user.asString())); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java index fa52b4a..7f4b559 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java @@ -25,6 +25,7 @@ import java.time.ZonedDateTime; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import javax.inject.Inject; @@ -43,7 +44,11 @@ import org.apache.james.sieverepository.api.exception.IsActiveException; import org.apache.james.sieverepository.api.exception.QuotaExceededException; import org.apache.james.sieverepository.api.exception.QuotaNotFoundException; import org.apache.james.sieverepository.api.exception.ScriptNotFoundException; -import org.apache.james.util.CompletableFutureUtil; + +import org.apache.james.util.FunctionalUtils; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraSieveRepository implements SieveRepository { @@ -60,7 +65,8 @@ public class CassandraSieveRepository implements SieveRepository { @Override public ZonedDateTime getActivationDateForActiveScript(User user) throws ScriptNotFoundException { - return cassandraActiveScriptDAO.getActiveSctiptInfo(user).join() + return cassandraActiveScriptDAO.getActiveSctiptInfo(user) + .blockOptional() .orElseThrow(ScriptNotFoundException::new) .getActivationDate(); } @@ -70,19 +76,20 @@ public class CassandraSieveRepository implements SieveRepository { throwOnOverQuota(user, spaceThatWillBeUsedByNewScript(user, name, newSize)); } - private void throwOnOverQuota(User user, CompletableFuture<Long> sizeDifference) throws QuotaExceededException { + private void throwOnOverQuota(User user, Mono<Long> sizeDifference) throws QuotaExceededException { CompletableFuture<Optional<QuotaSize>> userQuotaFuture = cassandraSieveQuotaDAO.getQuota(user); CompletableFuture<Optional<QuotaSize>> globalQuotaFuture = cassandraSieveQuotaDAO.getQuota(); CompletableFuture<Long> spaceUsedFuture = cassandraSieveQuotaDAO.spaceUsedBy(user); new SieveQuota(spaceUsedFuture.join(), limitToUse(userQuotaFuture, globalQuotaFuture)) - .checkOverQuotaUponModification(sizeDifference.join()); + .checkOverQuotaUponModification(sizeDifference.block()); } - private CompletableFuture<Long> spaceThatWillBeUsedByNewScript(User user, ScriptName name, long scriptSize) { + private Mono<Long> spaceThatWillBeUsedByNewScript(User user, ScriptName name, long scriptSize) { return cassandraSieveDAO.getScript(user, name) - .thenApply(optional -> optional.map(Script::getSize).orElse(0L)) - .thenApply(sizeOfStoredScript -> scriptSize - sizeOfStoredScript); + .map(Script::getSize) + .switchIfEmpty(Mono.just(0L)) + .map(sizeOfStoredScript -> scriptSize - sizeOfStoredScript); } private Optional<QuotaSize> limitToUse(CompletableFuture<Optional<QuotaSize>> userQuota, CompletableFuture<Optional<QuotaSize>> globalQuota) { @@ -94,23 +101,24 @@ public class CassandraSieveRepository implements SieveRepository { @Override public void putScript(User user, ScriptName name, ScriptContent content) throws QuotaExceededException { - CompletableFuture<Long> spaceUsed = spaceThatWillBeUsedByNewScript(user, name, content.length()); + Mono<Long> spaceUsed = spaceThatWillBeUsedByNewScript(user, name, content.length()); throwOnOverQuota(user, spaceUsed); - CompletableFuture.allOf( - updateSpaceUsed(user, spaceUsed.join()), + Flux.merge( + updateSpaceUsed(user, spaceUsed.block()), cassandraSieveDAO.insertScript(user, Script.builder() .name(name) .content(content) .isActive(false) .build())) - .join(); + .then() + .block(); } - private CompletableFuture<Void> updateSpaceUsed(User user, long spaceUsed) { + private Mono<Void> updateSpaceUsed(User user, long spaceUsed) { if (spaceUsed == 0) { - return CompletableFuture.completedFuture(null); + return Mono.empty(); } return cassandraSieveQuotaDAO.updateSpaceUsed(user, spaceUsed); } @@ -124,10 +132,8 @@ public class CassandraSieveRepository implements SieveRepository { public InputStream getActive(User user) throws ScriptNotFoundException { return IOUtils.toInputStream( cassandraActiveScriptDAO.getActiveSctiptInfo(user) - .thenCompose(optionalActiveName -> optionalActiveName - .map(activeScriptInfo -> cassandraSieveDAO.getScript(user, activeScriptInfo.getName())) - .orElse(CompletableFuture.completedFuture(Optional.empty()))) - .join() + .flatMap(activeScriptInfo -> cassandraSieveDAO.getScript(user, activeScriptInfo.getName())) + .blockOptional() .orElseThrow(ScriptNotFoundException::new) .getContent() .getValue(), StandardCharsets.UTF_8); @@ -135,36 +141,33 @@ public class CassandraSieveRepository implements SieveRepository { @Override public void setActive(User user, ScriptName name) throws ScriptNotFoundException { - CompletableFuture<Boolean> activateNewScript = + Mono<Boolean> activateNewScript = unactivateOldScript(user) - .thenCompose(any -> updateScriptActivation(user, name, true) - .thenCompose(CompletableFutureUtil.composeIfTrue( - () -> cassandraActiveScriptDAO.activate(user, name)))); + .then(updateScriptActivation(user, name, true)) + .filter(FunctionalUtils.toPredicate(Function.identity())) + .flatMap(any -> cassandraActiveScriptDAO.activate(user, name).thenReturn(any)); - if (!activateNewScript.join()) { + if (!activateNewScript.blockOptional().isPresent()) { throw new ScriptNotFoundException(); } } - private CompletableFuture<Void> unactivateOldScript(User user) { + private Mono<Void> unactivateOldScript(User user) { return cassandraActiveScriptDAO.getActiveSctiptInfo(user) - .thenCompose(scriptNameOptional -> scriptNameOptional - .map(activeScriptInfo -> updateScriptActivation(user, activeScriptInfo.getName(), false) - .<Void>thenApply(any -> null)) - .orElse(CompletableFuture.completedFuture(null))); + .flatMap(activeScriptInfo -> updateScriptActivation(user, activeScriptInfo.getName(), false).then()); } - private CompletableFuture<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) { + private Mono<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) { if (!scriptName.equals(SieveRepository.NO_SCRIPT_NAME)) { return cassandraSieveDAO.updateScriptActivation(user, scriptName, active); } - return cassandraActiveScriptDAO.unactivate(user).thenApply(any -> true); + return cassandraActiveScriptDAO.unactivate(user).thenReturn(true); } @Override public InputStream getScript(User user, ScriptName name) throws ScriptNotFoundException { return cassandraSieveDAO.getScript(user, name) - .join() + .blockOptional() .map(script -> IOUtils.toInputStream(script.getContent().getValue(), StandardCharsets.UTF_8)) .orElseThrow(ScriptNotFoundException::new); } @@ -172,13 +175,13 @@ public class CassandraSieveRepository implements SieveRepository { @Override public void deleteScript(User user, ScriptName name) throws ScriptNotFoundException, IsActiveException { ensureIsNotActive(user, name); - if (!cassandraSieveDAO.deleteScriptInCassandra(user, name).join()) { + if (!cassandraSieveDAO.deleteScriptInCassandra(user, name).switchIfEmpty(Mono.just(false)).block()) { throw new ScriptNotFoundException(); } } private void ensureIsNotActive(User user, ScriptName name) throws IsActiveException { - Optional<ScriptName> activeName = cassandraActiveScriptDAO.getActiveSctiptInfo(user).join().map(ActiveScriptInfo::getName); + Optional<ScriptName> activeName = cassandraActiveScriptDAO.getActiveSctiptInfo(user).blockOptional().map(ActiveScriptInfo::getName); if (activeName.isPresent() && name.equals(activeName.get())) { throw new IsActiveException(); } @@ -186,22 +189,21 @@ public class CassandraSieveRepository implements SieveRepository { @Override public void renameScript(User user, ScriptName oldName, ScriptName newName) throws ScriptNotFoundException, DuplicateException { - CompletableFuture<Boolean> scriptExistsFuture = cassandraSieveDAO.getScript(user, newName) - .thenApply(Optional::isPresent); - CompletableFuture<Optional<Script>> oldScriptFuture = cassandraSieveDAO.getScript(user, oldName); + Mono<Script> oldScript = cassandraSieveDAO.getScript(user, oldName).cache(); + Mono<Boolean> newScriptExists = cassandraSieveDAO.getScript(user, newName).hasElement(); - oldScriptFuture.join(); - if (scriptExistsFuture.join()) { + oldScript.block(); + if (newScriptExists.block()) { throw new DuplicateException(); } performScriptRename(user, newName, - oldScriptFuture.join().orElseThrow(ScriptNotFoundException::new)); + oldScript.blockOptional().orElseThrow(ScriptNotFoundException::new)); } private void performScriptRename(User user, ScriptName newName, Script oldScript) { - CompletableFuture.allOf( + Flux.merge( cassandraSieveDAO.insertScript(user, Script.builder() .copyOf(oldScript) @@ -209,15 +211,14 @@ public class CassandraSieveRepository implements SieveRepository { .build()), cassandraSieveDAO.deleteScriptInCassandra(user, oldScript.getName()), performActiveScriptRename(user, oldScript.getName(), newName)) - .join(); + .then() + .block(); } - private CompletableFuture<Void> performActiveScriptRename(User user, ScriptName oldName, ScriptName newName) { + private Mono<Void> performActiveScriptRename(User user, ScriptName oldName, ScriptName newName) { return cassandraActiveScriptDAO.getActiveSctiptInfo(user) - .thenCompose(optionalActivationInfo -> optionalActivationInfo - .filter(activeScriptInfo -> activeScriptInfo.getName().equals(oldName)) - .map(name -> cassandraActiveScriptDAO.activate(user, newName)) - .orElse(CompletableFuture.completedFuture(null))); + .filter(activeScriptInfo -> activeScriptInfo.getName().equals(oldName)) + .flatMap(name -> cassandraActiveScriptDAO.activate(user, newName)); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java index 45504ab..0d6eee8 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java @@ -137,7 +137,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository { .setString(PASSWORD, defaultUser.getHashedPassword()) .setString(ALGORITHM, defaultUser.getHashAlgorithm()) .setString(NAME, defaultUser.getUserName().toLowerCase(Locale.US))) - .join(); + .block(); if (!executed) { throw new UsersRepositoryException("Unable to update user"); @@ -149,7 +149,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository { boolean executed = executor.executeReturnApplied( removeUserStatement.bind() .setString(NAME, name)) - .join(); + .block(); if (!executed) { throw new UsersRepositoryException("unable to remove unknown user " + name); @@ -202,7 +202,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository { .setString(REALNAME, user.getUserName()) .setString(PASSWORD, user.getHashedPassword()) .setString(ALGORITHM, user.getHashAlgorithm())) - .join(); + .block(); if (!executed) { throw new AlreadyExistInUsersRepositoryException("User with username " + username + " already exist!"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
