This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4900a05607e7005654bc1347c3e0e6c700a155a3 Author: Benoit Tellier <[email protected]> AuthorDate: Tue Mar 31 17:00:24 2020 +0700 JAMES-3130 MessageIdMapper::setFlags should return all results When a message (identified by its messageId) is stored several times in the same mailbox --- .../cassandra/mail/CassandraMessageIdMapper.java | 22 ++++----- .../inmemory/mail/InMemoryMessageIdMapper.java | 16 ++----- .../james/mailbox/store/StoreMessageIdManager.java | 13 +++--- .../james/mailbox/store/mail/MessageIdMapper.java | 12 ++++- .../store/mail/model/MessageIdMapperTest.java | 54 ++++++++++++++-------- 5 files changed, 67 insertions(+), 50 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 3aeb060..0eeca22 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -21,9 +21,7 @@ package org.apache.james.mailbox.cassandra.mail; import java.util.Collection; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.function.BiFunction; import javax.mail.Flags; @@ -59,7 +57,6 @@ import reactor.core.scheduler.Schedulers; public class CassandraMessageIdMapper implements MessageIdMapper { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class); - public static final BiFunction<UpdatedFlags, UpdatedFlags, UpdatedFlags> KEEP_FIRST = (a, b) -> a; private final MailboxMapper mailboxMapper; private final CassandraMailboxDAO mailboxDAO; @@ -207,14 +204,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } @Override - public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, 
Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { + public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { return Flux.fromIterable(mailboxIds) .distinct() .map(mailboxId -> (CassandraId) mailboxId) .filterWhen(mailboxId -> haveMetaData(messageId, mailboxId)) .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId)) .flatMap(this::updateCounts) - .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight, KEEP_FIRST)) + .collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight)) .block(); } @@ -223,15 +220,16 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .hasElements(); } - private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { + private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId)) .single() .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry()) - .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft())) .onErrorResume(MailboxDeleteDuringUpdateException.class, e -> { LOGGER.info("Mailbox {} was deleted during flag update", mailboxId); return Mono.empty(); - }); + }) + .flatMapMany(Flux::fromIterable) + .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft())); } private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) { @@ -250,7 +248,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .thenReturn(pair); } - private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, 
MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { + private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { try { return updateFlags(mailboxId, messageId, newState, updateMode); } catch (MailboxException e) { @@ -259,12 +257,12 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } } - private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { + private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { CassandraId cassandraId = (CassandraId) mailboxId; return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId)) .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId)) - .next() - .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new)); + .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new)) + .collectList(); } private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) { diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java index a9dbaf5..bc34599 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java @@ -23,10 +23,7 @@ import static 
org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITE import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.function.BinaryOperator; import java.util.function.Function; -import java.util.stream.Collectors; import javax.mail.Flags; @@ -47,11 +44,9 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage; import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Multimap; public class InMemoryMessageIdMapper implements MessageIdMapper { - private static final BinaryOperator<UpdatedFlags> KEEP_FIRST = (p, q) -> p; - private final MailboxMapper mailboxMapper; private final InMemoryMessageMapper messageMapper; @@ -120,17 +115,16 @@ public class InMemoryMessageIdMapper implements MessageIdMapper { } @Override - public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, - Flags newState, FlagsUpdateMode updateMode) throws MailboxException { + public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, + Flags newState, FlagsUpdateMode updateMode) { return find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata) .stream() .filter(message -> mailboxIds.contains(message.getMailboxId())) .map(updateMessage(newState, updateMode)) .distinct() - .collect(Guavate.toImmutableMap( + .collect(Guavate.toImmutableListMultimap( Pair::getKey, - Pair::getValue, - KEEP_FIRST)); + Pair::getValue)); } private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> updateMessage(Flags newState, FlagsUpdateMode updateMode) { diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index 7f10620..7e37bf8 100644 --- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -74,6 +74,7 @@ import com.github.fge.lambdas.functions.ThrowingFunction; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import reactor.core.publisher.Flux; @@ -117,9 +118,9 @@ public class StoreMessageIdManager implements MessageIdManager { assertRightsOnMailboxes(mailboxIds, mailboxSession, Right.Write); - Map<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace); - for (Map.Entry<MailboxId, UpdatedFlags> entry : updatedFlags.entrySet()) { - dispatchFlagsChange(mailboxSession, entry.getKey(), entry.getValue()); + Multimap<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace); + for (Map.Entry<MailboxId, Collection<UpdatedFlags>> entry : updatedFlags.asMap().entrySet()) { + dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue())); } } @@ -322,15 +323,15 @@ public class StoreMessageIdManager implements MessageIdManager { } } - private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, UpdatedFlags updatedFlags) throws MailboxException { - if (updatedFlags.flagsChanged()) { + private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags) throws MailboxException { + if (updatedFlags.stream().anyMatch(UpdatedFlags::flagsChanged)) { Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId); eventBus.dispatch(EventFactory.flagsUpdated() .randomEventId() .mailboxSession(mailboxSession) .mailbox(mailbox) - .updatedFlag(updatedFlags) + 
.updatedFlags(updatedFlags) .build(), new MailboxIdRegistrationKey(mailboxId)) .block(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java index 2f9457f..094ce72 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java @@ -20,7 +20,6 @@ package org.apache.james.mailbox.store.mail; import java.util.Collection; import java.util.List; -import java.util.Map; import javax.mail.Flags; @@ -54,5 +53,14 @@ public interface MessageIdMapper { .forEach(this::delete); } - Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException; + /** + * Updates the flags of the messages with the given MessageId in the supplied mailboxes. + * + * More than one message can be updated when a message is contained several times in the same mailbox with distinct + * MessageUid. + * + * @return Metadata of the update, indexed by mailboxIds. 
+ * @throws MailboxException + */ + Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException; } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java index 3de1f43..0c9fc61 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java @@ -60,6 +60,7 @@ import org.junit.jupiter.api.Test; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Multimap; public abstract class MessageIdMapperTest { private static final Username BENWA = Username.of("benwa"); @@ -367,7 +368,7 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); Flags newFlags = new Flags(Flag.ANSWERED); - Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD); ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox); UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() @@ -376,7 +377,7 @@ public abstract class MessageIdMapperTest { .oldFlags(new Flags()) .newFlags(newFlags) .build(); - assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags)); + assertThat(flags.asMap()).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags))); } @Test @@ -394,7 +395,7 @@ public abstract class MessageIdMapperTest { .add("userflag") 
.build(); - Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE); ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox); UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() @@ -404,7 +405,8 @@ public abstract class MessageIdMapperTest { .newFlags(newFlags) .build(); - assertThat(flags).contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags)); + assertThat(flags.asMap()) + .contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags))); } @Test @@ -422,7 +424,7 @@ public abstract class MessageIdMapperTest { .add("userflag") .build(); - Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE); ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox); UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() @@ -432,7 +434,8 @@ public abstract class MessageIdMapperTest { .newFlags(new Flags(Flags.Flag.RECENT)) .build(); - assertThat(flags).contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags)); + assertThat(flags.asMap()) + .contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags))); } @Test @@ -466,17 +469,17 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); Flags newFlags = new Flags(Flag.ANSWERED); - Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, 
FlagsUpdateMode.REMOVE); - assertThat(flags).isEmpty(); + assertThat(flags.asMap()).isEmpty(); } @Test void setFlagsShouldReturnEmptyWhenMessageIdDoesntExist() throws Exception { MessageId unknownMessageId = mapperProvider.generateMessageId(); - Map<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE); - assertThat(flags).isEmpty(); + assertThat(flags.asMap()).isEmpty(); } @Test @@ -489,7 +492,7 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); - Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); Flags newFlags = new FlagsBuilder() .add(Flag.RECENT) @@ -502,7 +505,8 @@ public abstract class MessageIdMapperTest { .oldFlags(initialFlags) .newFlags(newFlags) .build(); - assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags)); + assertThat(flags.asMap()) + .containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags))); } @Test @@ -518,7 +522,7 @@ public abstract class MessageIdMapperTest { MessageId messageId = message1.getMessageId(); Flags newFlags = new Flags(Flag.ANSWERED); - Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, FlagsUpdateMode.ADD); + Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, 
FlagsUpdateMode.ADD); ModSeq modSeqBenwaInboxMailbox = mapperProvider.highestModSeq(benwaInboxMailbox); ModSeq modSeqBenwaWorkMailbox = mapperProvider.highestModSeq(benwaWorkMailbox); @@ -534,8 +538,9 @@ public abstract class MessageIdMapperTest { .oldFlags(new Flags()) .newFlags(newFlags) .build(); - assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags), - MapEntry.entry(message1InOtherMailbox.getMailboxId(), expectedUpdatedFlags2)); + assertThat(flags.asMap()) + .containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)), + MapEntry.entry(message1InOtherMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags2))); } @Test @@ -857,19 +862,19 @@ public abstract class MessageIdMapperTest { message1.setFlags(flags); sut.save(message1); - Map<MailboxId, UpdatedFlags> mailboxIdUpdatedFlagsMap = sut.setFlags(message1.getMessageId(), + Multimap<MailboxId, UpdatedFlags> mailboxIdUpdatedFlagsMap = sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), flags, FlagsUpdateMode.ADD); - assertThat(mailboxIdUpdatedFlagsMap) + assertThat(mailboxIdUpdatedFlagsMap.asMap()) .containsOnly(MapEntry.entry(message1.getMailboxId(), - UpdatedFlags.builder() + ImmutableList.of(UpdatedFlags.builder() .modSeq(modSeq) .uid(message1.getUid()) .newFlags(flags) .oldFlags(flags) - .build())); + .build()))); } @Test @@ -943,6 +948,17 @@ public abstract class MessageIdMapperTest { } @Test + void setFlagsShouldReturnAllUp() throws Exception { + addMessageAndSetModSeq(benwaInboxMailbox, message1); + addMessageAndSetModSeq(benwaInboxMailbox, message1); + + Multimap<MailboxId, UpdatedFlags> map = sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + + assertThat(map.values()).hasSize(2); + assertThat(map.asMap()).hasSize(1); + } + + @Test void deletesShouldUpdateUnreadCount() throws Exception { 
message1.setUid(mapperProvider.generateMessageUid()); message1.setModSeq(mapperProvider.generateModSeq(benwaInboxMailbox)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
