This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9bdf1bc1d3365320fc970aef6a9bc46516bf16c5
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Thu Jan 16 16:20:20 2020 +0700

    [REFACTORING] Reactor: Use handle instead of flatMap for mono synchronous transformations
---
 .../backends/cassandra/utils/CassandraAsyncExecutor.java |  2 +-
 .../mailbox/cassandra/mail/CassandraMessageMapper.java   |  7 +++----
 .../mailbox/cassandra/mail/CassandraModSeqProvider.java  | 16 ++++++++--------
 .../mailbox/cassandra/mail/CassandraUidProvider.java     | 12 ++++++------
 .../apache/james/blob/cassandra/CassandraBlobStore.java  |  2 +-
 .../cassandra/CassandraMailRepository.java               |  5 +++--
 .../distributed/RabbitMQTerminationSubscriber.java       |  1 -
 7 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 1b6464c..3654ebe 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -59,7 +59,7 @@ public class CassandraAsyncExecutor {
 
     public Mono<Row> executeSingleRow(Statement statement) {
         return executeSingleRowOptional(statement)
-                .flatMap(Mono::justOrEmpty);
+                .handle((t, sink) -> t.ifPresent(sink::next));
     }
 
     public Flux<Row> executeRows(Statement statement) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index ed93cc2..695c38f 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -240,7 +240,7 @@ public class CassandraMessageMapper implements MessageMapper {
         return messageIdDAO.retrieve(mailboxId, uid)
             .doOnNext(optional -> OptionalUtils.executeIfEmpty(optional,
                 () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)))
-            .flatMap(Mono::justOrEmpty);
+            .handle((t, sink) -> t.ifPresent(sink::next));
     }
 
     @Override
@@ -325,9 +325,8 @@ public class CassandraMessageMapper implements MessageMapper {
     private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
         if (!failed.isEmpty()) {
             Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
-                .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid)
-                        .flatMap(Mono::justOrEmpty)
-                );
+                .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid))
+                .handle((t, sink) -> t.ifPresent(sink::next));
             return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
         } else {
             return Mono.empty();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index e5d7290..2199f74 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -44,7 +44,6 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
-import org.apache.james.util.FunctionalUtils;
 
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
@@ -157,7 +156,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
             insert.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(NEXT_MODSEQ, nextModSeq.asLong()))
-            .flatMap(success -> successToModSeq(nextModSeq, success));
+            .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
     }
 
     private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
@@ -167,13 +166,14 @@ public class CassandraModSeqProvider implements ModSeqProvider {
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(NEXT_MODSEQ, nextModSeq.asLong())
                 .setLong(MOD_SEQ_CONDITION, modSeq.asLong()))
-            .flatMap(success -> successToModSeq(nextModSeq, success));
+            .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
     }
 
-    private Mono<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
-        return Mono.just(success)
-            .filter(FunctionalUtils.identityPredicate())
-            .map(any -> modSeq);
+    private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
+        if (success) {
+            return Optional.of(modSeq);
+        }
+        return Optional.empty();
     }
 
     public Mono<ModSeq> nextModSeq(CassandraId mailboxId) {
@@ -193,7 +193,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
 
     private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
         return Mono.defer(() -> findHighestModSeq(mailboxId)
-            .flatMap(Mono::justOrEmpty)
+            .<ModSeq>handle((t, sink) -> t.ifPresent(sink::next))
             .flatMap(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq)));
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index 276ba2b..a5d9968 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -111,7 +111,7 @@ public class CassandraUidProvider implements UidProvider {
     }
 
     @Override
-    public Optional<MessageUid> lastUid(Mailbox mailbox) throws MailboxException {
+    public Optional<MessageUid> lastUid(Mailbox mailbox) {
         return findHighestUid((CassandraId) mailbox.getMailboxId())
                 .blockOptional();
     }
@@ -131,21 +131,21 @@ public class CassandraUidProvider implements UidProvider {
                         .setUUID(MAILBOX_ID, mailboxId.asUuid())
                         .setLong(CONDITION, uid.asLong())
                         .setLong(NEXT_UID, nextUid.asLong()))
-                .flatMap(success -> successToUid(nextUid, success)));
+                .handle((success, sink) -> successToUid(nextUid, success).ifPresent(sink::next)));
     }
 
     private Mono<MessageUid> tryInsert(CassandraId mailboxId) {
         return Mono.defer(() -> executor.executeReturnApplied(
             insertStatement.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()))
-            .flatMap(success -> successToUid(MessageUid.MIN_VALUE, success)));
+            .handle((success, sink) -> successToUid(MessageUid.MIN_VALUE, success).ifPresent(sink::next)));
     }
 
-    private Mono<MessageUid> successToUid(MessageUid uid, Boolean success) {
+    private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
         if (success) {
-            return Mono.just(uid);
+            return Optional.of(uid);
         }
-        return Mono.empty();
+        return Optional.empty();
     }
 
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 1993f84..854bf44 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -96,7 +96,7 @@ public class CassandraBlobStore implements BlobStore {
             .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue())
                 .then(Mono.just(getChunkNum(pair))))
             .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
-            .flatMap(Mono::justOrEmpty)
+            .<Integer>handle((t, sink) -> t.ifPresent(sink::next))
             .map(this::numToCount)
             .defaultIfEmpty(0);
     }
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index bbaf01f..cf98198 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -32,6 +32,7 @@ import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
+import org.apache.james.mailrepository.cassandra.CassandraMailRepositoryMailDaoAPI.MailDTO;
 import org.apache.mailet.Mail;
 
 import reactor.core.publisher.Flux;
@@ -92,13 +93,13 @@ public class CassandraMailRepository implements MailRepository {
     @Override
     public Mail retrieve(MailKey key) {
         return mailDAO.read(url, key)
-            .flatMap(Mono::justOrEmpty)
+            .<MailDTO>handle((t, sink) -> t.ifPresent(sink::next))
             .flatMap(this::toMail)
             .blockOptional()
             .orElse(null);
     }
 
-    private Mono<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) {
+    private Mono<Mail> toMail(MailDTO mailDTO) {
         MimeMessagePartsId parts = MimeMessagePartsId.builder()
             .headerBlobId(mailDTO.getHeaderBlobId())
             .bodyBlobId(mailDTO.getBodyBlobId())
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 8f4b2b7..a949af5 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -43,7 +43,6 @@ import com.rabbitmq.client.Delivery;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.DirectProcessor;
-import reactor.core.publisher.Mono;
 import reactor.core.publisher.UnicastProcessor;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.BindingSpecification;


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to