http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index 23297ee..dab5496 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -46,7 +46,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -76,8 +75,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty; -import org.apache.james.util.CompletableFutureUtil; -import org.apache.james.util.FluentFutureStream; import org.apache.james.util.streams.JamesCollectors; import org.apache.james.util.streams.Limit; @@ -95,6 +92,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Bytes; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraMessageDAO { public static final long DEFAULT_LONG_VALUE = 0L; @@ -234,42 +233,36 @@ public class CassandraMessageDAO { .collect(Guavate.toImmutableList()); } - public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { - return CompletableFutureUtil.chainAll( - limit.applyOnStream(messageIds.stream().distinct()) - .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())), - ids -> rowToMessages(fetchType, ids)) - .thenApply(stream -> stream.flatMap(Function.identity())); + public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { + return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()) + .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize()))) + .flatMap(ids -> rowToMessages(fetchType, ids)); } - private CompletableFuture<Stream<MessageResult>> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) { - return FluentFutureStream.of( - ids.stream() - .map(id -> retrieveRow(id, fetchType) - .thenCompose((ResultSet resultSet) -> message(resultSet, id, fetchType)))) - .completableFuture(); + private Flux<MessageResult> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) { + return Flux.fromIterable(ids) + .flatMap(id -> retrieveRow(id, fetchType) + .flatMap(resultSet -> message(resultSet, id, fetchType))); } - private CompletableFuture<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) { + private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) { CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId(); - return cassandraAsyncExecutor.execute(retrieveSelect(fetchType) + return cassandraAsyncExecutor.executeReactor(retrieveSelect(fetchType) .bind() .setUUID(MESSAGE_ID, cassandraMessageId.get())); } - private CompletableFuture<MessageResult> + private Mono<MessageResult> message(ResultSet rows,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) { ComposedMessageId messageId = messageIdWithMetaData.getComposedMessageId(); if (rows.isExhausted()) { - return CompletableFuture.completedFuture(notFound(messageIdWithMetaData)); + return Mono.just(notFound(messageIdWithMetaData)); } Row row = rows.one(); - CompletableFuture<byte[]> contentFuture = buildContentRetriever(fetchType).apply(row); - - return contentFuture.thenApply(content -> { + return buildContentRetriever(fetchType, row).map(content -> { MessageWithoutAttachment messageWithoutAttachment = new MessageWithoutAttachment( messageId.getMessageId(), @@ -341,37 +334,37 @@ public class CassandraMessageDAO { .setUUID(MESSAGE_ID, messageId.get())); } - private Function<Row, CompletableFuture<byte[]>> buildContentRetriever(FetchType fetchType) { + private Mono<byte[]> buildContentRetriever(FetchType fetchType, Row row) { switch (fetchType) { case Full: - return this::getFullContent; + return getFullContent(row); case Headers: - return this::getHeaderContent; + return getHeaderContent(row); case Body: - return row -> getBodyContent(row) - .thenApply(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data)); + return getBodyContent(row) + .map(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data)); case Metadata: - return row -> CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY); + return Mono.just(EMPTY_BYTE_ARRAY); default: throw new RuntimeException("Unknown FetchType " + fetchType); } } - private CompletableFuture<byte[]> getFullContent(Row row) { + private Mono<byte[]> getFullContent(Row row) { return getHeaderContent(row) - .thenCombine(getBodyContent(row), Bytes::concat); + .zipWith(getBodyContent(row), Bytes::concat); } - private CompletableFuture<byte[]> getBodyContent(Row row) { + private Mono<byte[]> getBodyContent(Row row) { return getFieldContent(BODY_CONTENT, row); } - private CompletableFuture<byte[]> getHeaderContent(Row row) { + private Mono<byte[]> getHeaderContent(Row row) { return getFieldContent(HEADER_CONTENT, row); } - private CompletableFuture<byte[]> getFieldContent(String field, Row row) { - return blobStore.readBytes(blobIdFactory.from(row.getString(field))); + private Mono<byte[]> getFieldContent(String field, Row row) { + return Mono.fromFuture(blobStore.readBytes(blobIdFactory.from(row.getString(field)))); } public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java index 51365e6..fe93143 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java @@ -67,6 +67,7 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; public class CassandraMessageIdDAO { @@ -169,8 +170,8 @@ public class CassandraMessageIdDAO { .and(lte(IMAP_UID, bindMarker(IMAP_UID_LTE)))); } - public CompletableFuture<Void> delete(CassandraId mailboxId, MessageUid uid) { - return cassandraAsyncExecutor.executeVoid(delete.bind() + public Mono<Void> delete(CassandraId mailboxId, MessageUid uid) { + return cassandraAsyncExecutor.executeVoidReactor(delete.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(IMAP_UID, uid.asLong())); } @@ -193,10 +194,10 @@ public class CassandraMessageIdDAO { .setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()))); } - public CompletableFuture<Void> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { + public Mono<Void> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId(); Flags flags = composedMessageIdWithMetaData.getFlags(); - return cassandraAsyncExecutor.executeVoid(update.bind() + return cassandraAsyncExecutor.executeVoidReactor(update.bind() .setLong(MOD_SEQ, composedMessageIdWithMetaData.getModSeq()) .setBool(ANSWERED, flags.contains(Flag.ANSWERED)) .setBool(DELETED, flags.contains(Flag.DELETED)) http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 880ac16..0a7c0bc 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -24,14 +24,14 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.stream.Stream; import javax.mail.Flags; import org.apache.commons.lang3.tuple.Pair; + import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; -import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; -import org.apache.james.backends.cassandra.utils.LightweightTransactionException; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageManager; import org.apache.james.mailbox.cassandra.ids.CassandraId; @@ -49,7 +49,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; import org.apache.james.mailbox.store.mail.ModSeqProvider; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.apache.james.util.FluentFutureStream; +import org.apache.james.util.FunctionalUtils; +import org.apache.james.util.ReactorUtils; import org.apache.james.util.streams.JamesCollectors; import org.apache.james.util.streams.Limit; import org.slf4j.Logger; @@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory; import com.github.steveash.guavate.Guavate; import com.google.common.collect.Multimap; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class CassandraMessageIdMapper implements MessageIdMapper { @@ -99,47 +101,37 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } private Stream<SimpleMailboxMessage> findAsStream(Collection<MessageId> messageIds, FetchType fetchType) { - return FluentFutureStream.of( - messageIds.stream() - .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())), - FluentFutureStream::unboxStream) - .collect(Guavate.toImmutableList()) - .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited())) - .thenApply(stream -> stream + return Flux.fromStream(messageIds.stream()) + .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())) + .collectList() + .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited())) .filter(CassandraMessageDAO.MessageResult::isFound) - .map(CassandraMessageDAO.MessageResult::message)) - .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType)) - .thenCompose(this::filterMessagesWithExistingMailbox) - .join() - .sorted(Comparator.comparing(MailboxMessage::getUid)); - } - - private CompletableFuture<Stream<SimpleMailboxMessage>> filterMessagesWithExistingMailbox(Stream<SimpleMailboxMessage> stream) { - return FluentFutureStream.of(stream.map(this::keepMessageIfMailboxExists), FluentFutureStream::unboxOptional) - .completableFuture(); + .map(CassandraMessageDAO.MessageResult::message) + .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType)) + .flatMap(this::keepMessageIfMailboxExists) + .collectSortedList(Comparator.comparing(MailboxMessage::getUid)) + .block() + .stream(); } - private CompletableFuture<Optional<SimpleMailboxMessage>> keepMessageIfMailboxExists(SimpleMailboxMessage message) { + private Mono<SimpleMailboxMessage> keepMessageIfMailboxExists(SimpleMailboxMessage message) { CassandraId cassandraId = (CassandraId) message.getMailboxId(); return mailboxDAO.retrieveMailbox(cassandraId) - .thenApply(optional -> { - if (!optional.isPresent()) { + .map(any -> message) + .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> { LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.", cassandraId, message.getMailboxId()); - return Optional.empty(); - } - - return Optional.of(message); - }); + })); } @Override public List<MailboxId> findMailboxes(MessageId messageId) { - return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()).join() + return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()) .map(ComposedMessageIdWithMetaData::getComposedMessageId) .map(ComposedMessageId::getMailboxId) - .collect(Guavate.toImmutableList()); + .collectList() + .block(); } @Override @@ -182,114 +174,115 @@ public class CassandraMessageIdMapper implements MessageIdMapper { @Override public void delete(MessageId messageId, Collection<MailboxId> mailboxIds) { - deleteAsFuture(messageId, mailboxIds).join(); + deleteAsMono(messageId, mailboxIds).block(); } - public CompletableFuture<Void> deleteAsFuture(MessageId messageId, Collection<MailboxId> mailboxIds) { + public Mono<Void> deleteAsMono(MessageId messageId, Collection<MailboxId> mailboxIds) { CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId; - return mailboxIds.stream() - .map(mailboxId -> retrieveAndDeleteIndices(cassandraMessageId, Optional.of((CassandraId) mailboxId))) - .reduce((f1, f2) -> CompletableFuture.allOf(f1, f2)) - .orElse(CompletableFuture.completedFuture(null)); + return Flux.fromStream(mailboxIds.stream()) + .flatMap(mailboxId -> retrieveAndDeleteIndices(cassandraMessageId, Optional.of((CassandraId) mailboxId))) + .then(); } @Override public void delete(Multimap<MessageId, MailboxId> ids) { - ids.asMap() - .entrySet() - .stream() - .collect(JamesCollectors.chunker(cassandraConfiguration.getExpungeChunkSize())) - .forEach(chunk -> - FluentFutureStream.of(chunk.stream() - .map(entry -> deleteAsFuture(entry.getKey(), entry.getValue()))) - .join()); + Flux.fromIterable(ids.asMap() + .entrySet()) + .limitRate(cassandraConfiguration.getExpungeChunkSize()) + .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue())) + .then() + .block(); } - private CompletableFuture<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) { + private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) { return imapUidDAO.retrieve(messageId, mailboxId) - .thenCompose(composedMessageIds -> composedMessageIds - .map(this::deleteIds) - .reduce((f1, f2) -> CompletableFuture.allOf(f1, f2)) - .orElse(CompletableFuture.completedFuture(null))); + .flatMap(this::deleteIds) + .then(); } @Override public void delete(MessageId messageId) { CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId; retrieveAndDeleteIndices(cassandraMessageId, Optional.empty()) - .join(); + .block(); } - private CompletableFuture<Void> deleteIds(ComposedMessageIdWithMetaData metaData) { + private Mono<Void> deleteIds(ComposedMessageIdWithMetaData metaData) { CassandraMessageId messageId = (CassandraMessageId) metaData.getComposedMessageId().getMessageId(); CassandraId mailboxId = (CassandraId) metaData.getComposedMessageId().getMailboxId(); - return CompletableFuture.allOf( - imapUidDAO.delete(messageId, mailboxId), - messageIdDAO.delete(mailboxId, metaData.getComposedMessageId().getUid())) - .thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(metaData, mailboxId).toFuture()); + return Flux.merge( + imapUidDAO.delete(messageId, mailboxId), + messageIdDAO.delete(mailboxId, metaData.getComposedMessageId().getUid())) + .then(indexTableHandler.updateIndexOnDelete(metaData, mailboxId)); } @Override public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { - return mailboxIds.stream() + return Flux.fromIterable(mailboxIds) .distinct() .map(mailboxId -> (CassandraId) mailboxId) - .filter(mailboxId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(mailboxId)) - .join() - .findAny() - .isPresent()) + .filter(mailboxId -> haveMetaData(messageId, mailboxId)) .flatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId)) - .map(this::updateCounts) - .map(Mono::block) - .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight)); + .flatMap(this::updateCounts) + .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight)) + .block(); } - private Stream<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { + private boolean haveMetaData(MessageId messageId, CassandraId mailboxId) { + return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(mailboxId)) + .hasElements() + .block(); + } + + private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { try { - Pair<Flags, ComposedMessageIdWithMetaData> pair = new FunctionRunnerWithRetry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry()) - .executeAndRetrieveObject(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId)); - ComposedMessageIdWithMetaData composedMessageIdWithMetaData = pair.getRight(); - Flags oldFlags = pair.getLeft(); - return Stream.of(Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(), - UpdatedFlags.builder() - .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid()) - .modSeq(composedMessageIdWithMetaData.getModSeq()) - .oldFlags(oldFlags) - .newFlags(composedMessageIdWithMetaData.getFlags()) - .build())); - } catch (LightweightTransactionException e) { - throw new RuntimeException(e); + return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId)) + .single() + .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry()) + .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft())); } catch (MailboxDeleteDuringUpdateException e) { LOGGER.info("Mailbox {} was deleted during flag update", mailboxId); - return Stream.of(); + return Mono.empty(); } } + private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) { + return Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(), + UpdatedFlags.builder() + .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid()) + .modSeq(composedMessageIdWithMetaData.getModSeq()) + .oldFlags(oldFlags) + .newFlags(composedMessageIdWithMetaData.getFlags()) + .build()); + } + private Mono<Pair<MailboxId, UpdatedFlags>> updateCounts(Pair<MailboxId, UpdatedFlags> pair) { CassandraId cassandraId = (CassandraId) pair.getLeft(); return indexTableHandler.updateIndexOnFlagsUpdate(cassandraId, pair.getRight()) - .then(Mono.just(pair)); + .thenReturn(pair); } - private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { + private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { try { return updateFlags(mailboxId, messageId, newState, updateMode); } catch (MailboxException e) { LOGGER.error("Error while updating flags on mailbox: {}", mailboxId); - return Optional.empty(); + return Mono.empty(); } } - private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { + private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { CassandraId cassandraId = (CassandraId) mailboxId; ComposedMessageIdWithMetaData oldComposedId = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId)) - .join() - .findFirst() + .next() + .blockOptional() .orElseThrow(MailboxDeleteDuringUpdateException::new); + Flags newFlags = new FlagsUpdateCalculator(newState, updateMode).buildNewFlags(oldComposedId.getFlags()); if (identicalFlags(oldComposedId, newFlags)) { - return Optional.of(Pair.of(oldComposedId.getFlags(), oldComposedId)); + return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId)); } + ComposedMessageIdWithMetaData newComposedId = new ComposedMessageIdWithMetaData( oldComposedId.getComposedMessageId(), newFlags, @@ -302,15 +295,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper { return oldComposedId.getFlags().equals(newFlags); } - private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) { + private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) { return imapUidDAO.updateMetadata(newComposedId, oldComposedId.getModSeq()) - .thenCompose(updateSuccess -> Optional.of(updateSuccess) - .filter(b -> b) - .map((Boolean any) -> messageIdDAO.updateMetadata(newComposedId).thenApply(v -> updateSuccess)) - .orElse(CompletableFuture.completedFuture(updateSuccess))) - .thenApply(success -> Optional.of(success) - .filter(b -> b) - .map(any -> Pair.of(oldComposedId.getFlags(), newComposedId))) - .join(); + .filter(FunctionalUtils.toPredicate(Function.identity())) + .flatMap(any -> messageIdDAO.updateMetadata(newComposedId) + .thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId))); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java index 169e32d..be5686a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java @@ -42,7 +42,6 @@ import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.TABLE_ import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; import javax.inject.Inject; import javax.mail.Flags; @@ -64,6 +63,8 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraMessageIdToImapUidDAO { @@ -148,8 +149,8 @@ public class CassandraMessageIdToImapUidDAO { .and(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); } - public CompletableFuture<Void> delete(CassandraMessageId messageId, CassandraId mailboxId) { - return cassandraAsyncExecutor.executeVoid(delete.bind() + public Mono<Void> delete(CassandraMessageId messageId, CassandraId mailboxId) { + return cassandraAsyncExecutor.executeVoidReactor(delete.bind() .setUUID(MESSAGE_ID, messageId.get()) .setUUID(MAILBOX_ID, mailboxId.asUuid())); } @@ -172,7 +173,7 @@ public class CassandraMessageIdToImapUidDAO { .setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()))); } - public CompletableFuture<Boolean> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, long oldModSeq) { + public Mono<Boolean> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, long oldModSeq) { ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId(); Flags flags = composedMessageIdWithMetaData.getFlags(); return cassandraAsyncExecutor.executeReturnApplied(update.bind() @@ -191,10 +192,10 @@ public class CassandraMessageIdToImapUidDAO { .setLong(MOD_SEQ_CONDITION, oldModSeq)); } - public CompletableFuture<Stream<ComposedMessageIdWithMetaData>> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) { + public Flux<ComposedMessageIdWithMetaData> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) { return selectStatement(messageId, mailboxId) - .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet) - .map(this::toComposedMessageIdWithMetadata)); + .flatMapMany(cassandraUtils::convertToFlux) + .map(this::toComposedMessageIdWithMetadata); } private ComposedMessageIdWithMetaData toComposedMessageIdWithMetadata(Row row) { @@ -208,12 +209,12 @@ public class CassandraMessageIdToImapUidDAO { .build(); } - private CompletableFuture<ResultSet> selectStatement(CassandraMessageId messageId, Optional<CassandraId> mailboxId) { + private Mono<ResultSet> selectStatement(CassandraMessageId messageId, Optional<CassandraId> mailboxId) { return mailboxId - .map(cassandraId -> cassandraAsyncExecutor.execute(select.bind() + .map(cassandraId -> cassandraAsyncExecutor.executeReactor(select.bind() .setUUID(MESSAGE_ID, messageId.get()) .setUUID(MAILBOX_ID, cassandraId.asUuid()))) - .orElseGet(() -> cassandraAsyncExecutor.execute(selectAll.bind() + .orElseGet(() -> cassandraAsyncExecutor.executeReactor(selectAll.bind() .setUUID(MESSAGE_ID, messageId.get()))); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index f2b468f..d2c4006 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -24,7 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -50,6 +50,7 @@ import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; import org.apache.james.util.OptionalUtils; +import org.apache.james.util.ReactorUtils; import org.apache.james.util.streams.Limit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,8 +154,8 @@ public class CassandraMessageMapper implements MessageMapper { CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId(); MessageUid uid = composedMessageId.getUid(); return Flux.merge( - Mono.fromCompletionStage(imapUidDAO.delete(messageId, mailboxId)), - Mono.fromCompletionStage(messageIdDAO.delete(mailboxId, uid))) + imapUidDAO.delete(messageId, mailboxId), + messageIdDAO.delete(mailboxId, uid)) .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId)); } @@ -162,9 +163,9 @@ public class CassandraMessageMapper implements MessageMapper { public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max)) - .map(SimpleMailboxMessage -> (MailboxMessage) SimpleMailboxMessage) + .map(simpleMailboxMessage -> (MailboxMessage) simpleMailboxMessage) .collectSortedList(Comparator.comparing(MailboxMessage::getUid)) - .flatMapMany(Flux::fromIterable) + .flatMapIterable(Function.identity()) .toIterable() .iterator(); } @@ -176,12 +177,10 @@ public class CassandraMessageMapper implements MessageMapper { } private Flux<SimpleMailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { - return Mono.fromCompletionStage(messageDAO.retrieveMessages(messageIds, fetchType, limit)) - .map(stream -> stream - .filter(CassandraMessageDAO.MessageResult::isFound) - .map(CassandraMessageDAO.MessageResult::message)) - .flatMap(stream -> Mono.fromCompletionStage(attachmentLoader.addAttachmentToMessages(stream, fetchType))) - .flatMapMany(Flux::fromStream); + return messageDAO.retrieveMessages(messageIds, fetchType, limit) + .filter(CassandraMessageDAO.MessageResult::isFound) + .map(CassandraMessageDAO.MessageResult::message) + .flatMap(stream -> attachmentLoader.addAttachmentToMessage(stream, fetchType)); } @Override @@ -196,9 +195,7 @@ public class CassandraMessageMapper implements MessageMapper { public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return firstUnseenDAO.retrieveFirstUnread(mailboxId) - .map(Optional::of) - .defaultIfEmpty(Optional.empty()) - .block() + .blockOptional() .orElse(null); } @@ -216,9 +213,8 @@ public class CassandraMessageMapper implements MessageMapper { private Flux<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) { return retrieveComposedId(mailboxId, messageUid) .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata)) - .flatMap(idWithMetadata -> - Mono.fromCompletionStage(messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))) - .flatMapMany(Flux::fromStream) + .flatMapMany(idWithMetadata -> + messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())) .filter(CassandraMessageDAO.MessageResult::isFound) .map(CassandraMessageDAO.MessageResult::message) .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of())); @@ -262,13 +258,13 @@ public class CassandraMessageMapper implements MessageMapper { } private MailboxMessage addUidAndModseq(MailboxMessage message, CassandraId mailboxId) throws MailboxException { - CompletableFuture<Optional<MessageUid>> uidFuture = uidProvider.nextUid(mailboxId); - CompletableFuture<Optional<Long>> modseqFuture = modSeqProvider.nextModSeq(mailboxId); - CompletableFuture.allOf(uidFuture, modseqFuture).join(); + final Mono<MessageUid> messageUidMono = uidProvider.nextUid(mailboxId).cache(); + final Mono<Long> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId).cache(); + Flux.merge(messageUidMono, nextModSeqMono).then(); - message.setUid(uidFuture.join() + message.setUid(messageUidMono.blockOptional() .orElseThrow(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId))); - message.setModSeq(modseqFuture.join() + message.setModSeq(nextModSeqMono.blockOptional() .orElseThrow(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId))); return message; @@ -323,8 +319,8 @@ public class CassandraMessageMapper implements MessageMapper { } private Mono<Long> computeNewModSeq(CassandraId mailboxId) { - return Mono.fromCompletionStage(modSeqProvider.nextModSeq(mailboxId)) - .map(value -> value.orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()))); + return modSeqProvider.nextModSeq(mailboxId) + .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()))); } private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) { @@ -429,11 +425,10 @@ public class CassandraMessageMapper implements MessageMapper { .modSeq(newModSeq) .flags(newFlags) .build(); - return Mono.fromCompletionStage(imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq())) + return imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq()) .flatMap(success -> { if (success) { - return Mono.fromCompletionStage(messageIdDAO.updateMetadata(newMetadata)) - .then(Mono.just(true)); + return messageIdDAO.updateMetadata(newMetadata).thenReturn(true); } else { return Mono.just(false); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java index 63e045b..6f798c5 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java @@ -29,30 +29,33 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTab import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.NEXT_MODSEQ; import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME; +import java.time.Duration; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.function.Function; import java.util.function.Supplier; import javax.inject.Inject; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; -import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.store.mail.ModSeqProvider; import org.apache.james.mailbox.store.mail.model.Mailbox; +import org.apache.james.util.FunctionalUtils; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import reactor.core.publisher.Mono; public class CassandraModSeqProvider implements ModSeqProvider { public static final String MOD_SEQ_CONDITION = "modSeqCondition"; + private final long maxModSeqRetries; public static class ExceptionRelay extends RuntimeException { private final MailboxException underlying; @@ -81,7 +84,6 @@ public class CassandraModSeqProvider implements ModSeqProvider { private static final ModSeq FIRST_MODSEQ = new ModSeq(0); private final CassandraAsyncExecutor cassandraAsyncExecutor; - private final FunctionRunnerWithRetry runner; private final PreparedStatement select; private final PreparedStatement update; private final PreparedStatement insert; @@ -89,17 +91,12 @@ public class CassandraModSeqProvider implements ModSeqProvider { @Inject public CassandraModSeqProvider(Session session, CassandraConfiguration cassandraConfiguration) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); - this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getModSeqMaxRetry()); + this.maxModSeqRetries = cassandraConfiguration.getModSeqMaxRetry(); this.insert = prepareInsert(session); this.update = prepareUpdate(session); this.select = prepareSelect(session); } - @VisibleForTesting - public CassandraModSeqProvider(Session session) { - this(session, CassandraConfiguration.DEFAULT_CONFIGURATION); - } - private PreparedStatement prepareInsert(Session session) { return session.prepare(insertInto(TABLE_NAME) .value(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ)) @@ -125,81 +122,79 @@ public class CassandraModSeqProvider implements ModSeqProvider { @Override public long nextModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return nextModSeq(mailboxId).join() + return nextModSeq(mailboxId) + .blockOptional() .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId)); } @Override public long nextModSeq(MailboxSession session, MailboxId mailboxId) throws MailboxException { return nextModSeq((CassandraId) mailboxId) - .join() + .blockOptional() .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId)); } @Override public long highestModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException { - return unbox(() -> findHighestModSeq((CassandraId) mailbox.getMailboxId()).join().getValue()); + return highestModSeq(mailboxSession, mailbox.getMailboxId()); } @Override public long highestModSeq(MailboxSession mailboxSession, MailboxId mailboxId) throws MailboxException { - return unbox(() -> findHighestModSeq((CassandraId) mailboxId).join().getValue()); + return unbox(() -> findHighestModSeq((CassandraId) mailboxId).block().orElse(FIRST_MODSEQ).getValue()); } - private CompletableFuture<ModSeq> findHighestModSeq(CassandraId mailboxId) { - return cassandraAsyncExecutor.executeSingleRow( + private Mono<Optional<ModSeq>> findHighestModSeq(CassandraId mailboxId) { + return cassandraAsyncExecutor.executeSingleRowOptionalReactor( select.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid())) - .thenApply(optional -> optional.map(row -> new ModSeq(row.getLong(NEXT_MODSEQ))) - .orElse(FIRST_MODSEQ)); + .map(maybeRow -> maybeRow.map(row -> new ModSeq(row.getLong(NEXT_MODSEQ)))); } - private CompletableFuture<Optional<ModSeq>> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) { + private Mono<ModSeq> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) { ModSeq nextModSeq = modSeq.next(); return cassandraAsyncExecutor.executeReturnApplied( insert.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(NEXT_MODSEQ, nextModSeq.getValue())) - .thenApply(success -> successToModSeq(nextModSeq, success)); + .flatMap(success -> successToModSeq(nextModSeq, success)); } - private CompletableFuture<Optional<ModSeq>> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) { + private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) { ModSeq nextModSeq = modSeq.next(); return cassandraAsyncExecutor.executeReturnApplied( update.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(NEXT_MODSEQ, nextModSeq.getValue()) .setLong(MOD_SEQ_CONDITION, modSeq.getValue())) - .thenApply(success -> successToModSeq(nextModSeq, success)); + .flatMap(success -> successToModSeq(nextModSeq, success)); } - private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) { - if (success) { - return Optional.of(modSeq); - } - return Optional.empty(); + private Mono<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) { + return Mono.just(success) + .filter(FunctionalUtils.toPredicate(Function.identity())) + .map(any -> modSeq); } - - public CompletableFuture<Optional<Long>> nextModSeq(CassandraId mailboxId) { + + public Mono<Long> nextModSeq(CassandraId mailboxId) { return findHighestModSeq(mailboxId) - .thenCompose(modSeq -> { - if (modSeq.isFirst()) { - return tryInsertModSeq(mailboxId, FIRST_MODSEQ); - } - return tryUpdateModSeq(mailboxId, modSeq); - }).thenCompose(firstInsert -> { - if (firstInsert.isPresent()) { - return CompletableFuture.completedFuture(firstInsert); - } - return handleRetries(mailboxId); - }) - .thenApply(optional -> optional.map(ModSeq::getValue)); - } - - private CompletableFuture<Optional<ModSeq>> handleRetries(CassandraId mailboxId) { - return runner.executeAsyncAndRetrieveObject( - () -> findHighestModSeq(mailboxId) - .thenCompose(newModSeq -> tryUpdateModSeq(mailboxId, newModSeq))); + .flatMap(maybeHighestModSeq -> maybeHighestModSeq + .map(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq)) + .orElseGet(() -> tryInsertModSeq(mailboxId, FIRST_MODSEQ))) + .switchIfEmpty(handleRetries(mailboxId)) + .map(ModSeq::getValue); + } + + private Mono<ModSeq> handleRetries(CassandraId mailboxId) { + return tryFindThenUpdateOnce(mailboxId) + .single() + .retryBackoff(maxModSeqRetries, Duration.ofMillis(2)); + } + + private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) { + return Mono.defer(() -> findHighestModSeq(mailboxId) + .flatMap(Mono::justOrEmpty) + .flatMap(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq))); } private static class ModSeq { @@ -216,9 +211,12 @@ public class CassandraModSeqProvider implements ModSeqProvider { public long getValue() { return value; } - - public boolean isFirst() { - return value == FIRST_MODSEQ.value; + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("value", value) + .toString(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java index b402bf8..bd3baac 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java @@ -30,13 +30,11 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable. import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; -import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.ids.CassandraId; @@ -47,13 +45,13 @@ import org.apache.james.mailbox.store.mail.model.Mailbox; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; -import com.google.common.annotations.VisibleForTesting; +import reactor.core.publisher.Mono; public class CassandraUidProvider implements UidProvider { private static final String CONDITION = "Condition"; private final CassandraAsyncExecutor executor; - private final FunctionRunnerWithRetry runner; + private final long maxUidRetries; private final PreparedStatement insertStatement; private final PreparedStatement updateStatement; private final PreparedStatement selectStatement; @@ -61,17 +59,12 @@ public class CassandraUidProvider implements UidProvider { @Inject public CassandraUidProvider(Session session, CassandraConfiguration cassandraConfiguration) { this.executor = new CassandraAsyncExecutor(session); - this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getUidMaxRetry()); + this.maxUidRetries = cassandraConfiguration.getUidMaxRetry(); this.selectStatement = prepareSelect(session); this.updateStatement = prepareUpdate(session); this.insertStatement = prepareInsert(session); } - @VisibleForTesting - public CassandraUidProvider(Session session) { - this(session, CassandraConfiguration.DEFAULT_CONFIGURATION); - } - private PreparedStatement prepareSelect(Session session) { return session.prepare(select(NEXT_UID) .from(TABLE_NAME) @@ -101,66 +94,56 @@ public class CassandraUidProvider implements UidProvider { public MessageUid nextUid(MailboxSession session, MailboxId mailboxId) throws MailboxException { CassandraId cassandraId = (CassandraId) mailboxId; return nextUid(cassandraId) - .join() - .orElseThrow(() -> new MailboxException("Error during Uid update")); + .blockOptional() + .orElseThrow(() -> new MailboxException("Error during Uid update")); } - public CompletableFuture<Optional<MessageUid>> nextUid(CassandraId cassandraId) { - return findHighestUid(cassandraId) - .thenCompose(optional -> { - if (optional.isPresent()) { - return tryUpdateUid(cassandraId, optional); - } - return tryInsert(cassandraId); - }) - .thenCompose(optional -> { - if (optional.isPresent()) { - return CompletableFuture.completedFuture(optional); - } - return runner.executeAsyncAndRetrieveObject( - () -> findHighestUid(cassandraId) - .thenCompose(readUid -> tryUpdateUid(cassandraId, readUid))); - }); + public Mono<MessageUid> nextUid(CassandraId cassandraId) { + Mono<MessageUid> updateUid = findHighestUid(cassandraId) + .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid)); + + return updateUid + .switchIfEmpty(tryInsert(cassandraId)) + .switchIfEmpty(updateUid) + .single() + .retry(maxUidRetries); } @Override public Optional<MessageUid> lastUid(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException { - return findHighestUid((CassandraId) mailbox.getMailboxId()).join(); + return findHighestUid((CassandraId) mailbox.getMailboxId()) + .blockOptional(); } - private CompletableFuture<Optional<MessageUid>> findHighestUid(CassandraId mailboxId) { - return executor.executeSingleRow( + private Mono<MessageUid> findHighestUid(CassandraId mailboxId) { + return Mono.defer(() -> executor.executeSingleRowReactor( selectStatement.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid())) - .thenApply(optional -> optional.map(row -> MessageUid.of(row.getLong(NEXT_UID)))); + .map(row -> MessageUid.of(row.getLong(NEXT_UID)))); } - private CompletableFuture<Optional<MessageUid>> tryUpdateUid(CassandraId mailboxId, Optional<MessageUid> uid) { - if (uid.isPresent()) { - MessageUid nextUid = uid.get().next(); - return executor.executeReturnApplied( + private Mono<MessageUid> tryUpdateUid(CassandraId mailboxId, MessageUid uid) { + MessageUid nextUid = uid.next(); + return Mono.defer(() -> executor.executeReturnApplied( updateStatement.bind() - .setUUID(MAILBOX_ID, mailboxId.asUuid()) - .setLong(CONDITION, uid.get().asLong()) - .setLong(NEXT_UID, nextUid.asLong())) - .thenApply(success -> successToUid(nextUid, success)); - } else { - return tryInsert(mailboxId); - } + .setUUID(MAILBOX_ID, mailboxId.asUuid()) + .setLong(CONDITION, uid.asLong()) + .setLong(NEXT_UID, nextUid.asLong())) + .flatMap(success -> successToUid(nextUid, success))); } - private CompletableFuture<Optional<MessageUid>> tryInsert(CassandraId mailboxId) { - return executor.executeReturnApplied( + private Mono<MessageUid> tryInsert(CassandraId mailboxId) { + return Mono.defer(() -> executor.executeReturnApplied( insertStatement.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid())) - .thenApply(success -> successToUid(MessageUid.MIN_VALUE, success)); + .flatMap(success -> successToUid(MessageUid.MIN_VALUE, success))); } - private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) { + private Mono<MessageUid> successToUid(MessageUid uid, Boolean success) { if (success) { - return Optional.of(uid); + return Mono.just(uid); } - return Optional.empty(); + return Mono.empty(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java index 7f93150..4bfeafb 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java @@ -27,10 +27,8 @@ import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRight import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRightsTable.TABLE_NAME; import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRightsTable.USER_NAME; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import java.util.stream.Stream; import javax.inject.Inject; @@ -44,13 +42,14 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.exception.UnsupportedRightException; import org.apache.james.mailbox.model.MailboxACL; import org.apache.james.mailbox.model.MailboxACL.Rfc4314Rights; -import org.apache.james.util.FluentFutureStream; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.github.fge.lambdas.Throwing; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraUserMailboxRightsDAO { @@ -98,31 +97,32 @@ public class CassandraUserMailboxRightsDAO { .where(eq(USER_NAME, bindMarker(USER_NAME)))); } - public CompletableFuture<Void> update(CassandraId cassandraId, ACLDiff aclDiff) { + public Mono<Void> update(CassandraId cassandraId, ACLDiff aclDiff) { PositiveUserACLDiff userACLDiff = new PositiveUserACLDiff(aclDiff); - return CompletableFuture.allOf( + return Flux.merge( addAll(cassandraId, userACLDiff.addedEntries()), removeAll(cassandraId, userACLDiff.removedEntries()), - addAll(cassandraId, userACLDiff.changedEntries())); + addAll(cassandraId, userACLDiff.changedEntries())) + .then(); } - private CompletableFuture<Stream<Void>> removeAll(CassandraId cassandraId, Stream<MailboxACL.Entry> removedEntries) { - return FluentFutureStream.of(removedEntries - .map(entry -> cassandraAsyncExecutor.executeVoid( + private Mono<Void> removeAll(CassandraId cassandraId, Stream<MailboxACL.Entry> removedEntries) { + return Flux.fromStream(removedEntries) + .flatMap(entry -> cassandraAsyncExecutor.executeVoidReactor( delete.bind() .setString(USER_NAME, entry.getKey().getName()) - .setUUID(MAILBOX_ID, cassandraId.asUuid())))) - .completableFuture(); + .setUUID(MAILBOX_ID, cassandraId.asUuid()))) + .then(); } - private CompletableFuture<Stream<Void>> addAll(CassandraId cassandraId, Stream<MailboxACL.Entry> addedEntries) { - return FluentFutureStream.of(addedEntries - .map(entry -> cassandraAsyncExecutor.executeVoid( + private Mono<Void> addAll(CassandraId cassandraId, Stream<MailboxACL.Entry> addedEntries) { + return Flux.fromStream(addedEntries) + .flatMap(entry -> cassandraAsyncExecutor.executeVoidReactor( insert.bind() .setString(USER_NAME, entry.getKey().getName()) .setUUID(MAILBOX_ID, cassandraId.asUuid()) - .setString(RIGHTS, entry.getValue().serialize())))) - .completableFuture(); + .setString(RIGHTS, entry.getValue().serialize()))) + .then(); } public CompletableFuture<Optional<Rfc4314Rights>> retrieve(String userName, CassandraId mailboxId) { @@ -134,14 +134,12 @@ public class CassandraUserMailboxRightsDAO { rowOptional.map(Throwing.function(row -> Rfc4314Rights.fromSerializedRfc4314Rights(row.getString(RIGHTS))))); } - public CompletableFuture<Map<CassandraId, Rfc4314Rights>> listRightsForUser(String userName) { - return cassandraAsyncExecutor.execute( + public Flux<Pair<CassandraId, Rfc4314Rights>> listRightsForUser(String userName) { + return cassandraAsyncExecutor.executeReactor( selectUser.bind() .setString(USER_NAME, userName)) - .thenApply(cassandraUtils::convertToStream) - .thenApply(row -> - row.map(Throwing.function(this::toPair)) - .collect(Collectors.toMap(Pair::getLeft, Pair::getRight))); + .flatMapMany(cassandraUtils::convertToFlux) + .map(Throwing.function(this::toPair)); } private Pair<CassandraId, Rfc4314Rights> toPair(Row row) throws UnsupportedRightException { http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java index d43c8a6..2ab5e56 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java @@ -25,10 +25,13 @@ import org.apache.james.backends.cassandra.migration.Migration; import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMessageIdDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO.MessageIdAttachmentIds; +import org.apache.james.mailbox.model.MessageId; import org.apache.james.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + public class AttachmentMessageIdCreation implements Migration { private static final Logger LOGGER = LoggerFactory.getLogger(AttachmentMessageIdCreation.class); private final CassandraMessageDAO cassandraMessageDAO; @@ -56,10 +59,11 @@ public class AttachmentMessageIdCreation implements Migration { private Result createIndex(MessageIdAttachmentIds message) { try { - message.getAttachmentId() - .forEach(attachmentId -> attachmentMessageIdDAO - .storeAttachmentForMessageId(attachmentId, message.getMessageId()) - .join()); + MessageId messageId = message.getMessageId(); + Flux.fromIterable(message.getAttachmentId()) + .flatMap(attachmentId -> attachmentMessageIdDAO.storeAttachmentForMessageId(attachmentId, messageId)) + .then() + .block(); return Result.COMPLETED; } catch (Exception e) { LOGGER.error("Error while creation attachmentId -> messageIds index", e); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java index 148d395..bebf83d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java @@ -61,7 +61,7 @@ public class AttachmentV2Migration implements Migration { try { blobStore.save(attachment.getBytes()) .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) - .thenCompose(attachmentDAOV2::storeAttachment) + .thenCompose(daoAttachement -> attachmentDAOV2.storeAttachment(daoAttachement).toFuture()) .thenCompose(any -> attachmentDAOV1.deleteAttachment(attachment.getAttachmentId())) .join(); return Result.COMPLETED; http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java index 70f4a8e..c4f3fd2 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java @@ -80,9 +80,9 @@ public class MailboxPathV2Migration implements Migration { public Result migrate(CassandraIdAndPath idAndPath) { try { - daoV2.save(idAndPath.getMailboxPath(), idAndPath.getCassandraId()).join(); + daoV2.save(idAndPath.getMailboxPath(), idAndPath.getCassandraId()).block(); - daoV1.delete(idAndPath.getMailboxPath()).join(); + daoV1.delete(idAndPath.getMailboxPath()).block(); return Result.COMPLETED; } catch (Exception e) { LOGGER.error("Error while performing migration for path {}", idAndPath.getMailboxPath(), e); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java index 1a4a254..0032ef6 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java @@ -63,7 +63,7 @@ public class MailboxMergingTaskRunner { return moveMessages(oldMailboxId, newMailboxId, mailboxSession, context) .onComplete( () -> mergeRights(oldMailboxId, newMailboxId), - () -> mailboxDAO.delete(oldMailboxId).join()); + () -> mailboxDAO.delete(oldMailboxId).block()); } private Task.Result moveMessages(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxSession session, MailboxMergingTask.Context context) { @@ -88,12 +88,12 @@ public class MailboxMergingTaskRunner { private void mergeRights(CassandraId oldMailboxId, CassandraId newMailboxId) { try { - MailboxACL oldAcl = cassandraACLMapper.getACL(oldMailboxId).join(); - MailboxACL newAcl = cassandraACLMapper.getACL(newMailboxId).join(); + MailboxACL oldAcl = cassandraACLMapper.getACL(oldMailboxId).block(); + MailboxACL newAcl = cassandraACLMapper.getACL(newMailboxId).block(); MailboxACL finalAcl = newAcl.union(oldAcl); cassandraACLMapper.setACL(newMailboxId, finalAcl); - rightsDAO.update(oldMailboxId, ACLDiff.computeDiff(oldAcl, MailboxACL.EMPTY)).join(); + rightsDAO.update(oldMailboxId, ACLDiff.computeDiff(oldAcl, MailboxACL.EMPTY)).block(); } catch (MailboxException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java index d4b073d..4161e8d 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java @@ -23,21 +23,17 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.Collection; -import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; import org.apache.james.mailbox.model.Attachment; import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.model.Cid; import org.apache.james.mailbox.model.MessageAttachment; -import org.assertj.core.data.MapEntry; import org.junit.Before; import org.junit.Test; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; public class AttachmentLoaderTest { @@ -53,15 +49,15 @@ public class AttachmentLoaderTest { @Test public void getAttachmentsShouldWorkWithDuplicatedAttachments() { AttachmentId attachmentId = AttachmentId.from("1"); - Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId); Attachment attachment = Attachment.builder() .attachmentId(attachmentId) .bytes("attachment".getBytes()) .type("type") .build(); - when(attachmentMapper.getAttachmentsAsFuture(attachmentIds)) - .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment))); + + when(attachmentMapper.getAttachmentsAsMono(attachmentId)) + .thenReturn(Mono.just(attachment)); Optional<String> name = Optional.of("name1"); Optional<Cid> cid = Optional.empty(); @@ -69,7 +65,7 @@ public class AttachmentLoaderTest { MessageAttachmentRepresentation attachmentRepresentation = new MessageAttachmentRepresentation(attachmentId, name, cid, isInlined); Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of(attachmentRepresentation, attachmentRepresentation)) - .join(); + .block(); MessageAttachment expectedAttachment = new MessageAttachment(attachment, name, cid, isInlined); assertThat(attachments).hasSize(2) @@ -79,15 +75,15 @@ public class AttachmentLoaderTest { @Test public void getAttachmentsShouldWorkWithDuplicatedIds() { AttachmentId attachmentId = AttachmentId.from("1"); - Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId); Attachment attachment = Attachment.builder() .attachmentId(attachmentId) .bytes("attachment".getBytes()) .type("type") .build(); - when(attachmentMapper.getAttachmentsAsFuture(attachmentIds)) - .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment))); + + when(attachmentMapper.getAttachmentsAsMono(attachmentId)) + .thenReturn(Mono.just(attachment)); Optional<String> name1 = Optional.of("name1"); Optional<String> name2 = Optional.of("name2"); @@ -97,7 +93,7 @@ public class AttachmentLoaderTest { MessageAttachmentRepresentation attachmentRepresentation2 = new MessageAttachmentRepresentation(attachmentId, name2, cid, isInlined); Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of(attachmentRepresentation1, attachmentRepresentation2)) - .join(); + .block(); assertThat(attachments).hasSize(2) .containsOnly(new MessageAttachment(attachment, name1, cid, isInlined), @@ -108,7 +104,6 @@ public class AttachmentLoaderTest { public void getAttachmentsShouldReturnMultipleAttachmentWhenSeveralAttachmentsRepresentation() { AttachmentId attachmentId1 = AttachmentId.from("1"); AttachmentId attachmentId2 = AttachmentId.from("2"); - Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId1, attachmentId2); Attachment attachment1 = Attachment.builder() .attachmentId(attachmentId1) @@ -120,8 +115,11 @@ public class AttachmentLoaderTest { .bytes("attachment2".getBytes()) .type("type") .build(); - when(attachmentMapper.getAttachmentsAsFuture(attachmentIds)) - .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment1, attachment2))); + + when(attachmentMapper.getAttachmentsAsMono(attachmentId1)) + .thenReturn(Mono.just(attachment1)); + when(attachmentMapper.getAttachmentsAsMono(attachmentId2)) + .thenReturn(Mono.just(attachment2)); Optional<String> name1 = Optional.of("name1"); Optional<String> name2 = Optional.of("name2"); @@ -131,7 +129,7 @@ public class AttachmentLoaderTest { MessageAttachmentRepresentation attachmentRepresentation2 = new MessageAttachmentRepresentation(attachmentId2, name2, cid, isInlined); Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of(attachmentRepresentation1, attachmentRepresentation2)) - .join(); + .block(); assertThat(attachments).hasSize(2) .containsOnly(new MessageAttachment(attachment1, name1, cid, isInlined), @@ -141,61 +139,19 @@ public class AttachmentLoaderTest { @Test public void getAttachmentsShouldReturnEmptyByDefault() { AttachmentId attachmentId = AttachmentId.from("1"); - Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId); Attachment attachment = Attachment.builder() .attachmentId(attachmentId) .bytes("attachment".getBytes()) .type("type") .build(); - when(attachmentMapper.getAttachmentsAsFuture(attachmentIds)) - .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment))); + + when(attachmentMapper.getAttachmentsAsMono(attachmentId)) + .thenReturn(Mono.just(attachment)); Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of()) - .join(); + .block(); assertThat(attachments).isEmpty(); } - - @Test - public void attachmentsByIdShouldReturnMapWhenExist() { - AttachmentId attachmentId = AttachmentId.from("1"); - AttachmentId attachmentId2 = AttachmentId.from("2"); - Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId, attachmentId2); - - Attachment attachment = Attachment.builder() - .attachmentId(attachmentId) - .bytes("attachment".getBytes()) - .type("type") - .build(); - Attachment attachment2 = Attachment.builder() - .attachmentId(attachmentId2) - .bytes("attachment2".getBytes()) - .type("type") - .build(); - when(attachmentMapper.getAttachmentsAsFuture(attachmentIds)) - .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment, attachment2))); - - Map<AttachmentId, Attachment> attachmentsById = testee.attachmentsById(attachmentIds) - .join(); - - assertThat(attachmentsById).hasSize(2) - .containsOnly(MapEntry.entry(attachmentId, attachment), MapEntry.entry(attachmentId2, attachment2)); - } - - @Test - public void attachmentsByIdShouldReturnEmptyMapWhenAttachmentsDontExists() { - AttachmentId attachmentId = AttachmentId.from("1"); - AttachmentId attachmentId2 = AttachmentId.from("2"); - Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId, attachmentId2); - - when(attachmentMapper.getAttachmentsAsFuture(attachmentIds)) - .thenReturn(CompletableFuture.completedFuture(ImmutableList.of())); - - Map<AttachmentId, Attachment> attachmentsById = testee.attachmentsById(attachmentIds) - .join(); - - assertThat(attachmentsById).hasSize(0); - } - } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java index d9f7c89..8b94cdb 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java @@ -78,12 +78,12 @@ class CassandraACLMapperTest { .value(CassandraACLTable.ACL, "{\"entries\":{\"bob\":invalid}}") .value(CassandraACLTable.VERSION, 1)); - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY); + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY); } @Test void retrieveACLWhenNoACLStoredShouldReturnEmptyACL() { - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY); + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY); } @Test @@ -94,7 +94,7 @@ class CassandraACLMapperTest { cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition()); - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()) + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()) .isEqualTo(new MailboxACL().union(key, rights)); } @@ -107,7 +107,7 @@ class CassandraACLMapperTest { MailboxACL.EntryKey keyAlice = new MailboxACL.EntryKey("alice", MailboxACL.NameType.user, false); cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(keyAlice).rights(rights).asAddition()); - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()) + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()) .isEqualTo(new MailboxACL().union(keyBob, rights).union(keyAlice, rights)); } @@ -119,7 +119,7 @@ class CassandraACLMapperTest { cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition()); cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asRemoval()); - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY); + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY); } @Test @@ -130,7 +130,7 @@ class CassandraACLMapperTest { cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition()); cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).noRights().asReplacement()); - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY); + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY); } @Test @@ -140,7 +140,7 @@ class CassandraACLMapperTest { cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asReplacement()); - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(new MailboxACL().union(key, rights)); + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(new MailboxACL().union(key, rights)); } @Test @@ -155,11 +155,11 @@ class CassandraACLMapperTest { cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition()); - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(new MailboxACL().union(key, rights)); + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(new MailboxACL().union(key, rights)); } @Test - void twoConcurrentUpdatesWhenNoACEStoredShouldReturnACEWithTwoEntries(CassandraCluster cassandra) throws Exception { + void twoConcurrentUpdatesWhenNoACLStoredShouldReturnACLWithTwoEntries(CassandraCluster cassandra) throws Exception { CountDownLatch countDownLatch = new CountDownLatch(2); MailboxACL.EntryKey keyBob = new MailboxACL.EntryKey("bob", MailboxACL.NameType.user, false); MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read); @@ -168,12 +168,12 @@ class CassandraACLMapperTest { Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown); awaitAll(future1, future2); - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()) + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()) .isEqualTo(new MailboxACL().union(keyBob, rights).union(keyAlice, rights)); } @Test - void twoConcurrentUpdatesWhenStoredShouldReturnACEWithTwoEntries(CassandraCluster cassandra) throws Exception { + void twoConcurrentUpdatesWhenStoredShouldReturnACLWithTwoEntries(CassandraCluster cassandra) throws Exception { CountDownLatch countDownLatch = new CountDownLatch(2); MailboxACL.EntryKey keyBenwa = new MailboxACL.EntryKey("benwa", MailboxACL.NameType.user, false); MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read); @@ -185,7 +185,7 @@ class CassandraACLMapperTest { Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown); awaitAll(future1, future2); - assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()) + assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()) .isEqualTo(new MailboxACL().union(keyBob, rights).union(keyAlice, rights).union(keyBenwa, rights)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java index f82375e..6cd0b2f 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java @@ -55,7 +55,7 @@ class CassandraAttachmentDAOTest { @Test void getAttachmentShouldReturnEmptyWhenAbsent() { - Optional<Attachment> attachment = testee.getAttachment(ATTACHMENT_ID).join(); + Optional<Attachment> attachment = testee.getAttachment(ATTACHMENT_ID).blockOptional(); assertThat(attachment).isEmpty(); } @@ -98,7 +98,7 @@ class CassandraAttachmentDAOTest { .build(); testee.storeAttachment(attachment).join(); - Optional<Attachment> actual = testee.getAttachment(ATTACHMENT_ID).join(); + Optional<Attachment> actual = testee.getAttachment(ATTACHMENT_ID).blockOptional(); assertThat(actual).contains(attachment); } @@ -114,7 +114,7 @@ class CassandraAttachmentDAOTest { testee.deleteAttachment(attachment.getAttachmentId()).join(); - assertThat(testee.getAttachment(attachment.getAttachmentId()).join()) + assertThat(testee.getAttachment(attachment.getAttachmentId()).blockOptional()) .isEmpty(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java index 6e3b705..2bf90c7 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java @@ -52,7 +52,7 @@ class CassandraAttachmentDAOV2Test { @Test void getAttachmentShouldReturnEmptyWhenAbsent() { - Optional<DAOAttachment> attachment = testee.getAttachment(ATTACHMENT_ID).join(); + Optional<DAOAttachment> attachment = testee.getAttachment(ATTACHMENT_ID).blockOptional(); assertThat(attachment).isEmpty(); } @@ -66,9 +66,9 @@ class CassandraAttachmentDAOV2Test { .build(); BlobId blobId = BLOB_ID_FACTORY.from("blobId"); DAOAttachment daoAttachment = CassandraAttachmentDAOV2.from(attachment, blobId); - testee.storeAttachment(daoAttachment).join(); + testee.storeAttachment(daoAttachment).block(); - Optional<DAOAttachment> actual = testee.getAttachment(ATTACHMENT_ID).join(); + Optional<DAOAttachment> actual = testee.getAttachment(ATTACHMENT_ID).blockOptional(); assertThat(actual).contains(daoAttachment); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java index 862ea95..10492c6 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.nio.charset.StandardCharsets; +import java.util.List; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; @@ -100,7 +101,7 @@ class CassandraAttachmentFallbackTest { .build(); BlobId blobId = blobsDAO.save(attachment.getBytes()).join(); - attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).join(); + attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block(); attachmentDAO.storeAttachment(otherAttachment).join(); assertThat(attachmentMapper.getAttachment(ATTACHMENT_ID_1)) @@ -135,7 +136,7 @@ class CassandraAttachmentFallbackTest { .build(); BlobId blobId = blobsDAO.save(attachment.getBytes()).join(); - attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).join(); + attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block(); attachmentDAO.storeAttachment(otherAttachment).join(); assertThat(attachmentMapper.getAttachments(ImmutableList.of(ATTACHMENT_ID_1))) @@ -170,10 +171,11 @@ class CassandraAttachmentFallbackTest { .build(); BlobId blobId = blobsDAO.save(attachment.getBytes()).join(); - attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).join(); + attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block(); attachmentDAO.storeAttachment(otherAttachment).join(); - assertThat(attachmentMapper.getAttachments(ImmutableList.of(ATTACHMENT_ID_1, ATTACHMENT_ID_2))) - .containsExactly(attachment, otherAttachment); + List<Attachment> attachments = attachmentMapper.getAttachments(ImmutableList.of(ATTACHMENT_ID_1, ATTACHMENT_ID_2)); + assertThat(attachments) + .containsExactlyInAnyOrder(attachment, otherAttachment); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
