This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f2fbe4f0cc378394254be7b7a7e36cda290009e6 Author: Benoit Tellier <[email protected]> AuthorDate: Mon May 4 18:25:21 2020 +0700 JAMES-3149 EventBus caller should choose on which scheduler it dispatches. This avoids enforcing the thread context switch upon dispatch, optimizing thread usage. --- .../apache/james/mailbox/events/GroupContract.java | 5 ++- .../james/mailbox/events/EventDispatcher.java | 7 +--- .../james/mailbox/store/StoreMailboxManager.java | 49 ++++++++++++---------- .../james/mailbox/store/StoreMessageIdManager.java | 28 ++++++++----- .../james/mailbox/store/StoreMessageManager.java | 28 +++++++------ .../james/mailbox/store/StoreRightManager.java | 3 ++ 6 files changed, 69 insertions(+), 51 deletions(-) diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java index 83ef2cf..dfe31ac 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java @@ -56,6 +56,7 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; + import reactor.core.scheduler.Schedulers; public interface GroupContract { @@ -158,7 +159,9 @@ public interface GroupContract { AtomicBoolean successfulRetry = new AtomicBoolean(false); MailboxListener listener = event -> { if (event.getEventId().equals(EVENT_ID)) { - eventBus().dispatch(EVENT_2, NO_KEYS).block(); + eventBus().dispatch(EVENT_2, NO_KEYS) + .subscribeOn(Schedulers.elastic()) + .block(); successfulRetry.set(true); } }; diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java index 3de1fe0..c150184 100644 --- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java @@ -46,7 +46,6 @@ import com.rabbitmq.client.AMQP; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.Sender; @@ -95,7 +94,6 @@ public class EventDispatcher { .concat( dispatchToLocalListeners(event, keys), dispatchToRemoteListeners(event, keys)) - .subscribeOn(Schedulers.elastic()) .doOnError(throwable -> LOGGER.error("error while dispatching event", throwable)) .then() .subscribeWith(MonoProcessor.create()); @@ -106,7 +104,7 @@ public class EventDispatcher { .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key) .map(listener -> Tuples.of(key, listener))) .filter(pair -> pair.getT2().getExecutionMode() == MailboxListener.ExecutionMode.SYNCHRONOUS) - .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1())).subscribeOn(Schedulers.elastic()) + .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1())) .then(); } @@ -161,8 +159,7 @@ public class EventDispatcher { if (routingKeys.isEmpty()) { return Mono.empty(); } - return sender.send(toMessages(serializedEvent, routingKeys)) - .subscribeOn(Schedulers.elastic()); + return sender.send(toMessages(serializedEvent, routingKeys)); } private Flux<OutboundMessage> toMessages(byte[] serializedEvent, Collection<RoutingKey> routingKeys) { diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index 1f72b2c..bb68c0d 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -93,6 +93,7 @@ import com.google.common.collect.Iterables; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** * This base class of an {@link MailboxManager} implementation provides a high-level api for writing your own @@ -382,6 +383,7 @@ public class StoreMailboxManager implements MailboxManager { .mailbox(mailbox) .build(), new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .subscribeOn(Schedulers.elastic()) .block(); })); } catch (MailboxExistsException e) { @@ -451,14 +453,15 @@ public class StoreMailboxManager implements MailboxManager { Mailbox m = new Mailbox(mailbox); mailboxMapper.delete(mailbox); eventBus.dispatch(EventFactory.mailboxDeleted() - .randomEventId() - .mailboxSession(session) - .mailbox(mailbox) - .quotaRoot(quotaRoot) - .quotaCount(QuotaCountUsage.count(messageCount)) - .quotaSize(QuotaSizeUsage.size(totalSize)) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .randomEventId() + .mailboxSession(session) + .mailbox(mailbox) + .quotaRoot(quotaRoot) + .quotaCount(QuotaCountUsage.count(messageCount)) + .quotaSize(QuotaSizeUsage.size(totalSize)) + .build(), + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .subscribeOn(Schedulers.elastic()) .block(); return m; } @@ -519,13 +522,14 @@ public class StoreMailboxManager implements MailboxManager { mapper.rename(mailbox); eventBus.dispatch(EventFactory.mailboxRenamed() - .randomEventId() - .mailboxSession(session) - .mailboxId(mailbox.getMailboxId()) - .oldPath(from) - .newPath(newMailboxPath) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .randomEventId() + .mailboxSession(session) + .mailboxId(mailbox.getMailboxId()) + .oldPath(from) + .newPath(newMailboxPath) + .build(), + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .subscribeOn(Schedulers.elastic()) .block(); // rename 
submailboxes @@ -543,13 +547,14 @@ public class StoreMailboxManager implements MailboxManager { sub.setName(subNewName); mapper.rename(sub); eventBus.dispatch(EventFactory.mailboxRenamed() - .randomEventId() - .mailboxSession(session) - .mailboxId(sub.getMailboxId()) - .oldPath(fromPath) - .newPath(sub.generateAssociatedPath()) - .build(), - new MailboxIdRegistrationKey(sub.getMailboxId())) + .randomEventId() + .mailboxSession(session) + .mailboxId(sub.getMailboxId()) + .oldPath(fromPath) + .newPath(sub.generateAssociatedPath()) + .build(), + new MailboxIdRegistrationKey(sub.getMailboxId())) + .subscribeOn(Schedulers.elastic()) .block(); LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index 3f87736..17c0477 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -80,6 +80,7 @@ import com.google.common.collect.Sets; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class StoreMessageIdManager implements MessageIdManager { @@ -235,6 +236,7 @@ public class StoreMessageIdManager implements MessageIdManager { new MailboxIdRegistrationKey(metadataWithMailboxId.getMailboxId()))) .sneakyThrow()) .then() + .subscribeOn(Schedulers.elastic()) .block(); } @@ -305,6 +307,7 @@ public class StoreMessageIdManager implements MessageIdManager { messageMoves.impactedMailboxIds() .map(MailboxIdRegistrationKey::new) .collect(Guavate.toImmutableSet())) + .subscribeOn(Schedulers.elastic()) .block(); } @@ -322,6 +325,7 @@ public class StoreMessageIdManager implements MessageIdManager { .addMetaData(eventPayload) .build(), new MailboxIdRegistrationKey(mailboxId)) + 
.subscribeOn(Schedulers.elastic()) .block(); } } @@ -331,12 +335,13 @@ public class StoreMessageIdManager implements MessageIdManager { Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId); eventBus.dispatch(EventFactory.flagsUpdated() - .randomEventId() - .mailboxSession(mailboxSession) - .mailbox(mailbox) - .updatedFlags(updatedFlags) - .build(), + .randomEventId() + .mailboxSession(mailboxSession) + .mailbox(mailbox) + .updatedFlags(updatedFlags) + .build(), new MailboxIdRegistrationKey(mailboxId)) + .subscribeOn(Schedulers.elastic()) .block(); } } @@ -393,12 +398,13 @@ public class StoreMessageIdManager implements MessageIdManager { save(messageIdMapper, copy); eventBus.dispatch(EventFactory.added() - .randomEventId() - .mailboxSession(mailboxSession) - .mailbox(mailboxMapper.findMailboxById(mailboxId)) - .addMetaData(copy.metaData()) - .build(), - new MailboxIdRegistrationKey(mailboxId)) + .randomEventId() + .mailboxSession(mailboxSession) + .mailbox(mailboxMapper.findMailboxById(mailboxId)) + .addMetaData(copy.metaData()) + .build(), + new MailboxIdRegistrationKey(mailboxId)) + .subscribeOn(Schedulers.elastic()) .block(); } } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index 9fd3ad4..8708344 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -289,6 +289,7 @@ public class StoreMessageManager implements MessageManager { .metaData(ImmutableSortedMap.copyOf(deletedMessages)) .build(), new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .subscribeOn(Schedulers.elastic()) .block(); } @@ -437,12 +438,12 @@ public class StoreMessageManager implements MessageManager { Mailbox mailbox = getMailboxEntity(); 
eventBus.dispatch(EventFactory.added() - .randomEventId() - .mailboxSession(mailboxSession) - .mailbox(mailbox) - .addMetaData(data.getLeft()) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .randomEventId() + .mailboxSession(mailboxSession) + .mailbox(mailbox) + .addMetaData(data.getLeft()) + .build(), + new MailboxIdRegistrationKey(mailbox.getMailboxId())) .subscribeOn(Schedulers.elastic()) .block(); MessageMetaData messageMetaData = data.getLeft(); @@ -592,12 +593,13 @@ public class StoreMessageManager implements MessageManager { List<UpdatedFlags> updatedFlags = Iterators.toStream(it).collect(Guavate.toImmutableList()); eventBus.dispatch(EventFactory.flagsUpdated() - .randomEventId() - .mailboxSession(mailboxSession) - .mailbox(getMailboxEntity()) - .updatedFlags(updatedFlags) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .randomEventId() + .mailboxSession(mailboxSession) + .mailbox(getMailboxEntity()) + .updatedFlags(updatedFlags) + .build(), + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .subscribeOn(Schedulers.elastic()) .block(); return updatedFlags.stream().collect(Guavate.toImmutableMap( @@ -759,6 +761,7 @@ public class StoreMessageManager implements MessageManager { .messageId(messageIds.build()) .build(), messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(Guavate.toImmutableSet()))) + .subscribeOn(Schedulers.elastic()) .blockLast(); return copiedUids; @@ -800,6 +803,7 @@ public class StoreMessageManager implements MessageManager { .session(session) .build(), messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(Guavate.toImmutableSet()))) + .subscribeOn(Schedulers.elastic()) .blockLast(); return moveUids; diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java index 20f4779..6b4c205 100644 --- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java @@ -54,6 +54,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class StoreRightManager implements RightManager { public static final boolean GROUP_FOLDER = true; @@ -156,6 +157,7 @@ public class StoreRightManager implements RightManager { .aclDiff(aclDiff) .build(), new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .subscribeOn(Schedulers.elastic()) .block(); } @@ -241,6 +243,7 @@ public class StoreRightManager implements RightManager { .aclDiff(aclDiff) .build(), new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .subscribeOn(Schedulers.elastic()) .block(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
