This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d4647112f6e71186f8f039c95e94d3425cfc4962
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat May 1 22:34:04 2021 +0700

    JAMES-3575 Rewrite StoreMessageIdManager::setFlags in a more reactive way
---
 .../james/mailbox/store/StoreMessageIdManager.java | 32 ++++++++++------------
 1 file changed, 14 insertions(+), 18 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 6ab1ffc..5031573 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -82,7 +82,6 @@ import com.github.fge.lambdas.functions.ThrowingFunction;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 import reactor.core.publisher.Flux;
@@ -125,20 +124,19 @@ public class StoreMessageIdManager implements MessageIdManager {
         MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
 
         int concurrency = 4;
-        List<Mailbox> targetMailboxes = Flux.fromIterable(mailboxIds)
+
+        MailboxReactorUtils.block(Flux.fromIterable(mailboxIds)
             .flatMap(mailboxMapper::findMailboxById, concurrency)
             .collect(Guavate.toImmutableList())
-            .subscribeOn(Schedulers.elastic())
-            .block();
-
-        assertRightsOnMailboxes(targetMailboxes, mailboxSession, Right.Write);
+            .flatMap(Throwing.<List<Mailbox>, Mono<Void>>function(targetMailboxes -> {
+                assertRightsOnMailboxes(targetMailboxes, mailboxSession, Right.Write);
 
-        Multimap<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace)
-            .subscribeOn(Schedulers.elastic())
-            .block();
-        for (Map.Entry<MailboxId, Collection<UpdatedFlags>> entry : updatedFlags.asMap().entrySet()) {
-            dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes);
-        }
+                return messageIdMapper.setFlags(messageId, mailboxIds, newState, replace)
+                    .flatMapIterable(updatedFlags -> updatedFlags.asMap().entrySet())
+                    .concatMap(entry -> dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes))
+                    .then();
+            }).sneakyThrow())
+            .subscribeOn(Schedulers.elastic()));
     }
 
     @Override
@@ -389,8 +387,7 @@ public class StoreMessageIdManager implements MessageIdManager {
             .then());
     }
 
-    private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags,
-                                     List<Mailbox> knownMailboxes) throws MailboxException {
+    private Mono<Void> dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags, List<Mailbox> knownMailboxes) {
         if (updatedFlags.stream().anyMatch(UpdatedFlags::flagsChanged)) {
             Mailbox mailbox = knownMailboxes.stream()
                 .filter(knownMailbox -> knownMailbox.getMailboxId().equals(mailboxId))
@@ -399,16 +396,15 @@ public class StoreMessageIdManager implements MessageIdManager {
                         .findMailboxById(mailboxId)
                         .subscribeOn(Schedulers.elastic())))
                     .sneakyThrow());
-            eventBus.dispatch(EventFactory.flagsUpdated()
+            return eventBus.dispatch(EventFactory.flagsUpdated()
                     .randomEventId()
                     .mailboxSession(mailboxSession)
                     .mailbox(mailbox)
                     .updatedFlags(updatedFlags)
                     .build(),
-                new MailboxIdRegistrationKey(mailboxId))
-                .subscribeOn(Schedulers.elastic())
-                .block();
+                new MailboxIdRegistrationKey(mailboxId));
         }
+        return Mono.empty();
     }
 
     private void validateQuota(MessageMovesWithMailbox messageMoves, MailboxMessage mailboxMessage) throws MailboxException {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
