This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0f513dd32611a815b332227ba6521f1d2a4f8247 Author: Benoit Tellier <[email protected]> AuthorDate: Wed Apr 1 17:03:39 2020 +0700 [Performance] Rely on reactor for Cassandra max quota definition --- .../quota/CassandraGlobalMaxQuotaDao.java | 52 +++++++-------- .../quota/CassandraPerDomainMaxQuotaDao.java | 49 ++++++-------- .../quota/CassandraPerUserMaxQuotaDao.java | 49 ++++++-------- .../quota/CassandraPerUserMaxQuotaManager.java | 76 ++++++++++++---------- 4 files changed, 108 insertions(+), 118 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java index fba2c52..74fd9bf 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java @@ -25,24 +25,24 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import java.util.Optional; - import javax.inject.Inject; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.core.quota.QuotaCountLimit; import org.apache.james.core.quota.QuotaSizeLimit; import org.apache.james.mailbox.cassandra.table.CassandraGlobalMaxQuota; import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.Select; +import reactor.core.publisher.Mono; + public class CassandraGlobalMaxQuotaDao { - private final Session session; + private final CassandraAsyncExecutor queryExecutor; private final PreparedStatement setGlobalMaxStorageStatement; private final PreparedStatement setGlobalMaxMessageStatement; private final PreparedStatement getGlobalMaxStatement; @@ -50,7 +50,7 @@ public class CassandraGlobalMaxQuotaDao { @Inject public CassandraGlobalMaxQuotaDao(Session session) { - this.session = session; + this.queryExecutor = new CassandraAsyncExecutor(session); this.getGlobalMaxStatement = session.prepare(getGlobalMaxStatement()); this.setGlobalMaxMessageStatement = session.prepare(setGlobalMaxMessageStatement()); this.setGlobalMaxStorageStatement = session.prepare(setGlobalMaxStorageStatement()); @@ -81,39 +81,33 @@ public class CassandraGlobalMaxQuotaDao { .where(eq(CassandraGlobalMaxQuota.TYPE, bindMarker(CassandraGlobalMaxQuota.TYPE))); } - public void setGlobalMaxStorage(QuotaSizeLimit globalMaxStorage) { - session.execute(setGlobalMaxStorageStatement.bind(QuotaCodec.quotaValueToLong(globalMaxStorage))); + Mono<Void> setGlobalMaxStorage(QuotaSizeLimit globalMaxStorage) { + return queryExecutor.executeVoid(setGlobalMaxStorageStatement.bind(QuotaCodec.quotaValueToLong(globalMaxStorage))); } - public void setGlobalMaxMessage(QuotaCountLimit globalMaxMessageCount) { - session.execute(setGlobalMaxMessageStatement.bind(QuotaCodec.quotaValueToLong(globalMaxMessageCount))); + Mono<Void> setGlobalMaxMessage(QuotaCountLimit globalMaxMessageCount) { + return queryExecutor.executeVoid(setGlobalMaxMessageStatement.bind(QuotaCodec.quotaValueToLong(globalMaxMessageCount))); } - public Optional<QuotaSizeLimit> getGlobalMaxStorage() { - ResultSet resultSet = session.execute(getGlobalMaxStatement.bind() - .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.STORAGE)); - if (resultSet.isExhausted()) { - return Optional.empty(); - } - Long maxStorage = resultSet.one().get(CassandraGlobalMaxQuota.VALUE, Long.class); - return QuotaCodec.longToQuotaSize(maxStorage); + Mono<QuotaSizeLimit> getGlobalMaxStorage() { + return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind() + .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.STORAGE)) + .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class))) + .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage))); } - public Optional<QuotaCountLimit> getGlobalMaxMessage() { - ResultSet resultSet = session.execute(getGlobalMaxStatement.bind() - .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.MESSAGE)); - if (resultSet.isExhausted()) { - return Optional.empty(); - } - Long maxMessages = resultSet.one().get(CassandraGlobalMaxQuota.VALUE, Long.class); - return QuotaCodec.longToQuotaCount(maxMessages); + Mono<QuotaCountLimit> getGlobalMaxMessage() { + return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind() + .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.MESSAGE)) + .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class))) + .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages))); } - public void removeGlobaltMaxStorage() { - session.execute(removeGlobalMaxQuotaStatement.bind(CassandraGlobalMaxQuota.STORAGE)); + Mono<Void> removeGlobaltMaxStorage() { + return queryExecutor.executeVoid(removeGlobalMaxQuotaStatement.bind(CassandraGlobalMaxQuota.STORAGE)); } - public void removeGlobalMaxMessage() { - session.execute(removeGlobalMaxQuotaStatement.bind(CassandraGlobalMaxQuota.MESSAGE)); + Mono<Void> removeGlobalMaxMessage() { + return queryExecutor.executeVoid(removeGlobalMaxQuotaStatement.bind(CassandraGlobalMaxQuota.MESSAGE)); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java index cccbdc2..cb49d3a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java @@ -25,25 +25,25 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import java.util.Optional; - import javax.inject.Inject; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.core.Domain; import org.apache.james.core.quota.QuotaCountLimit; import org.apache.james.core.quota.QuotaSizeLimit; import org.apache.james.mailbox.cassandra.table.CassandraDomainMaxQuota; import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.Select; +import reactor.core.publisher.Mono; + public class CassandraPerDomainMaxQuotaDao { - private final Session session; + private final CassandraAsyncExecutor queryExecutor; private final PreparedStatement setMaxStorageStatement; private final PreparedStatement setMaxMessageStatement; private final PreparedStatement getMaxStorageStatement; @@ -53,7 +53,7 @@ public class CassandraPerDomainMaxQuotaDao { @Inject public CassandraPerDomainMaxQuotaDao(Session session) { - this.session = session; + this.queryExecutor = new CassandraAsyncExecutor(session); this.setMaxStorageStatement = session.prepare(setMaxStorageStatement()); this.setMaxMessageStatement = session.prepare(setMaxMessageStatement()); this.getMaxStorageStatement = session.prepare(getMaxStorageStatement()); @@ -98,38 +98,31 @@ public class CassandraPerDomainMaxQuotaDao { .value(CassandraDomainMaxQuota.STORAGE, bindMarker()); } - public void setMaxStorage(Domain domain, QuotaSizeLimit maxStorageQuota) { - session.execute(setMaxStorageStatement.bind(domain.asString(), QuotaCodec.quotaValueToLong(maxStorageQuota))); + Mono<Void> setMaxStorage(Domain domain, QuotaSizeLimit maxStorageQuota) { + return queryExecutor.executeVoid(setMaxStorageStatement.bind(domain.asString(), QuotaCodec.quotaValueToLong(maxStorageQuota))); } - public void setMaxMessage(Domain domain, QuotaCountLimit maxMessageCount) { - session.execute(setMaxMessageStatement.bind(domain.asString(), QuotaCodec.quotaValueToLong(maxMessageCount))); + Mono<Void> setMaxMessage(Domain domain, QuotaCountLimit maxMessageCount) { + return queryExecutor.executeVoid(setMaxMessageStatement.bind(domain.asString(), QuotaCodec.quotaValueToLong(maxMessageCount))); } - public Optional<QuotaSizeLimit> getMaxStorage(Domain domain) { - ResultSet resultSet = session.execute(getMaxStorageStatement.bind(domain.asString())); - if (resultSet.isExhausted()) { - return Optional.empty(); - } - Long maxStorage = resultSet.one().get(CassandraDomainMaxQuota.STORAGE, Long.class); - return QuotaCodec.longToQuotaSize(maxStorage); + Mono<QuotaSizeLimit> getMaxStorage(Domain domain) { + return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(domain.asString())) + .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.STORAGE, Long.class))) + .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage))); } - public Optional<QuotaCountLimit> getMaxMessage(Domain domain) { - ResultSet resultSet = session.execute(getMaxMessageStatement.bind(domain.asString())); - if (resultSet.isExhausted()) { - return Optional.empty(); - } - Long maxMessages = resultSet.one().get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class); - return QuotaCodec.longToQuotaCount(maxMessages); + Mono<QuotaCountLimit> getMaxMessage(Domain domain) { + return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(domain.asString())) + .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class))) + .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages))); } - public void removeMaxMessage(Domain domain) { - session.execute(removeMaxMessageStatement.bind(domain.asString())); + Mono<Void> removeMaxMessage(Domain domain) { + return queryExecutor.executeVoid(removeMaxMessageStatement.bind(domain.asString())); } - public void removeMaxStorage(Domain domain) { - session.execute(removeMaxStorageStatement.bind(domain.asString())); + Mono<Void> removeMaxStorage(Domain domain) { + return queryExecutor.executeVoid(removeMaxStorageStatement.bind(domain.asString())); } - } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java index 52beb42..cff560d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java @@ -25,25 +25,25 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import java.util.Optional; - import javax.inject.Inject; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.core.quota.QuotaCountLimit; import org.apache.james.core.quota.QuotaSizeLimit; import org.apache.james.mailbox.cassandra.table.CassandraMaxQuota; import org.apache.james.mailbox.model.QuotaRoot; import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.Select; +import reactor.core.publisher.Mono; + public class CassandraPerUserMaxQuotaDao { - private final Session session; + private final CassandraAsyncExecutor queryExecutor; private final PreparedStatement setMaxStorageStatement; private final PreparedStatement setMaxMessageStatement; private final PreparedStatement getMaxStorageStatement; @@ -53,7 +53,7 @@ public class CassandraPerUserMaxQuotaDao { @Inject public CassandraPerUserMaxQuotaDao(Session session) { - this.session = session; + this.queryExecutor = new CassandraAsyncExecutor(session); this.setMaxStorageStatement = session.prepare(setMaxStorageStatement()); this.setMaxMessageStatement = session.prepare(setMaxMessageStatement()); this.getMaxStorageStatement = session.prepare(getMaxStorageStatement()); @@ -98,38 +98,31 @@ public class CassandraPerUserMaxQuotaDao { .value(CassandraMaxQuota.STORAGE, bindMarker()); } - public void setMaxStorage(QuotaRoot quotaRoot, QuotaSizeLimit maxStorageQuota) { - session.execute(setMaxStorageStatement.bind(quotaRoot.getValue(), QuotaCodec.quotaValueToLong(maxStorageQuota))); + Mono<Void> setMaxStorage(QuotaRoot quotaRoot, QuotaSizeLimit maxStorageQuota) { + return queryExecutor.executeVoid(setMaxStorageStatement.bind(quotaRoot.getValue(), QuotaCodec.quotaValueToLong(maxStorageQuota))); } - public void setMaxMessage(QuotaRoot quotaRoot, QuotaCountLimit maxMessageCount) { - session.execute(setMaxMessageStatement.bind(quotaRoot.getValue(), QuotaCodec.quotaValueToLong(maxMessageCount))); + Mono<Void> setMaxMessage(QuotaRoot quotaRoot, QuotaCountLimit maxMessageCount) { + return queryExecutor.executeVoid(setMaxMessageStatement.bind(quotaRoot.getValue(), QuotaCodec.quotaValueToLong(maxMessageCount))); } - public Optional<QuotaSizeLimit> getMaxStorage(QuotaRoot quotaRoot) { - ResultSet resultSet = session.execute(getMaxStorageStatement.bind(quotaRoot.getValue())); - if (resultSet.isExhausted()) { - return Optional.empty(); - } - Long maxStorage = resultSet.one().get(CassandraMaxQuota.STORAGE, Long.class); - return QuotaCodec.longToQuotaSize(maxStorage); + Mono<QuotaSizeLimit> getMaxStorage(QuotaRoot quotaRoot) { + return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(quotaRoot.getValue())) + .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.STORAGE, Long.class))) + .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage))); } - public Optional<QuotaCountLimit> getMaxMessage(QuotaRoot quotaRoot) { - ResultSet resultSet = session.execute(getMaxMessageStatement.bind(quotaRoot.getValue())); - if (resultSet.isExhausted()) { - return Optional.empty(); - } - Long maxMessages = resultSet.one().get(CassandraMaxQuota.MESSAGE_COUNT, Long.class); - return QuotaCodec.longToQuotaCount(maxMessages); + Mono<QuotaCountLimit> getMaxMessage(QuotaRoot quotaRoot) { + return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(quotaRoot.getValue())) + .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.MESSAGE_COUNT, Long.class))) + .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages))); } - public void removeMaxMessage(QuotaRoot quotaRoot) { - session.execute(removeMaxMessageStatement.bind(quotaRoot.getValue())); + Mono<Void> removeMaxMessage(QuotaRoot quotaRoot) { + return queryExecutor.executeVoid(removeMaxMessageStatement.bind(quotaRoot.getValue())); } - public void removeMaxStorage(QuotaRoot quotaRoot) { - session.execute(removeMaxStorageStatement.bind(quotaRoot.getValue())); + Mono<Void> removeMaxStorage(QuotaRoot quotaRoot) { + return queryExecutor.executeVoid(removeMaxStorageStatement.bind(quotaRoot.getValue())); } - } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaManager.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaManager.java index 671e707..12d90c1 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaManager.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaManager.java @@ -21,8 +21,6 @@ package org.apache.james.mailbox.cassandra.quota; import java.util.Map; import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Stream; import javax.inject.Inject; @@ -34,9 +32,11 @@ import org.apache.james.mailbox.model.Quota; import org.apache.james.mailbox.model.QuotaRoot; import org.apache.james.mailbox.quota.MaxQuotaManager; -import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class CassandraPerUserMaxQuotaManager implements MaxQuotaManager { private final CassandraPerUserMaxQuotaDao perUserQuota; @@ -54,103 +54,113 @@ public class CassandraPerUserMaxQuotaManager implements MaxQuotaManager { @Override public void setMaxStorage(QuotaRoot quotaRoot, QuotaSizeLimit maxStorageQuota) { - perUserQuota.setMaxStorage(quotaRoot, maxStorageQuota); + perUserQuota.setMaxStorage(quotaRoot, maxStorageQuota).block(); } @Override public void setMaxMessage(QuotaRoot quotaRoot, QuotaCountLimit maxMessageCount) { - perUserQuota.setMaxMessage(quotaRoot, maxMessageCount); + perUserQuota.setMaxMessage(quotaRoot, maxMessageCount).block(); } @Override public void setDomainMaxMessage(Domain domain, QuotaCountLimit count) { - perDomainQuota.setMaxMessage(domain, count); + perDomainQuota.setMaxMessage(domain, count).block(); } @Override public void setDomainMaxStorage(Domain domain, QuotaSizeLimit size) { - perDomainQuota.setMaxStorage(domain, size); + perDomainQuota.setMaxStorage(domain, size).block(); } @Override public void removeDomainMaxMessage(Domain domain) { - perDomainQuota.removeMaxMessage(domain); + perDomainQuota.removeMaxMessage(domain).block(); } @Override public void removeDomainMaxStorage(Domain domain) { - perDomainQuota.removeMaxStorage(domain); + perDomainQuota.removeMaxStorage(domain).block(); } @Override public Optional<QuotaCountLimit> getDomainMaxMessage(Domain domain) { - return perDomainQuota.getMaxMessage(domain); + return perDomainQuota.getMaxMessage(domain).blockOptional(); } @Override public Optional<QuotaSizeLimit> getDomainMaxStorage(Domain domain) { - return perDomainQuota.getMaxStorage(domain); + return perDomainQuota.getMaxStorage(domain).blockOptional(); } @Override public void removeMaxMessage(QuotaRoot quotaRoot) { - perUserQuota.removeMaxMessage(quotaRoot); + perUserQuota.removeMaxMessage(quotaRoot).block(); } @Override public void removeMaxStorage(QuotaRoot quotaRoot) { - perUserQuota.removeMaxStorage(quotaRoot); + perUserQuota.removeMaxStorage(quotaRoot).block(); } @Override public void setGlobalMaxStorage(QuotaSizeLimit globalMaxStorage) { - globalQuota.setGlobalMaxStorage(globalMaxStorage); + globalQuota.setGlobalMaxStorage(globalMaxStorage).block(); } @Override public void removeGlobalMaxStorage() { - globalQuota.removeGlobaltMaxStorage(); + globalQuota.removeGlobaltMaxStorage().block(); } @Override public void setGlobalMaxMessage(QuotaCountLimit globalMaxMessageCount) { - globalQuota.setGlobalMaxMessage(globalMaxMessageCount); + globalQuota.setGlobalMaxMessage(globalMaxMessageCount).block(); } @Override public void removeGlobalMaxMessage() { - globalQuota.removeGlobalMaxMessage(); + globalQuota.removeGlobalMaxMessage().block(); } @Override public Optional<QuotaSizeLimit> getGlobalMaxStorage() { - return globalQuota.getGlobalMaxStorage(); + return globalQuota.getGlobalMaxStorage().blockOptional(); } @Override public Optional<QuotaCountLimit> getGlobalMaxMessage() { - return globalQuota.getGlobalMaxMessage(); + return globalQuota.getGlobalMaxMessage().blockOptional(); } @Override public Map<Quota.Scope, QuotaCountLimit> listMaxMessagesDetails(QuotaRoot quotaRoot) { - Function<Domain, Optional<QuotaCountLimit>> domainQuotaSupplier = Throwing.function(this::getDomainMaxMessage).sneakyThrow(); - return Stream.of( - Pair.of(Quota.Scope.User, perUserQuota.getMaxMessage(quotaRoot)), - Pair.of(Quota.Scope.Domain, quotaRoot.getDomain().flatMap(domainQuotaSupplier)), - Pair.of(Quota.Scope.Global, globalQuota.getGlobalMaxMessage())) - .filter(pair -> pair.getValue().isPresent()) - .collect(Guavate.toImmutableMap(Pair::getKey, value -> value.getValue().get())); + return Flux.merge( + perUserQuota.getMaxMessage(quotaRoot) + .map(limit -> Pair.of(Quota.Scope.User, limit)), + Mono.justOrEmpty(quotaRoot.getDomain()) + .flatMap(perDomainQuota::getMaxMessage) + .map(limit -> Pair.of(Quota.Scope.Domain, limit)), + globalQuota.getGlobalMaxMessage() + .map(limit -> Pair.of(Quota.Scope.Global, limit))) + .collect(Guavate.toImmutableMap( + Pair::getKey, + Pair::getValue)) + .block(); } @Override public Map<Quota.Scope, QuotaSizeLimit> listMaxStorageDetails(QuotaRoot quotaRoot) { - Function<Domain, Optional<QuotaSizeLimit>> domainQuotaSupplier = Throwing.function(this::getDomainMaxStorage).sneakyThrow(); - return Stream.of( - Pair.of(Quota.Scope.User, perUserQuota.getMaxStorage(quotaRoot)), - Pair.of(Quota.Scope.Domain, quotaRoot.getDomain().flatMap(domainQuotaSupplier)), - Pair.of(Quota.Scope.Global, globalQuota.getGlobalMaxStorage())) - .filter(pair -> pair.getValue().isPresent()) - .collect(Guavate.toImmutableMap(Pair::getKey, value -> value.getValue().get())); + return Flux.merge( + perUserQuota.getMaxStorage(quotaRoot) + .map(limit -> Pair.of(Quota.Scope.User, limit)), + Mono.justOrEmpty(quotaRoot.getDomain()) + .flatMap(perDomainQuota::getMaxStorage) + .map(limit -> Pair.of(Quota.Scope.Domain, limit)), + globalQuota.getGlobalMaxStorage() + .map(limit -> Pair.of(Quota.Scope.Global, limit))) + .collect(Guavate.toImmutableMap( + Pair::getKey, + Pair::getValue)) + .block(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
