This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e3c0b3fd96e127bfb74fbe00c6cbac25d1d36856 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Fri Dec 13 14:47:34 2019 +0100 [Refactoring] fix Reactor Intellij inspections --- .../cassandra/mail/CassandraMessageIdMapper.java | 24 ++++++++++-------- .../cassandra/mail/CassandraMessageMapper.java | 29 +++++++++++++++------- .../james/blob/cassandra/CassandraBlobStore.java | 5 ++-- .../memory/vacation/MemoryVacationRepository.java | 6 ++--- 4 files changed, 38 insertions(+), 26 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 1b15350..3a206c3 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -258,22 +258,24 @@ public class CassandraMessageIdMapper implements MessageIdMapper { private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { CassandraId cassandraId = (CassandraId) mailboxId; - ComposedMessageIdWithMetaData oldComposedId = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId)) - .next() - .blockOptional() - .orElseThrow(MailboxDeleteDuringUpdateException::new); + return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId)) + .single() + .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new)) + .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId)); + } + private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) { Flags newFlags = new FlagsUpdateCalculator(newState, updateMode).buildNewFlags(oldComposedId.getFlags()); if (identicalFlags(oldComposedId, newFlags)) { return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId)); + } else { + return Mono + .fromCallable(() -> new ComposedMessageIdWithMetaData( + oldComposedId.getComposedMessageId(), + newFlags, + modSeqProvider.nextModSeq(cassandraId))) + .flatMap(newComposedId -> updateFlags(oldComposedId, newComposedId)); } - - ComposedMessageIdWithMetaData newComposedId = new ComposedMessageIdWithMetaData( - oldComposedId.getComposedMessageId(), - newFlags, - modSeqProvider.nextModSeq(cassandraId)); - - return updateFlags(oldComposedId, newComposedId); } private boolean identicalFlags(ComposedMessageIdWithMetaData oldComposedId, Flags newFlags) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index ebc29a7..ed93cc2 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -274,15 +274,26 @@ public class CassandraMessageMapper implements MessageMapper { } private MailboxMessage addUidAndModseq(MailboxMessage message, CassandraId mailboxId) throws MailboxException { - final Mono<MessageUid> messageUidMono = uidProvider.nextUid(mailboxId).cache(); - final Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId).cache(); - Flux.merge(messageUidMono, nextModSeqMono).then(); - - message.setUid(messageUidMono.blockOptional() - .orElseThrow(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId))); - message.setModSeq(nextModSeqMono.blockOptional() - .orElseThrow(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId))); - + Mono<MessageUid> messageUidMono = uidProvider + .nextUid(mailboxId) + .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId))); + + Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId) + .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId))); + + try { + Mono.zip(messageUidMono, nextModSeqMono) + .doOnNext(tuple -> { + message.setUid(tuple.getT1()); + message.setModSeq(tuple.getT2()); + }) + .block(); + } catch (RuntimeException e) { + if (e.getCause() instanceof MailboxException) { + throw (MailboxException)e.getCause(); + } + throw e; + } return message; } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java index cd179f4..1993f84 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java @@ -128,13 +128,12 @@ public class CassandraBlobStore implements BlobStore { } private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) { - Integer rowCount = selectRowCount(bucketName, blobId) + return selectRowCount(bucketName, blobId) .publishOn(Schedulers.elastic()) .single() .onErrorResume(NoSuchElementException.class, e -> Mono.error( new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)))) - .block(); - return Flux.range(0, rowCount) + .flatMapMany(rowCount -> Flux.range(0, rowCount)) .publishOn(Schedulers.elastic(), PREFETCH) .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex) .single() diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java index 5e3b085..5f06c24 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java @@ -48,9 +48,9 @@ public class MemoryVacationRepository implements VacationRepository { public Mono<Void> modifyVacation(AccountId accountId, VacationPatch vacationPatch) { Preconditions.checkNotNull(accountId); Preconditions.checkNotNull(vacationPatch); - Vacation oldVacation = retrieveVacation(accountId).block(); - vacationMap.put(accountId, vacationPatch.patch(oldVacation)); - return Mono.empty(); + return retrieveVacation(accountId) + .doOnNext(oldVacation -> vacationMap.put(accountId, vacationPatch.patch(oldVacation))) + .then(); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org