This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit bff6763adbfb02ac4cbf1819b8d666cb3053b59c Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 21 16:39:45 2020 +0700 JAMES-3184 Further reactify quota updates --- .../james/mailbox/quota/QuotaRootResolver.java | 4 +--- .../quota/InMemoryCurrentQuotaManager.java | 26 ++++++++++++++------ .../quota/InMemoryCurrentQuotaCalculatorTest.java | 12 ++++++---- .../store/quota/CurrentQuotaCalculator.java | 28 ++++++++-------------- .../store/quota/DefaultUserQuotaRootResolver.java | 28 ++++++++++++---------- .../quota/DefaultUserQuotaRootResolverTest.java | 7 +++--- .../quota/task/RecomputeCurrentQuotasService.java | 2 +- .../james/imap/processor/GetQuotaProcessor.java | 9 ++++++- .../imap/processor/GetQuotaProcessorTest.java | 8 +++---- 9 files changed, 70 insertions(+), 54 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/QuotaRootResolver.java b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/QuotaRootResolver.java index 13e1099..a887363 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/QuotaRootResolver.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/QuotaRootResolver.java @@ -19,8 +19,6 @@ package org.apache.james.mailbox.quota; -import java.util.List; - import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.Mailbox; @@ -43,5 +41,5 @@ public interface QuotaRootResolver extends QuotaRootDeserializer { Publisher<QuotaRoot> getQuotaRootReactive(MailboxId mailboxId); - List<Mailbox> retrieveAssociatedMailboxes(QuotaRoot quotaRoot, MailboxSession mailboxSession) throws MailboxException; + Publisher<Mailbox> retrieveAssociatedMailboxes(QuotaRoot quotaRoot, MailboxSession mailboxSession); } diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java index 8521557..cd9c5c7 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java @@ -40,6 +40,7 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class InMemoryCurrentQuotaManager implements CurrentQuotaManager { @@ -47,14 +48,21 @@ public class InMemoryCurrentQuotaManager implements CurrentQuotaManager { @Inject public InMemoryCurrentQuotaManager(CurrentQuotaCalculator quotaCalculator, SessionProvider sessionProvider) { - this.quotaCache = CacheBuilder.newBuilder().build(new CacheLoader<QuotaRoot, AtomicReference<CurrentQuotas>>() { + this.quotaCache = CacheBuilder.newBuilder().build(new CacheLoader<>() { @Override - public AtomicReference<CurrentQuotas> load(QuotaRoot quotaRoot) throws Exception { - return new AtomicReference<>(quotaCalculator.recalculateCurrentQuotas(quotaRoot, sessionProvider.createSystemSession(Username.of(quotaRoot.getValue())))); + public AtomicReference<CurrentQuotas> load(QuotaRoot quotaRoot) { + return new AtomicReference<>( + loadQuotas(quotaRoot, quotaCalculator, sessionProvider)); } }); } + public CurrentQuotas loadQuotas(QuotaRoot quotaRoot, CurrentQuotaCalculator quotaCalculator, SessionProvider sessionProvider) { + return quotaCalculator.recalculateCurrentQuotas(quotaRoot, sessionProvider.createSystemSession(Username.of(quotaRoot.getValue()))) + .subscribeOn(Schedulers.elastic()) + .block(); + } + @Override public Mono<Void> increase(QuotaOperation quotaOperation) { return updateQuota(quotaOperation.quotaRoot(), quota -> quota.increase(new CurrentQuotas(quotaOperation.count(), quotaOperation.size()))); @@ -68,19 +76,22 @@ public class InMemoryCurrentQuotaManager implements CurrentQuotaManager { @Override public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) { return Mono.fromCallable(() -> quotaCache.get(quotaRoot).get().count()) - .onErrorMap(this::wrapAsMailboxException); + .onErrorMap(this::wrapAsMailboxException) + .subscribeOn(Schedulers.elastic()); } @Override public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) { return Mono.fromCallable(() -> quotaCache.get(quotaRoot).get().size()) - .onErrorMap(this::wrapAsMailboxException); + .onErrorMap(this::wrapAsMailboxException) + .subscribeOn(Schedulers.elastic()); } @Override public Mono<CurrentQuotas> getCurrentQuotas(QuotaRoot quotaRoot) { return Mono.fromCallable(() -> quotaCache.get(quotaRoot).get()) - .onErrorMap(this::wrapAsMailboxException); + .onErrorMap(this::wrapAsMailboxException) + .subscribeOn(Schedulers.elastic()); } @Override @@ -88,7 +99,8 @@ public class InMemoryCurrentQuotaManager implements CurrentQuotaManager { return getCurrentQuotas(quotaOperation.quotaRoot()) .filter(storedQuotas -> !storedQuotas.equals(CurrentQuotas.from(quotaOperation))) .flatMap(storedQuotas -> decrease(new QuotaOperation(quotaOperation.quotaRoot(), storedQuotas.count(), storedQuotas.size())) - .then(increase(quotaOperation))); + .then(increase(quotaOperation))) + .subscribeOn(Schedulers.elastic()); } private Mono<Void> updateQuota(QuotaRoot quotaRoot, UnaryOperator<CurrentQuotas> quotaFunction) { diff --git a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaCalculatorTest.java b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaCalculatorTest.java index e4f5b79..508a22e 100644 --- a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaCalculatorTest.java +++ b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaCalculatorTest.java @@ -35,6 +35,8 @@ import org.apache.james.mailbox.store.quota.CurrentQuotaCalculator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + class InMemoryCurrentQuotaCalculatorTest { static final QuotaRoot QUOTA_ROOT = QuotaRoot.quotaRoot("benwa", Optional.empty()); static final CurrentQuotas CURRENT_QUOTAS = new CurrentQuotas( @@ -53,7 +55,7 @@ class InMemoryCurrentQuotaCalculatorTest { @Test void getCurrentMessageCountShouldReturnRecalculateMessageCountWhenEntryIsNotInitialized() throws Exception { when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null)) - .thenReturn(CURRENT_QUOTAS); + .thenReturn(Mono.just(CURRENT_QUOTAS)); assertThat(testee.getCurrentMessageCount(QUOTA_ROOT).block()).isEqualTo(QuotaCountUsage.count(18)); } @@ -61,7 +63,7 @@ class InMemoryCurrentQuotaCalculatorTest { @Test void getCurrentStorageShouldReturnRecalculateSizeWhenEntryIsNotInitialized() throws Exception { when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null)) - .thenReturn(CURRENT_QUOTAS); + .thenReturn(Mono.just(CURRENT_QUOTAS)); assertThat(testee.getCurrentStorage(QUOTA_ROOT).block()).isEqualTo(QuotaSizeUsage.size(512)); } @@ -69,7 +71,7 @@ class InMemoryCurrentQuotaCalculatorTest { @Test void getCurrentStorageShouldReRetrieveStoredQuotasWhenCalculateOnUnknownQuotaIsTrue() throws Exception { when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null)) - .thenReturn(CURRENT_QUOTAS); + .thenReturn(Mono.just(CURRENT_QUOTAS)); QuotaOperation quotaOperation = new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100)); testee.increase(quotaOperation).block(); @@ -81,7 +83,7 @@ class InMemoryCurrentQuotaCalculatorTest { @Test void getCurrentQuotasShouldReturnRecalculateQuotasWhenEntryIsNotInitialized() throws Exception { when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null)) - .thenReturn(CURRENT_QUOTAS); + .thenReturn(Mono.just(CURRENT_QUOTAS)); assertThat(testee.getCurrentQuotas(QUOTA_ROOT).block()).isEqualTo(CURRENT_QUOTAS); } @@ -89,7 +91,7 @@ class InMemoryCurrentQuotaCalculatorTest { @Test void getCurrentQuotasShouldReRetrieveStoredQuotasWhenCalculateOnUnknownQuotaIsTrue() throws Exception { when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null)) - .thenReturn(CURRENT_QUOTAS); + .thenReturn(Mono.just(CURRENT_QUOTAS)); QuotaOperation quotaOperation = new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100)); testee.increase(quotaOperation).block(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/CurrentQuotaCalculator.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/CurrentQuotaCalculator.java index 7c01a09..c1681c6 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/CurrentQuotaCalculator.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/CurrentQuotaCalculator.java @@ -21,25 +21,23 @@ package org.apache.james.mailbox.store.quota; import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED; -import java.util.Iterator; -import java.util.List; - import javax.inject.Inject; import org.apache.james.core.quota.QuotaCountUsage; import org.apache.james.core.quota.QuotaSizeUsage; import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.CurrentQuotas; -import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.model.QuotaRoot; import org.apache.james.mailbox.quota.QuotaRootResolver; import org.apache.james.mailbox.store.MailboxSessionMapperFactory; import org.apache.james.mailbox.store.mail.MessageMapper; -import org.apache.james.mailbox.store.mail.model.MailboxMessage; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CurrentQuotaCalculator { + private static final int NO_CONCURRENCY = 1; private final MailboxSessionMapperFactory factory; private final QuotaRootResolver quotaRootResolver; @@ -51,18 +49,12 @@ public class CurrentQuotaCalculator { this.quotaRootResolver = quotaRootResolver; } - public CurrentQuotas recalculateCurrentQuotas(QuotaRoot quotaRoot, MailboxSession session) throws MailboxException { - List<Mailbox> mailboxes = quotaRootResolver.retrieveAssociatedMailboxes(quotaRoot, session); + public Mono<CurrentQuotas> recalculateCurrentQuotas(QuotaRoot quotaRoot, MailboxSession session) { MessageMapper mapper = factory.getMessageMapper(session); - long messagesSizes = 0; - long messageCount = 0; - for (Mailbox mailbox : mailboxes) { - Iterator<MailboxMessage> messages = mapper.findInMailbox(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata, UNLIMITED); - messageCount += mapper.countMessagesInMailbox(mailbox); - while (messages.hasNext()) { - messagesSizes += messages.next().getFullContentOctets(); - } - } - return new CurrentQuotas(QuotaCountUsage.count(messageCount), QuotaSizeUsage.size(messagesSizes)); + + return Flux.from(quotaRootResolver.retrieveAssociatedMailboxes(quotaRoot, session)) + .flatMap(mailbox -> mapper.findInMailboxReactive(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata, UNLIMITED), NO_CONCURRENCY) + .map(message -> new CurrentQuotas(QuotaCountUsage.count(1), QuotaSizeUsage.size(message.getFullContentOctets()))) + .reduce(CurrentQuotas.emptyQuotas(), CurrentQuotas::increase); } } \ No newline at end of file diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java index 552e155..773a8b0 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java @@ -41,6 +41,7 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver { @@ -115,17 +116,20 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver { } @Override - public List<Mailbox> retrieveAssociatedMailboxes(QuotaRoot quotaRoot, MailboxSession mailboxSession) throws MailboxException { - List<String> parts = QUOTA_ROOT_DESERIALIZER.toParts(quotaRoot.getValue()); - String namespace = parts.get(0); - String user = parts.get(1); - return factory.getMailboxMapper(mailboxSession) - .findMailboxWithPathLike(MailboxQuery.builder() - .namespace(namespace) - .user(Username.of(user)) - .matchesAllMailboxNames() - .build() - .asUserBound()) - .collectList().block(); + public Flux<Mailbox> retrieveAssociatedMailboxes(QuotaRoot quotaRoot, MailboxSession mailboxSession) { + try { + List<String> parts = QUOTA_ROOT_DESERIALIZER.toParts(quotaRoot.getValue()); + String namespace = parts.get(0); + String user = parts.get(1); + return factory.getMailboxMapper(mailboxSession) + .findMailboxWithPathLike(MailboxQuery.builder() + .namespace(namespace) + .user(Username.of(user)) + .matchesAllMailboxNames() + .build() + .asUserBound()); + } catch (MailboxException e) { + return Flux.error(e); + } } } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java index b022655..3a2cfbc 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java @@ -95,14 +95,15 @@ class DefaultUserQuotaRootResolverTest { when(mockedFactory.getMailboxMapper(MAILBOX_SESSION)).thenReturn(mockedMapper); when(mockedMapper.findMailboxWithPathLike(any())).thenReturn(Flux.just(MAILBOX, MAILBOX_2)); - assertThat(testee.retrieveAssociatedMailboxes(QUOTA_ROOT, MAILBOX_SESSION)).containsOnly(MAILBOX, MAILBOX_2); + assertThat(testee.retrieveAssociatedMailboxes(QUOTA_ROOT, MAILBOX_SESSION).collectList().block()).containsOnly(MAILBOX, MAILBOX_2); } @Test void retrieveAssociatedMailboxesShouldThrowWhenQuotaRootContainsSeparator2Times() throws Exception { assertThatThrownBy(() -> testee.retrieveAssociatedMailboxes( - QuotaRoot.quotaRoot("#private&be&nwa", Optional.empty()), MAILBOX_SESSION)) - .isInstanceOf(MailboxException.class); + QuotaRoot.quotaRoot("#private&be&nwa", Optional.empty()), MAILBOX_SESSION) + .collectList().block()) + .hasCauseInstanceOf(MailboxException.class); } @Test diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java index 2d9095b..57d7b72 100644 --- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java +++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java @@ -179,7 +179,7 @@ public class RecomputeCurrentQuotasService { MailboxSession session = sessionProvider.createSystemSession(username); QuotaRoot quotaRoot = userQuotaRootResolver.forUser(username); - return Mono.fromCallable(() -> currentQuotaCalculator.recalculateCurrentQuotas(quotaRoot, session)) + return currentQuotaCalculator.recalculateCurrentQuotas(quotaRoot, session) .map(recalculatedQuotas -> QuotaOperation.from(quotaRoot, recalculatedQuotas)) .flatMap(quotaOperation -> Mono.from(storeCurrentQuotaManager.setCurrentQuotas(quotaOperation))) .then(Mono.just(Task.Result.COMPLETED)) diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/GetQuotaProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/GetQuotaProcessor.java index 1f9e2bf..273aff8 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/GetQuotaProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/GetQuotaProcessor.java @@ -46,8 +46,12 @@ import org.apache.james.mailbox.quota.QuotaRootResolver; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.MDCBuilder; +import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + /** * GETQUOTA processor */ @@ -101,7 +105,10 @@ public class GetQuotaProcessor extends AbstractMailboxProcessor<GetQuotaRequest> private boolean hasRight(QuotaRoot quotaRoot, ImapSession session) throws MailboxException { // If any of the mailboxes owned by quotaRoot user can be read by the current user, then we should respond to him. final MailboxSession mailboxSession = session.getMailboxSession(); - List<Mailbox> mailboxList = quotaRootResolver.retrieveAssociatedMailboxes(quotaRoot, mailboxSession); + List<Mailbox> mailboxList = Flux.from(quotaRootResolver.retrieveAssociatedMailboxes(quotaRoot, mailboxSession)) + .collect(Guavate.toImmutableList()) + .subscribeOn(Schedulers.elastic()) + .block(); for (Mailbox mailbox : mailboxList) { if (getMailboxManager().hasRight(mailbox.generateAssociatedPath(), MailboxACL.Right.Read, mailboxSession)) { return true; diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/GetQuotaProcessorTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/GetQuotaProcessorTest.java index bcb35dc..3bf1c3d 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/GetQuotaProcessorTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/GetQuotaProcessorTest.java @@ -58,7 +58,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; -import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; public class GetQuotaProcessorTest { @@ -102,7 +102,7 @@ public class GetQuotaProcessorTest { GetQuotaRequest getQuotaRequest = new GetQuotaRequest(TAG, QUOTA_ROOT.getValue()); when(mockedQuotaRootResolver.retrieveAssociatedMailboxes(QUOTA_ROOT, mailboxSession)) - .thenReturn(ImmutableList.of(mailbox)); + .thenReturn(Flux.just(mailbox)); when(mockedMailboxManager.hasRight(MAILBOX_PATH, MailboxACL.Right.Read, mailboxSession)) .thenReturn(true); when(mockedQuotaManager.getMessageQuota(QUOTA_ROOT)).thenReturn(MESSAGE_QUOTA); @@ -129,7 +129,7 @@ public class GetQuotaProcessorTest { GetQuotaRequest getQuotaRequest = new GetQuotaRequest(TAG, QUOTA_ROOT.getValue()); when(mockedQuotaRootResolver.retrieveAssociatedMailboxes(QUOTA_ROOT, mailboxSession)) - .thenReturn(ImmutableList.of(mailbox)); + .thenReturn(Flux.just((mailbox))); when(mockedMailboxManager.hasRight(MAILBOX_PATH, MailboxACL.Right.Read, mailboxSession)) .thenReturn(true); when(mockedQuotaManager.getMessageQuota(QUOTA_ROOT)).thenThrow(new MailboxException()); @@ -151,7 +151,7 @@ public class GetQuotaProcessorTest { GetQuotaRequest getQuotaRequest = new GetQuotaRequest(TAG, QUOTA_ROOT.getValue()); when(mockedQuotaRootResolver.retrieveAssociatedMailboxes(QUOTA_ROOT, mailboxSession)) - .thenReturn(ImmutableList.of(mailbox)); + .thenReturn(Flux.just((mailbox))); when(mockedMailboxManager.hasRight(MAILBOX_PATH, MailboxACL.Right.Read, mailboxSession)) .thenReturn(false); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
