This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 39f2724e4b1bf23cde7b02e774b725ddf9a9dc43
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Thu Jan 16 09:57:09 2020 +0100

    [REFACTORING] Reactor: Use handle instead of flatMap for flux synchronous 
transformations
---
 .../apache/james/vault/metadata/MetadataDAO.java   |  2 +-
 .../james/vault/metadata/MetadataSerializer.java   | 30 ++++++++++++++++------
 .../distributed/RabbitMQTerminationSubscriber.java |  8 +++---
 .../apache/james/task/SerialTaskManagerWorker.java |  2 +-
 4 files changed, 28 insertions(+), 14 deletions(-)

diff --git 
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
 
b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
index 251728c..87553e9 100644
--- 
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
+++ 
b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
@@ -109,7 +109,7 @@ public class MetadataDAO {
                 .setString(BUCKET_NAME, bucketName.asString())
                 .setString(OWNER, username.asString()))
             .map(row -> row.getString(PAYLOAD))
-            .flatMap(metadataSerializer::deserialize);
+            .handle((json, sink) -> 
metadataSerializer.deserialize(json).ifPresent(sink::next));
     }
 
     Flux<MessageId> retrieveMessageIds(BucketName bucketName, Username 
username) {
diff --git 
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataSerializer.java
 
b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataSerializer.java
index 89b55c4..b191ecb 100644
--- 
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataSerializer.java
+++ 
b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataSerializer.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.vault.metadata;
 
+import java.util.Optional;
+
 import javax.inject.Inject;
 
 import 
org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter;
@@ -31,8 +33,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 
-import reactor.core.publisher.Mono;
-
 class MetadataSerializer {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataSerializer.class);
 
@@ -47,13 +47,27 @@ class MetadataSerializer {
             .setSerializationInclusion(JsonInclude.Include.NON_ABSENT);
     }
 
-    Mono<DeletedMessageWithStorageInformation> deserialize(String payload) {
-        return Mono.just(payload)
-            .flatMap(string -> Mono.fromCallable(() -> 
objectMapper.readValue(string, DeletedMessageWithStorageInformationDTO.class))
-                .onErrorResume(e -> Mono.fromRunnable(() -> 
LOGGER.error("Error deserializing JSON metadata", e))))
-            .flatMap(dto -> Mono.fromCallable(() -> 
dtoConverter.toDomainObject(dto))
-                .onErrorResume(e -> Mono.fromRunnable(() -> 
LOGGER.error("Error deserializing DTO", e))));
+    Optional<DeletedMessageWithStorageInformation> deserialize(String payload) 
{
+        return deserializeDto(payload)
+            .flatMap(this::toDomainObject);
+    }
 
+    private Optional<DeletedMessageWithStorageInformationDTO> 
deserializeDto(String payload) {
+        try {
+            return Optional.of(objectMapper.readValue(payload, 
DeletedMessageWithStorageInformationDTO.class));
+        } catch (Exception e) {
+            LOGGER.error("Error deserializing JSON metadata", e);
+            return Optional.empty();
+        }
+    }
+
+    private Optional<DeletedMessageWithStorageInformation> 
toDomainObject(DeletedMessageWithStorageInformationDTO dto) {
+        try {
+            return Optional.of(dtoConverter.toDomainObject(dto));
+        } catch (Exception e) {
+            LOGGER.error("Error deserializing DTO", e);
+            return Optional.empty();
+        }
     }
 
     String serialize(DeletedMessageWithStorageInformation message) {
diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index b04ea22..8f4b2b7 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -94,7 +94,7 @@ public class RabbitMQTerminationSubscriber implements 
TerminationSubscriber, Sta
         listenQueueHandle = listenerReceiver
             .consumeAutoAck(queueName)
             .subscribeOn(Schedulers.elastic())
-            .concatMap(this::toEvent)
+            .<Event>handle((delivery, sink) -> 
toEvent(delivery).ifPresent(sink::next))
             .subscribe(listener::onNext);
     }
 
@@ -116,14 +116,14 @@ public class RabbitMQTerminationSubscriber implements 
TerminationSubscriber, Sta
             .share();
     }
 
-    private Mono<Event> toEvent(Delivery delivery) {
+    private Optional<Event> toEvent(Delivery delivery) {
         String message = new String(delivery.getBody(), 
StandardCharsets.UTF_8);
         try {
             Event event = serializer.deserialize(message);
-            return Mono.just(event);
+            return Optional.of(event);
         } catch (Exception e) {
             LOGGER.error("Unable to deserialize '{}'", message, e);
-            return Mono.empty();
+            return Optional.empty();
         }
     }
 
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index ed35478..faf61ca 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -89,7 +89,7 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
         return Mono.fromCallable(() -> taskWithId.getTask().details())
             .delayElement(pollingInterval, Schedulers.elastic())
             .repeat()
-            .flatMap(Mono::justOrEmpty)
+            .<TaskExecutionDetails.AdditionalInformation>handle((maybeDetails, 
sink) -> maybeDetails.ifPresent(sink::next))
             .doOnNext(information -> listener.updated(taskWithId.getId(), 
information));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to