This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a4f75c27dea83aa6b7b9a5b232ebaf80ff67642a
Author: Benoit Tellier <[email protected]>
AuthorDate: Mon May 4 11:12:26 2020 +0700

    [Refactoring] Reactor: use handle for nullable transformations
    
    https://github.com/reactor/reactor-core/issues/86
---
 .../org/apache/james/backends/rabbitmq/SimpleConnectionPool.java     | 4 +++-
 .../james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java    | 5 +++--
 .../james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java | 5 +++--
 .../james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java   | 5 +++--
 .../util/src/main/java/org/apache/james/util/ReactorUtils.java       | 5 +++++
 5 files changed, 17 insertions(+), 7 deletions(-)

diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
index b174795..97ebfbe 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.backends.rabbitmq;
 
+import static org.apache.james.util.ReactorUtils.transformAndPublishIfNotNull;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
@@ -89,7 +91,7 @@ public class SimpleConnectionPool implements AutoCloseable {
     public Mono<RabbitMQServerVersion> version() {
         return getOpenConnection()
             .map(Connection::getServerProperties)
-            .flatMap(serverProperties -> 
Mono.justOrEmpty(serverProperties.get("version")))
+            .handle(transformAndPublishIfNotNull(serverProperties -> 
serverProperties.get("version")))
             .map(Object::toString)
             .map(RabbitMQServerVersion::of)
             .timeout(Duration.ofSeconds(1))
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
index 9ee9159..03d4e54 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
@@ -25,6 +25,7 @@ import static 
com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
+import static org.apache.james.util.ReactorUtils.transformAndPublishIfNotNull;
 
 import javax.inject.Inject;
 
@@ -93,7 +94,7 @@ public class CassandraGlobalMaxQuotaDao {
     Mono<QuotaSizeLimit> getGlobalMaxStorage() {
         return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind()
                 .setString(CassandraGlobalMaxQuota.TYPE, 
CassandraGlobalMaxQuota.STORAGE))
-            .flatMap(row -> 
Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
+            .handle(transformAndPublishIfNotNull(row -> 
row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
             .map(QuotaCodec::longToQuotaSize)
             .handle(publishIfPresent());
     }
@@ -101,7 +102,7 @@ public class CassandraGlobalMaxQuotaDao {
     Mono<QuotaCountLimit> getGlobalMaxMessage() {
         return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind()
             .setString(CassandraGlobalMaxQuota.TYPE, 
CassandraGlobalMaxQuota.MESSAGE))
-            .flatMap(row -> 
Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
+            .handle(transformAndPublishIfNotNull(row -> 
row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
             .map(QuotaCodec::longToQuotaCount)
             .handle(publishIfPresent());
     }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
index 38161ef..102d793 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
@@ -25,6 +25,7 @@ import static 
com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
+import static org.apache.james.util.ReactorUtils.transformAndPublishIfNotNull;
 
 import javax.inject.Inject;
 
@@ -109,14 +110,14 @@ public class CassandraPerDomainMaxQuotaDao {
 
     Mono<QuotaSizeLimit> getMaxStorage(Domain domain) {
         return 
queryExecutor.executeSingleRow(getMaxStorageStatement.bind(domain.asString()))
-            .flatMap(row -> 
Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.STORAGE, Long.class)))
+            .handle(transformAndPublishIfNotNull(row -> 
row.get(CassandraDomainMaxQuota.STORAGE, Long.class)))
             .map(QuotaCodec::longToQuotaSize)
             .handle(publishIfPresent());
     }
 
     Mono<QuotaCountLimit> getMaxMessage(Domain domain) {
         return 
queryExecutor.executeSingleRow(getMaxMessageStatement.bind(domain.asString()))
-            .flatMap(row -> 
Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class)))
+            .handle(transformAndPublishIfNotNull(row -> 
row.get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class)))
             .map(QuotaCodec::longToQuotaCount)
             .handle(publishIfPresent());
     }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
index 2b4e21b..4abe876 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
@@ -25,6 +25,7 @@ import static 
com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
+import static org.apache.james.util.ReactorUtils.transformAndPublishIfNotNull;
 
 import javax.inject.Inject;
 
@@ -109,14 +110,14 @@ public class CassandraPerUserMaxQuotaDao {
 
     Mono<QuotaSizeLimit> getMaxStorage(QuotaRoot quotaRoot) {
         return 
queryExecutor.executeSingleRow(getMaxStorageStatement.bind(quotaRoot.getValue()))
-            .flatMap(row -> 
Mono.justOrEmpty(row.get(CassandraMaxQuota.STORAGE, Long.class)))
+            .handle(transformAndPublishIfNotNull(row -> 
row.get(CassandraMaxQuota.STORAGE, Long.class)))
             .map(QuotaCodec::longToQuotaSize)
             .handle(publishIfPresent());
     }
 
     Mono<QuotaCountLimit> getMaxMessage(QuotaRoot quotaRoot) {
         return 
queryExecutor.executeSingleRow(getMaxMessageStatement.bind(quotaRoot.getValue()))
-            .flatMap(row -> 
Mono.justOrEmpty(row.get(CassandraMaxQuota.MESSAGE_COUNT, Long.class)))
+            .handle(transformAndPublishIfNotNull(row -> 
row.get(CassandraMaxQuota.MESSAGE_COUNT, Long.class)))
             .map(QuotaCodec::longToQuotaCount)
             .handle(publishIfPresent());
     }
diff --git 
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java 
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index a477355..7fb5d5a 100644
--- 
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ 
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -45,6 +46,10 @@ public class ReactorUtils {
         return (element, sink) -> element.ifPresent(sink::next);
     }
 
+    public static <T, U> BiConsumer<U, SynchronousSink<T>> 
transformAndPublishIfNotNull(Function<U, T> mapper) {
+        return (element, sink) -> 
Optional.ofNullable(mapper.apply(element)).ifPresent(sink::next);
+    }
+
     public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
         return new StreamInputStream(byteArrays.toIterable(1).iterator());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to