This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 39f2724e4b1bf23cde7b02e774b725ddf9a9dc43 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Thu Jan 16 09:57:09 2020 +0100 [REFACTORING] Reactor: Use handle instead of flatMap for flux synchronous transformations --- .../apache/james/vault/metadata/MetadataDAO.java | 2 +- .../james/vault/metadata/MetadataSerializer.java | 30 ++++++++++++++++------ .../distributed/RabbitMQTerminationSubscriber.java | 8 +++--- .../apache/james/task/SerialTaskManagerWorker.java | 2 +- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java index 251728c..87553e9 100644 --- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java @@ -109,7 +109,7 @@ public class MetadataDAO { .setString(BUCKET_NAME, bucketName.asString()) .setString(OWNER, username.asString())) .map(row -> row.getString(PAYLOAD)) - .flatMap(metadataSerializer::deserialize); + .handle((json, sink) -> metadataSerializer.deserialize(json).ifPresent(sink::next)); } Flux<MessageId> retrieveMessageIds(BucketName bucketName, Username username) { diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataSerializer.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataSerializer.java index 89b55c4..b191ecb 100644 --- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataSerializer.java +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataSerializer.java @@ -19,6 +19,8 @@ package org.apache.james.vault.metadata; +import java.util.Optional; + import javax.inject.Inject; import org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter; @@ -31,8 +33,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import reactor.core.publisher.Mono; - class MetadataSerializer { private static final Logger LOGGER = LoggerFactory.getLogger(MetadataSerializer.class); @@ -47,13 +47,27 @@ class MetadataSerializer { .setSerializationInclusion(JsonInclude.Include.NON_ABSENT); } - Mono<DeletedMessageWithStorageInformation> deserialize(String payload) { - return Mono.just(payload) - .flatMap(string -> Mono.fromCallable(() -> objectMapper.readValue(string, DeletedMessageWithStorageInformationDTO.class)) - .onErrorResume(e -> Mono.fromRunnable(() -> LOGGER.error("Error deserializing JSON metadata", e)))) - .flatMap(dto -> Mono.fromCallable(() -> dtoConverter.toDomainObject(dto)) - .onErrorResume(e -> Mono.fromRunnable(() -> LOGGER.error("Error deserializing DTO", e)))); + Optional<DeletedMessageWithStorageInformation> deserialize(String payload) { + return deserializeDto(payload) + .flatMap(this::toDomainObject); + } + private Optional<DeletedMessageWithStorageInformationDTO> deserializeDto(String payload) { + try { + return Optional.of(objectMapper.readValue(payload, DeletedMessageWithStorageInformationDTO.class)); + } catch (Exception e) { + LOGGER.error("Error deserializing JSON metadata", e); + return Optional.empty(); + } + } + + private Optional<DeletedMessageWithStorageInformation> toDomainObject(DeletedMessageWithStorageInformationDTO dto) { + try { + return Optional.of(dtoConverter.toDomainObject(dto)); + } catch (Exception e) { + LOGGER.error("Error deserializing DTO", e); + return Optional.empty(); + } } String serialize(DeletedMessageWithStorageInformation message) { diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index b04ea22..8f4b2b7 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -94,7 +94,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta listenQueueHandle = listenerReceiver .consumeAutoAck(queueName) .subscribeOn(Schedulers.elastic()) - .concatMap(this::toEvent) + .<Event>handle((delivery, sink) -> toEvent(delivery).ifPresent(sink::next)) .subscribe(listener::onNext); } @@ -116,14 +116,14 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta .share(); } - private Mono<Event> toEvent(Delivery delivery) { + private Optional<Event> toEvent(Delivery delivery) { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); try { Event event = serializer.deserialize(message); - return Mono.just(event); + return Optional.of(event); } catch (Exception e) { LOGGER.error("Unable to deserialize '{}'", message, e); - return Mono.empty(); + return Optional.empty(); } } diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index ed35478..faf61ca 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -89,7 +89,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { return Mono.fromCallable(() -> taskWithId.getTask().details()) .delayElement(pollingInterval, Schedulers.elastic()) .repeat() - .flatMap(Mono::justOrEmpty) + .<TaskExecutionDetails.AdditionalInformation>handle((maybeDetails, sink) -> maybeDetails.ifPresent(sink::next)) .doOnNext(information -> listener.updated(taskWithId.getId(), information)); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org