This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 119c9f34a6991c9c0db48245de89e53d8f6d0a55 Author: Benoit Tellier <[email protected]> AuthorDate: Sun May 30 20:08:31 2021 +0700 [PERFORMANCE] MailboxChangeListener should be fully reactive --- .../org/apache/james/mailbox/MessageIdManager.java | 2 ++ .../james/mailbox/store/StoreMessageIdManager.java | 32 +++++++++--------- .../apache/james/jmap/api/change/EmailChange.java | 39 +++++++++++----------- .../james/jmap/change/MailboxChangeListener.scala | 5 +-- 4 files changed, 39 insertions(+), 39 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java index 4bb6060..8d42623 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java @@ -50,6 +50,8 @@ public interface MessageIdManager { Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, final MailboxSession mailboxSession) throws MailboxException; + Publisher<Set<MessageId>> accessibleMessagesReactive(Collection<MessageId> messageIds, final MailboxSession mailboxSession); + void setFlags(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException; Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index 47d44a0..71f7ab4 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -144,20 +144,22 @@ public class StoreMessageIdManager implements MessageIdManager { } @Override - public Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException { + public Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, MailboxSession mailboxSession) { + return accessibleMessagesReactive(messageIds, mailboxSession).block(); + } + + @Override + public Mono<Set<MessageId>> accessibleMessagesReactive(Collection<MessageId> messageIds, MailboxSession mailboxSession) { MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession); - ImmutableList<ComposedMessageIdWithMetaData> idList = Flux.fromIterable(messageIds) + return Flux.fromIterable(messageIds) .flatMap(messageIdMapper::findMetadata, DEFAULT_CONCURRENCY) .collect(Guavate.toImmutableList()) - .block(); - - ImmutableSet<MailboxId> allowedMailboxIds = getAllowedMailboxIds(mailboxSession, idList.stream() - .map(id -> id.getComposedMessageId().getMailboxId()), Right.Read); - - return idList.stream() - .filter(id -> allowedMailboxIds.contains(id.getComposedMessageId().getMailboxId())) - .map(id -> id.getComposedMessageId().getMessageId()) - .collect(Guavate.toImmutableSet()); + .flatMap(idList -> getAllowedMailboxIds(mailboxSession, idList.stream() + .map(id -> id.getComposedMessageId().getMailboxId()), Right.Read) + .map(allowedMailboxIds -> idList.stream() + .filter(id -> allowedMailboxIds.contains(id.getComposedMessageId().getMailboxId())) + .map(id -> id.getComposedMessageId().getMessageId()) + .collect(Guavate.toImmutableSet()))); } @Override @@ -190,11 +192,7 @@ public class StoreMessageIdManager implements MessageIdManager { .flatMap(Function.identity(), DEFAULT_CONCURRENCY); } - private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) throws MailboxException { - return MailboxReactorUtils.block(getAllowedMailboxIdsReactive(mailboxSession, idList, rights)); - } - - private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIdsReactive(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) { + private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) { return Flux.fromStream(idList) .distinct() .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights), DEFAULT_CONCURRENCY) @@ -229,7 +227,7 @@ public class StoreMessageIdManager implements MessageIdManager { return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata) .collectList() .flatMap(messageList -> - getAllowedMailboxIdsReactive(mailboxSession, + getAllowedMailboxIds(mailboxSession, messageList .stream() .map(MailboxMessage::getMailboxId), diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java index 6210da1..00e2733 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java @@ -23,7 +23,6 @@ import java.time.ZonedDateTime; import java.util.Collection; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.stream.Stream; import javax.inject.Inject; @@ -35,10 +34,8 @@ import org.apache.james.mailbox.SessionProvider; import org.apache.james.mailbox.events.MailboxEvents.Added; import org.apache.james.mailbox.events.MailboxEvents.Expunged; import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.MessageId; -import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -46,6 +43,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class EmailChange implements JmapChange { public static class Builder { @FunctionalInterface @@ -185,28 +185,27 @@ public class EmailChange implements JmapChange { .collect(Guavate.toImmutableList()); } - public List<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) throws MailboxException { + public Flux<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) { - EmailChange ownerChange = fromExpunged(expunged, now, expunged.getUsername()); + Mono<EmailChange> ownerChange = fromExpunged(expunged, now, expunged.getUsername()); - Stream<EmailChange> shareeChanges = sharees.stream() - .map(Throwing.<Username, EmailChange>function(shareeId -> fromExpunged(expunged, now, shareeId)).sneakyThrow()); + Flux<EmailChange> shareeChanges = Flux.fromIterable(sharees) + .flatMap(shareeId -> fromExpunged(expunged, now, shareeId)); - return Stream.concat(Stream.of(ownerChange), shareeChanges) - .collect(Guavate.toImmutableList()); + return Flux.concat(ownerChange, shareeChanges); } - private EmailChange fromExpunged(Expunged expunged, ZonedDateTime now, Username username) throws MailboxException { - Set<MessageId> accessibleMessageIds = messageIdManager.accessibleMessages(expunged.getMessageIds(), sessionProvider.createSystemSession(username)); - - return EmailChange.builder() - .accountId(AccountId.fromUsername(username)) - .state(stateFactory.generate()) - .date(now) - .isDelegated(false) - .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds)) - .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds)) - .build(); + private Mono<EmailChange> fromExpunged(Expunged expunged, ZonedDateTime now, Username username) { + return Mono.from(messageIdManager.accessibleMessagesReactive(expunged.getMessageIds(), + sessionProvider.createSystemSession(username))) + .map(accessibleMessageIds -> EmailChange.builder() + .accountId(AccountId.fromUsername(username)) + .state(stateFactory.generate()) + .date(now) + .isDelegated(false) + .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds)) + .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds)) + .build()); } } diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala index b4c6aa9..e3cb5b9 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala @@ -90,8 +90,9 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus: .concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala)) case expunged: Expunged => getSharees(mailboxId, username) - .flatMapIterable(sharees => mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala - .concat(emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava).asScala)) + .flatMapMany(sharees => SFlux.concat( + SFlux.fromIterable(mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala), + emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava))) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
