This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9bdf1bc1d3365320fc970aef6a9bc46516bf16c5 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Thu Jan 16 16:20:20 2020 +0700 [REFACTORING] Reactor: Use handle instead of flatMap for mono synchronous transformations --- .../backends/cassandra/utils/CassandraAsyncExecutor.java | 2 +- .../mailbox/cassandra/mail/CassandraMessageMapper.java | 7 +++---- .../mailbox/cassandra/mail/CassandraModSeqProvider.java | 16 ++++++++-------- .../mailbox/cassandra/mail/CassandraUidProvider.java | 12 ++++++------ .../apache/james/blob/cassandra/CassandraBlobStore.java | 2 +- .../cassandra/CassandraMailRepository.java | 5 +++-- .../distributed/RabbitMQTerminationSubscriber.java | 1 - 7 files changed, 22 insertions(+), 23 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java index 1b6464c..3654ebe 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java @@ -59,7 +59,7 @@ public class CassandraAsyncExecutor { public Mono<Row> executeSingleRow(Statement statement) { return executeSingleRowOptional(statement) - .flatMap(Mono::justOrEmpty); + .handle((t, sink) -> t.ifPresent(sink::next)); } public Flux<Row> executeRows(Statement statement) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index ed93cc2..695c38f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -240,7 +240,7 @@ public class CassandraMessageMapper implements MessageMapper { return messageIdDAO.retrieve(mailboxId, uid) .doOnNext(optional -> OptionalUtils.executeIfEmpty(optional, () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid))) - .flatMap(Mono::justOrEmpty); + .handle((t, sink) -> t.ifPresent(sink::next)); } @Override @@ -325,9 +325,8 @@ public class CassandraMessageMapper implements MessageMapper { private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) { if (!failed.isEmpty()) { Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed) - .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid) - .flatMap(Mono::justOrEmpty) - ); + .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid)) + .handle((t, sink) -> t.ifPresent(sink::next)); return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator); } else { return Mono.empty(); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java index e5d7290..2199f74 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java @@ -44,7 +44,6 @@ import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.store.mail.ModSeqProvider; -import org.apache.james.util.FunctionalUtils; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; @@ -157,7 +156,7 @@ public class CassandraModSeqProvider implements ModSeqProvider { insert.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(NEXT_MODSEQ, nextModSeq.asLong())) - .flatMap(success -> successToModSeq(nextModSeq, success)); + .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next)); } private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) { @@ -167,13 +166,14 @@ public class CassandraModSeqProvider implements ModSeqProvider { .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(NEXT_MODSEQ, nextModSeq.asLong()) .setLong(MOD_SEQ_CONDITION, modSeq.asLong())) - .flatMap(success -> successToModSeq(nextModSeq, success)); + .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next)); } - private Mono<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) { - return Mono.just(success) - .filter(FunctionalUtils.identityPredicate()) - .map(any -> modSeq); + private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) { + if (success) { + return Optional.of(modSeq); + } + return Optional.empty(); } public Mono<ModSeq> nextModSeq(CassandraId mailboxId) { @@ -193,7 +193,7 @@ public class CassandraModSeqProvider implements ModSeqProvider { private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) { return Mono.defer(() -> findHighestModSeq(mailboxId) - .flatMap(Mono::justOrEmpty) + .<ModSeq>handle((t, sink) -> t.ifPresent(sink::next)) .flatMap(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq))); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java index 276ba2b..a5d9968 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java @@ -111,7 +111,7 @@ public class CassandraUidProvider implements UidProvider { } @Override - public Optional<MessageUid> lastUid(Mailbox mailbox) throws MailboxException { + public Optional<MessageUid> lastUid(Mailbox mailbox) { return findHighestUid((CassandraId) mailbox.getMailboxId()) .blockOptional(); } @@ -131,21 +131,21 @@ public class CassandraUidProvider implements UidProvider { .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(CONDITION, uid.asLong()) .setLong(NEXT_UID, nextUid.asLong())) - .flatMap(success -> successToUid(nextUid, success))); + .handle((success, sink) -> successToUid(nextUid, success).ifPresent(sink::next))); } private Mono<MessageUid> tryInsert(CassandraId mailboxId) { return Mono.defer(() -> executor.executeReturnApplied( insertStatement.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid())) - .flatMap(success -> successToUid(MessageUid.MIN_VALUE, success))); + .handle((success, sink) -> successToUid(MessageUid.MIN_VALUE, success).ifPresent(sink::next))); } - private Mono<MessageUid> successToUid(MessageUid uid, Boolean success) { + private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) { if (success) { - return Mono.just(uid); + return Optional.of(uid); } - return Mono.empty(); + return Optional.empty(); } } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java index 1993f84..854bf44 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java @@ -96,7 +96,7 @@ public class CassandraBlobStore implements BlobStore { .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue()) .then(Mono.just(getChunkNum(pair)))) .collect(Collectors.maxBy(Comparator.comparingInt(x -> x))) - .flatMap(Mono::justOrEmpty) + .<Integer>handle((t, sink) -> t.ifPresent(sink::next)) .map(this::numToCount) .defaultIfEmpty(0); } diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index bbaf01f..cf98198 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -32,6 +32,7 @@ import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepository; import org.apache.james.mailrepository.api.MailRepositoryUrl; +import org.apache.james.mailrepository.cassandra.CassandraMailRepositoryMailDaoAPI.MailDTO; import org.apache.mailet.Mail; import reactor.core.publisher.Flux; @@ -92,13 +93,13 @@ public class CassandraMailRepository implements MailRepository { @Override public Mail retrieve(MailKey key) { return mailDAO.read(url, key) - .flatMap(Mono::justOrEmpty) + .<MailDTO>handle((t, sink) -> t.ifPresent(sink::next)) .flatMap(this::toMail) .blockOptional() .orElse(null); } - private Mono<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { + private Mono<Mail> toMail(MailDTO mailDTO) { MimeMessagePartsId parts = MimeMessagePartsId.builder() .headerBlobId(mailDTO.getHeaderBlobId()) .bodyBlobId(mailDTO.getBodyBlobId()) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index 8f4b2b7..a949af5 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -43,7 +43,6 @@ import com.rabbitmq.client.Delivery; import reactor.core.Disposable; import reactor.core.publisher.DirectProcessor; -import reactor.core.publisher.Mono; import reactor.core.publisher.UnicastProcessor; import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.BindingSpecification; --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org