This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8227159a964d5b621767b0b29b3aee31c77f80be Author: Benoit Tellier <[email protected]> AuthorDate: Sat Mar 28 15:11:52 2020 +0700 JAMES-3130 Update stored state when a message appears twice in the same mailbox If a messageId is contained 2 times in a single mailbox with 2 different uids, the update will fail with a `java.lang.IndexOutOfBoundsException: Source emitted more than one item` error. --- .../cassandra/mail/CassandraMessageIdMapper.java | 27 +++++++++++----------- .../inmemory/mail/InMemoryMessageIdMapper.java | 10 +++++++- .../store/mail/model/MessageIdMapperTest.java | 17 +++++++++++++- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 750b6bd..3aeb060 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -23,6 +23,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.BiFunction; import javax.mail.Flags; @@ -58,6 +59,7 @@ import reactor.core.scheduler.Schedulers; public class CassandraMessageIdMapper implements MessageIdMapper { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class); + public static final BiFunction<UpdatedFlags, UpdatedFlags, UpdatedFlags> KEEP_FIRST = (a, b) -> a; private final MailboxMapper mailboxMapper; private final CassandraMailboxDAO mailboxDAO; @@ -212,7 +214,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .filterWhen(mailboxId -> haveMetaData(messageId, mailboxId)) .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId)) .flatMap(this::updateCounts) - 
.collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight)) + .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight, KEEP_FIRST)) .block(); } @@ -222,15 +224,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { - try { - return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId)) - .single() - .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry()) - .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft())); - } catch (MailboxDeleteDuringUpdateException e) { - LOGGER.info("Mailbox {} was deleted during flag update", mailboxId); - return Mono.empty(); - } + return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId)) + .single() + .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry()) + .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft())) + .onErrorResume(MailboxDeleteDuringUpdateException.class, e -> { + LOGGER.info("Mailbox {} was deleted during flag update", mailboxId); + return Mono.empty(); + }); } private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) { @@ -261,9 +262,9 @@ public class CassandraMessageIdMapper implements MessageIdMapper { private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { CassandraId cassandraId = (CassandraId) mailboxId; return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId)) - .single() - .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new)) - .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId)); + .flatMap(oldComposedId -> updateFlags(newState, updateMode, 
cassandraId, oldComposedId)) + .next() + .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new)); } private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) { diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java index c381f5b..a9dbaf5 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java @@ -24,7 +24,9 @@ import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITE import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.BinaryOperator; import java.util.function.Function; +import java.util.stream.Collectors; import javax.mail.Flags; @@ -45,8 +47,10 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage; import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; public class InMemoryMessageIdMapper implements MessageIdMapper { + private static final BinaryOperator<UpdatedFlags> KEEP_FIRST = (p, q) -> p; private final MailboxMapper mailboxMapper; private final InMemoryMessageMapper messageMapper; @@ -122,7 +126,11 @@ public class InMemoryMessageIdMapper implements MessageIdMapper { .stream() .filter(message -> mailboxIds.contains(message.getMailboxId())) .map(updateMessage(newState, updateMode)) - .collect(Guavate.entriesToMap()); + .distinct() + .collect(Guavate.toImmutableMap( + Pair::getKey, + Pair::getValue, + KEEP_FIRST)); } private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> updateMessage(Flags 
newState, FlagsUpdateMode updateMode) { diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java index 9939095..3de1f43 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java @@ -42,6 +42,7 @@ import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.MessageMetaData; import org.apache.james.mailbox.model.UidValidity; import org.apache.james.mailbox.model.UpdatedFlags; import org.apache.james.mailbox.store.mail.MailboxMapper; @@ -71,7 +72,7 @@ public abstract class MessageIdMapperTest { private MailboxMapper mailboxMapper; private MessageIdMapper sut; - private Mailbox benwaInboxMailbox; + protected Mailbox benwaInboxMailbox; private Mailbox benwaWorkMailbox; protected SimpleMailboxMessage message1; @@ -928,6 +929,20 @@ public abstract class MessageIdMapperTest { } @Test + void setFlagsShouldUpdateTwoMessagesInTheSameMailboxWithTheSameMessageId() throws Exception { + addMessageAndSetModSeq(benwaInboxMailbox, message1); + addMessageAndSetModSeq(benwaInboxMailbox, message1); + + sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD); + + assertThat(sut.find(ImmutableList.of(message1.getMessageId()), FetchType.Metadata)) + .extracting(MailboxMessage::createFlags) + .containsExactly( + new Flags(Flag.ANSWERED), + new Flags(Flag.ANSWERED)); + } + + @Test void deletesShouldUpdateUnreadCount() throws Exception { message1.setUid(mapperProvider.generateMessageUid()); 
message1.setModSeq(mapperProvider.generateModSeq(benwaInboxMailbox)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
