This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a4f75c27dea83aa6b7b9a5b232ebaf80ff67642a Author: Benoit Tellier <[email protected]> AuthorDate: Mon May 4 11:12:26 2020 +0700 [Refactoring] Reactor: use handle for nullable transformations https://github.com/reactor/reactor-core/issues/86 --- .../org/apache/james/backends/rabbitmq/SimpleConnectionPool.java | 4 +++- .../james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java | 5 +++-- .../james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java | 5 +++-- .../james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java | 5 +++-- .../util/src/main/java/org/apache/james/util/ReactorUtils.java | 5 +++++ 5 files changed, 17 insertions(+), 7 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java index b174795..97ebfbe 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java @@ -19,6 +19,8 @@ package org.apache.james.backends.rabbitmq; +import static org.apache.james.util.ReactorUtils.transformAndPublishIfNotNull; + import java.io.IOException; import java.time.Duration; import java.util.Optional; @@ -89,7 +91,7 @@ public class SimpleConnectionPool implements AutoCloseable { public Mono<RabbitMQServerVersion> version() { return getOpenConnection() .map(Connection::getServerProperties) - .flatMap(serverProperties -> Mono.justOrEmpty(serverProperties.get("version"))) + .handle(transformAndPublishIfNotNull(serverProperties -> serverProperties.get("version"))) .map(Object::toString) .map(RabbitMQServerVersion::of) .timeout(Duration.ofSeconds(1)) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java index 9ee9159..03d4e54 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java @@ -25,6 +25,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.apache.james.util.ReactorUtils.publishIfPresent; +import static org.apache.james.util.ReactorUtils.transformAndPublishIfNotNull; import javax.inject.Inject; @@ -93,7 +94,7 @@ public class CassandraGlobalMaxQuotaDao { Mono<QuotaSizeLimit> getGlobalMaxStorage() { return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind() .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.STORAGE)) - .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class))) + .handle(transformAndPublishIfNotNull(row -> row.get(CassandraGlobalMaxQuota.VALUE, Long.class))) .map(QuotaCodec::longToQuotaSize) .handle(publishIfPresent()); } @@ -101,7 +102,7 @@ public class CassandraGlobalMaxQuotaDao { Mono<QuotaCountLimit> getGlobalMaxMessage() { return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind() .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.MESSAGE)) - .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class))) + .handle(transformAndPublishIfNotNull(row -> row.get(CassandraGlobalMaxQuota.VALUE, Long.class))) .map(QuotaCodec::longToQuotaCount) .handle(publishIfPresent()); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java index 38161ef..102d793 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java @@ -25,6 +25,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.apache.james.util.ReactorUtils.publishIfPresent; +import static org.apache.james.util.ReactorUtils.transformAndPublishIfNotNull; import javax.inject.Inject; @@ -109,14 +110,14 @@ public class CassandraPerDomainMaxQuotaDao { Mono<QuotaSizeLimit> getMaxStorage(Domain domain) { return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(domain.asString())) - .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.STORAGE, Long.class))) + .handle(transformAndPublishIfNotNull(row -> row.get(CassandraDomainMaxQuota.STORAGE, Long.class))) .map(QuotaCodec::longToQuotaSize) .handle(publishIfPresent()); } Mono<QuotaCountLimit> getMaxMessage(Domain domain) { return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(domain.asString())) - .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class))) + .handle(transformAndPublishIfNotNull(row -> row.get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class))) .map(QuotaCodec::longToQuotaCount) .handle(publishIfPresent()); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java index 2b4e21b..4abe876 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java @@ -25,6 +25,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.apache.james.util.ReactorUtils.publishIfPresent; +import static org.apache.james.util.ReactorUtils.transformAndPublishIfNotNull; import javax.inject.Inject; @@ -109,14 +110,14 @@ public class CassandraPerUserMaxQuotaDao { Mono<QuotaSizeLimit> getMaxStorage(QuotaRoot quotaRoot) { return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(quotaRoot.getValue())) - .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.STORAGE, Long.class))) + .handle(transformAndPublishIfNotNull(row -> row.get(CassandraMaxQuota.STORAGE, Long.class))) .map(QuotaCodec::longToQuotaSize) .handle(publishIfPresent()); } Mono<QuotaCountLimit> getMaxMessage(QuotaRoot quotaRoot) { return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(quotaRoot.getValue())) - .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.MESSAGE_COUNT, Long.class))) + .handle(transformAndPublishIfNotNull(row -> row.get(CassandraMaxQuota.MESSAGE_COUNT, Long.class))) .map(QuotaCodec::longToQuotaCount) .handle(publishIfPresent()); } diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index a477355..7fb5d5a 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -45,6 +46,10 @@ public class ReactorUtils { return (element, sink) -> element.ifPresent(sink::next); } + public static <T, U> BiConsumer<U, SynchronousSink<T>> transformAndPublishIfNotNull(Function<U, T> mapper) { + return (element, sink) -> Optional.ofNullable(mapper.apply(element)).ifPresent(sink::next); + } + public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) { return new StreamInputStream(byteArrays.toIterable(1).iterator()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
