This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 20ca53821b0342b0bc12d466c1450534e3b197e1 Author: Benoit Tellier <[email protected]> AuthorDate: Sat May 1 22:06:42 2021 +0700 JAMES-3575 Make message deletion slightly more reactive --- .../cassandra/mail/CassandraMessageIdMapper.java | 11 +++-- .../james/mailbox/store/StoreMessageIdManager.java | 50 +++++++++++----------- .../james/mailbox/store/mail/MessageIdMapper.java | 4 ++ 3 files changed, 37 insertions(+), 28 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 1224932..5d5972e 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -217,13 +217,18 @@ public class CassandraMessageIdMapper implements MessageIdMapper { @Override public void delete(Multimap<MessageId, MailboxId> ids) { - Flux.fromIterable(ids.asMap() + deleteReactive(ids) + .block(); + } + + + public Mono<Void> deleteReactive(Multimap<MessageId, MailboxId> ids) { + return Flux.fromIterable(ids.asMap() .entrySet()) .publishOn(Schedulers.elastic()) .flatMap(entry -> deleteReactive(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize(), DEFAULT_CONCURRENCY) - .then() - .block(); + .then(); } private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) { diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index e40d42d..d34b4a7 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -206,7 +206,9 @@ public 
class StoreMessageIdManager implements MessageIdManager { .collect(Guavate.toImmutableList()); if (!messageList.isEmpty()) { - deleteWithPreHooks(messageIdMapper, messageList, mailboxSession); + deleteWithPreHooks(messageIdMapper, messageList, mailboxSession) + .subscribeOn(Schedulers.elastic()) + .block(); return DeleteResult.destroyed(messageId); } return DeleteResult.notFound(messageId); @@ -230,7 +232,9 @@ public class StoreMessageIdManager implements MessageIdManager { .collect(Guavate.toImmutableSet()); Sets.SetView<MessageId> nonAccessibleMessages = Sets.difference(ImmutableSet.copyOf(messageIds), accessibleMessageIds); - deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession); + deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession) + .subscribeOn(Schedulers.elastic()) + .block(); return DeleteResult.builder() .addDestroyed(accessibleMessageIds) @@ -238,38 +242,34 @@ public class StoreMessageIdManager implements MessageIdManager { .build(); } - private void deleteWithPreHooks(MessageIdMapper messageIdMapper, List<MailboxMessage> messageList, MailboxSession mailboxSession) throws MailboxException { + private Mono<Void> deleteWithPreHooks(MessageIdMapper messageIdMapper, List<MailboxMessage> messageList, MailboxSession mailboxSession) { ImmutableList<MetadataWithMailboxId> metadataWithMailbox = messageList.stream() .map(mailboxMessage -> MetadataWithMailboxId.from(mailboxMessage.metaData(), mailboxMessage.getMailboxId())) .collect(Guavate.toImmutableList()); - preDeletionHooks.runHooks(PreDeletionHook.DeleteOperation.from(metadataWithMailbox)) - .then(Mono.fromRunnable(Throwing.runnable( - () -> delete(messageIdMapper, messageList, mailboxSession, metadataWithMailbox)))) - .block(); + return preDeletionHooks.runHooks(PreDeletionHook.DeleteOperation.from(metadataWithMailbox)) + .then(delete(messageIdMapper, messageList, mailboxSession, metadataWithMailbox)); } - private void delete(MessageIdMapper messageIdMapper, 
List<MailboxMessage> messageList, MailboxSession mailboxSession, - ImmutableList<MetadataWithMailboxId> metadataWithMailbox) throws MailboxException { - messageIdMapper.delete( + private Mono<Void> delete(MessageIdMapper messageIdMapper, List<MailboxMessage> messageList, MailboxSession mailboxSession, ImmutableList<MetadataWithMailboxId> metadataWithMailbox) { + MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession); + + return messageIdMapper.deleteReactive( messageList.stream() .collect(Guavate.toImmutableListMultimap( Message::getMessageId, - MailboxMessage::getMailboxId))); - - MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession); - Flux.fromIterable(metadataWithMailbox) - .flatMap(metadataWithMailboxId -> mailboxMapper.findMailboxById(metadataWithMailboxId.getMailboxId()) - .flatMap(mailbox -> eventBus.dispatch(EventFactory.expunged() - .randomEventId() - .mailboxSession(mailboxSession) - .mailbox(mailbox) - .addMetaData(metadataWithMailboxId.getMessageMetaData()) - .build(), - new MailboxIdRegistrationKey(metadataWithMailboxId.getMailboxId()))), DEFAULT_CONCURRENCY) - .then() - .subscribeOn(Schedulers.elastic()) - .block(); + MailboxMessage::getMailboxId))) + .then( + Flux.fromIterable(metadataWithMailbox) + .flatMap(metadataWithMailboxId -> mailboxMapper.findMailboxById(metadataWithMailboxId.getMailboxId()) + .flatMap(mailbox -> eventBus.dispatch(EventFactory.expunged() + .randomEventId() + .mailboxSession(mailboxSession) + .mailbox(mailbox) + .addMetaData(metadataWithMailboxId.getMessageMetaData()) + .build(), + new MailboxIdRegistrationKey(metadataWithMailboxId.getMailboxId()))), DEFAULT_CONCURRENCY) + .then()); } @Override diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java index 5191435..7bdafbc 100644 --- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java @@ -69,6 +69,10 @@ public interface MessageIdMapper { .forEach(this::delete); } + default Mono<Void> deleteReactive(Multimap<MessageId, MailboxId> ids) { + return Mono.fromRunnable(() -> delete(ids)); + } + /** * Updates the flags of the messages with the given MessageId in the supplied mailboxes * --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
