This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0763ff05e653d815522eede47f2b54e9eacf6cb8 Author: Benoit Tellier <[email protected]> AuthorDate: Sat May 1 22:26:41 2021 +0700 JAMES-3575 MessageIdMapper::setFlags can easily be reactive --- .../cassandra/mail/CassandraMessageIdMapper.java | 5 +- .../mail/CassandraMessageIdMapperTest.java | 3 +- .../inmemory/mail/InMemoryMessageIdMapper.java | 9 ++-- .../james/mailbox/store/StoreMessageIdManager.java | 4 +- .../james/mailbox/store/mail/MessageIdMapper.java | 2 +- .../store/mail/model/MessageIdMapperTest.java | 57 ++++++++++++---------- 6 files changed, 44 insertions(+), 36 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 5d5972e..59ea987 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -254,14 +254,13 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } @Override - public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { + public Mono<Multimap<MailboxId, UpdatedFlags>> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { return Flux.fromIterable(mailboxIds) .distinct() .map(mailboxId -> (CassandraId) mailboxId) .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId)) .flatMap(this::updateCounts, ReactorUtils.DEFAULT_CONCURRENCY) - .collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight)) - .block(); + .collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight)); } private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java index 85dd03d..5261cc2 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java @@ -118,7 +118,8 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest { mapperFactory.getMessageIdMapper(MAILBOX_SESSION).setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flags.Flag.DELETED), - MessageManager.FlagsUpdateMode.REPLACE); + MessageManager.FlagsUpdateMode.REPLACE) + .block(); assertThat(statementRecorder.listExecutedStatements( StatementRecorder.Selector.preparedStatementStartingWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted," + diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java index 626640a..8be1835 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java @@ -52,6 +52,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class InMemoryMessageIdMapper implements MessageIdMapper { private final MailboxMapper mailboxMapper; @@ -132,16 +133,16 @@ public class InMemoryMessageIdMapper implements MessageIdMapper { } @Override - public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, - Flags newState, FlagsUpdateMode updateMode) { - return find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata) + public Mono<Multimap<MailboxId, UpdatedFlags>> setFlags(MessageId messageId, List<MailboxId> mailboxIds, + Flags newState, FlagsUpdateMode updateMode) { + return Mono.fromCallable(() -> find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata) .stream() .filter(message -> mailboxIds.contains(message.getMailboxId())) .map(updateMessage(newState, updateMode)) .distinct() .collect(Guavate.toImmutableListMultimap( Pair::getKey, - Pair::getValue)); + Pair::getValue))); } private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> updateMessage(Flags newState, FlagsUpdateMode updateMode) { diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index 116e91b..6ab1ffc 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -133,7 +133,9 @@ public class StoreMessageIdManager implements MessageIdManager { assertRightsOnMailboxes(targetMailboxes, mailboxSession, Right.Write); - Multimap<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace); + Multimap<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace) + .subscribeOn(Schedulers.elastic()) + .block(); for (Map.Entry<MailboxId, Collection<UpdatedFlags>> entry : updatedFlags.asMap().entrySet()) { dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java index 7bdafbc..02e1726 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java @@ -82,5 +82,5 @@ public interface MessageIdMapper { * @return Metadata of the update, indexed by mailboxIds. * @throws MailboxException */ - Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException; + Mono<Multimap<MailboxId, UpdatedFlags>> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException; } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java index e30a2ac..0c61eb0 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java @@ -350,7 +350,7 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); Flags newFlags = new Flags(Flag.ANSWERED); - Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD).block(); ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox); UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() @@ -378,7 +378,7 @@ public abstract class MessageIdMapperTest { .add("userflag") .build(); - Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE).block(); ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox); UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() @@ -408,7 +408,7 @@ public abstract class MessageIdMapperTest { .add("userflag") .build(); - Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE).block(); ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox); UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() @@ -438,7 +438,7 @@ public abstract class MessageIdMapperTest { .add("userflag") .build(); - sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE); + sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE).block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(messageId), MessageMapper.FetchType.Body); assertThat(messages).hasSize(1); @@ -454,7 +454,7 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); Flags newFlags = new Flags(Flag.ANSWERED); - Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE).block(); assertThat(flags.asMap()).isEmpty(); } @@ -462,7 +462,7 @@ public abstract class MessageIdMapperTest { @Test void setFlagsShouldReturnEmptyWhenMessageIdDoesntExist() throws Exception { MessageId unknownMessageId = mapperProvider.generateMessageId(); - Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE).block(); assertThat(flags.asMap()).isEmpty(); } @@ -477,7 +477,7 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); - Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD).block(); Flags newFlags = new FlagsBuilder() .add(Flag.RECENT) @@ -508,7 +508,7 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); Flags newFlags = new Flags(Flag.ANSWERED); - Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, FlagsUpdateMode.ADD); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, FlagsUpdateMode.ADD).block(); ModSeq modSeqBenwaInboxMailbox = mapperProvider.highestModSeq(benwaInboxMailbox); ModSeq modSeqBenwaWorkMailbox = mapperProvider.highestModSeq(benwaWorkMailbox); @@ -538,7 +538,7 @@ public abstract class MessageIdMapperTest { sut.save(message1); MessageId messageId = message1.getMessageId(); - sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD).block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(messageId), MessageMapper.FetchType.Body); assertThat(messages).hasSize(1); @@ -554,7 +554,7 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); Flags newFlags = new Flags(Flag.ANSWERED); - sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE); + sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE).block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(messageId), MessageMapper.FetchType.Body); assertThat(messages).hasSize(1); @@ -569,7 +569,7 @@ public abstract class MessageIdMapperTest { sut.save(message1); MessageId messageId = message1.getMessageId(); - sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD).block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(messageId), MessageMapper.FetchType.Body); assertThat(messages).hasSize(1); @@ -587,7 +587,7 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); Flags newFlags = new Flags(Flag.ANSWERED); - sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE); + sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE).block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(messageId), MessageMapper.FetchType.Body); assertThat(messages).hasSize(1); @@ -606,7 +606,7 @@ public abstract class MessageIdMapperTest { sut.save(message1InOtherMailbox); MessageId messageId = message1.getMessageId(); - sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD).block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(messageId), MessageMapper.FetchType.Body); assertThat(messages).hasSize(2); @@ -630,7 +630,7 @@ public abstract class MessageIdMapperTest { sut.save(message4); MessageId messageId = message1.getMessageId(); - sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD).block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(messageId), MessageMapper.FetchType.Body); assertThat(messages).hasSize(1); @@ -653,7 +653,7 @@ public abstract class MessageIdMapperTest { sut.save(message4); MessageId messageId = message1.getMessageId(); - sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD).block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(messageId), MessageMapper.FetchType.Body); assertThat(messages).hasSize(1); @@ -673,7 +673,7 @@ public abstract class MessageIdMapperTest { .operation((threadNumber, step) -> sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags("custom-" + threadNumber + "-" + step), - FlagsUpdateMode.ADD)) + FlagsUpdateMode.ADD).block()) .threadCount(threadCount) .operationCount(updateCount) .runSuccessfullyWithin(Duration.ofMinutes(1)); @@ -698,12 +698,14 @@ public abstract class MessageIdMapperTest { sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags("custom-" + threadNumber + "-" + step), - FlagsUpdateMode.ADD); + FlagsUpdateMode.ADD) + .block(); } else { sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags("custom-" + threadNumber + "-" + (updateCount - step - 1)), - FlagsUpdateMode.REMOVE); + FlagsUpdateMode.REMOVE) + .block(); } }) .threadCount(threadCount) @@ -787,7 +789,7 @@ public abstract class MessageIdMapperTest { message1.setModSeq(mapperProvider.generateModSeq(benwaInboxMailbox)); sut.save(message1); - sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.SEEN), FlagsUpdateMode.ADD); + sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.SEEN), FlagsUpdateMode.ADD).block(); assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(0); } @@ -799,7 +801,7 @@ public abstract class MessageIdMapperTest { message1.setFlags(new Flags(Flag.SEEN)); sut.save(message1); - sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.SEEN), FlagsUpdateMode.REMOVE); + sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.SEEN), FlagsUpdateMode.REMOVE).block(); assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(1); } @@ -815,7 +817,8 @@ public abstract class MessageIdMapperTest { sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.SEEN), - FlagsUpdateMode.ADD); + FlagsUpdateMode.ADD) + .block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Body); assertThat(messages).hasSize(1); @@ -834,7 +837,8 @@ public abstract class MessageIdMapperTest { sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), flags, - FlagsUpdateMode.ADD); + FlagsUpdateMode.ADD) + .block(); List<MailboxMessage> messages = sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Body); assertThat(messages).hasSize(1); @@ -853,7 +857,8 @@ public abstract class MessageIdMapperTest { Multimap<MailboxId, UpdatedFlags> mailboxIdUpdatedFlagsMap = sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), flags, - FlagsUpdateMode.ADD); + FlagsUpdateMode.ADD) + .block(); assertThat(mailboxIdUpdatedFlagsMap.asMap()) .containsOnly(MapEntry.entry(message1.getMailboxId(), @@ -873,7 +878,7 @@ public abstract class MessageIdMapperTest { message1.setFlags(new Flags(Flag.RECENT)); sut.save(message1); - sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.REMOVE); + sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.REMOVE).block(); assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(1); } @@ -927,7 +932,7 @@ public abstract class MessageIdMapperTest { addMessageAndSetModSeq(benwaInboxMailbox, message1); addMessageAndSetModSeq(benwaInboxMailbox, message1); - sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD).block(); assertThat(sut.find(ImmutableList.of(message1.getMessageId()), FetchType.Metadata)) .extracting(MailboxMessage::createFlags) @@ -941,7 +946,7 @@ public abstract class MessageIdMapperTest { addMessageAndSetModSeq(benwaInboxMailbox, message1); addMessageAndSetModSeq(benwaInboxMailbox, message1); - Multimap<MailboxId, UpdatedFlags> map = sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + Multimap<MailboxId, UpdatedFlags> map = sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD).block(); assertThat(map.values()).hasSize(2); assertThat(map.asMap()).hasSize(1); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
