This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8227159a964d5b621767b0b29b3aee31c77f80be
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat Mar 28 15:11:52 2020 +0700

    JAMES-3130 Update stored state when a message is two time in the same 
mailbox
    
    If a messageId is contained 2 times in a single mailbox with 2 different
    uids update will fail with a
    `java.lang.IndexOutOfBoundsException: Source emitted more than one item`
    error.
---
 .../cassandra/mail/CassandraMessageIdMapper.java   | 27 +++++++++++-----------
 .../inmemory/mail/InMemoryMessageIdMapper.java     | 10 +++++++-
 .../store/mail/model/MessageIdMapperTest.java      | 17 +++++++++++++-
 3 files changed, 39 insertions(+), 15 deletions(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 750b6bd..3aeb060 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.BiFunction;
 
 import javax.mail.Flags;
 
@@ -58,6 +59,7 @@ import reactor.core.scheduler.Schedulers;
 
 public class CassandraMessageIdMapper implements MessageIdMapper {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraMessageIdMapper.class);
+    public static final BiFunction<UpdatedFlags, UpdatedFlags, UpdatedFlags> 
KEEP_FIRST = (a, b) -> a;
 
     private final MailboxMapper mailboxMapper;
     private final CassandraMailboxDAO mailboxDAO;
@@ -212,7 +214,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
             .filterWhen(mailboxId -> haveMetaData(messageId, mailboxId))
             .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, 
mailboxId, messageId))
             .flatMap(this::updateCounts)
-            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight))
+            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight, 
KEEP_FIRST))
             .block();
     }
 
@@ -222,15 +224,14 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
     }
 
     private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags 
newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, 
MessageId messageId) {
-        try {
-            return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, 
mailboxId, messageId))
-                .single()
-                
.retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
-                .map(pair -> buildUpdatedFlags(pair.getRight(), 
pair.getLeft()));
-        } catch (MailboxDeleteDuringUpdateException e) {
-            LOGGER.info("Mailbox {} was deleted during flag update", 
mailboxId);
-            return Mono.empty();
-        }
+        return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, 
mailboxId, messageId))
+            .single()
+            .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
+            .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()))
+            .onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
+                LOGGER.info("Mailbox {} was deleted during flag update", 
mailboxId);
+                return Mono.empty();
+            });
     }
 
     private Pair<MailboxId, UpdatedFlags> 
buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, 
Flags oldFlags) {
@@ -261,9 +262,9 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
     private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> 
updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, 
MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailboxId;
         return imapUidDAO.retrieve((CassandraMessageId) messageId, 
Optional.of(cassandraId))
-            .single()
-            .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
-            .flatMap(oldComposedId -> updateFlags(newState, updateMode, 
cassandraId, oldComposedId));
+            .flatMap(oldComposedId -> updateFlags(newState, updateMode, 
cassandraId, oldComposedId))
+            .next()
+            
.switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new));
     }
 
     private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags 
newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, 
ComposedMessageIdWithMetaData oldComposedId) {
diff --git 
a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
 
b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index c381f5b..a9dbaf5 100644
--- 
a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ 
b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -24,7 +24,9 @@ import static 
org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITE
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import javax.mail.Flags;
 
@@ -45,8 +47,10 @@ import 
org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class InMemoryMessageIdMapper implements MessageIdMapper {
+    private static final BinaryOperator<UpdatedFlags> KEEP_FIRST = (p, q) -> p;
 
     private final MailboxMapper mailboxMapper;
     private final InMemoryMessageMapper messageMapper;
@@ -122,7 +126,11 @@ public class InMemoryMessageIdMapper implements 
MessageIdMapper {
             .stream()
             .filter(message -> mailboxIds.contains(message.getMailboxId()))
             .map(updateMessage(newState, updateMode))
-            .collect(Guavate.entriesToMap());
+            .distinct()
+            .collect(Guavate.toImmutableMap(
+                Pair::getKey,
+                Pair::getValue,
+                KEEP_FIRST));
     }
 
     private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> 
updateMessage(Flags newState, FlagsUpdateMode updateMode) {
diff --git 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
index 9939095..3de1f43 100644
--- 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
+++ 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
@@ -42,6 +42,7 @@ import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
@@ -71,7 +72,7 @@ public abstract class MessageIdMapperTest {
     private MailboxMapper mailboxMapper;
     private MessageIdMapper sut;
 
-    private Mailbox benwaInboxMailbox;
+    protected Mailbox benwaInboxMailbox;
     private Mailbox benwaWorkMailbox;
     
     protected SimpleMailboxMessage message1;
@@ -928,6 +929,20 @@ public abstract class MessageIdMapperTest {
     }
 
     @Test
+    void setFlagsShouldUpdateTwoMessagesInTheSameMailboxWithTheSameMessageId() 
throws Exception {
+        addMessageAndSetModSeq(benwaInboxMailbox, message1);
+        addMessageAndSetModSeq(benwaInboxMailbox, message1);
+
+        sut.setFlags(message1.getMessageId(), 
ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), 
FlagsUpdateMode.ADD);
+
+        assertThat(sut.find(ImmutableList.of(message1.getMessageId()), 
FetchType.Metadata))
+            .extracting(MailboxMessage::createFlags)
+            .containsExactly(
+                new Flags(Flag.ANSWERED),
+                new Flags(Flag.ANSWERED));
+    }
+
+    @Test
     void deletesShouldUpdateUnreadCount() throws Exception {
         message1.setUid(mapperProvider.generateMessageUid());
         message1.setModSeq(mapperProvider.generateModSeq(benwaInboxMailbox));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to