This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/postgresql by this push:
     new ae536b8dbc JAMES-2586 Fix sequential issue with updating flags in the reactive pipeline
ae536b8dbc is described below

commit ae536b8dbc8ab2f56d6eea5610c9a0e175a4703e
Author: Rene Cordier <[email protected]>
AuthorDate: Fri May 24 16:49:02 2024 +0700

    JAMES-2586 Fix sequential issue with updating flags in the reactive pipeline
---
 .../postgres/mail/PostgresMessageMapper.java       |  3 +-
 .../store/mail/model/MessageMapperTest.java        | 46 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
index ff00ee4e2f..5112324b10 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.postgres.mail;
 
 import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -290,7 +291,7 @@ public class PostgresMessageMapper implements MessageMapper {
                                             FlagsUpdateCalculator flagsUpdateCalculator) {
         return modSeqProvider.nextModSeqReactive(mailbox.getMailboxId())
             .flatMapMany(newModSeq -> Flux.fromIterable(listMessagesMetaData)
-                .flatMap(messageMetaData -> updateFlags(messageMetaData, flagsUpdateCalculator, newModSeq)));
+                .flatMapSequential(messageMetaData -> updateFlags(messageMetaData, flagsUpdateCalculator, newModSeq), DEFAULT_CONCURRENCY));
     }
 
     private Mono<UpdatedFlags> updateFlags(ComposedMessageIdWithMetaData currentMetaData,
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index 469b74e0b5..ae012ea0e6 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -60,6 +60,7 @@ import org.apache.james.mailbox.store.mail.model.MapperProvider.Capabilities;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.apache.james.util.streams.Iterators;
 import org.apache.james.utils.UpdatableTickingClock;
 import org.junit.Assume;
 import org.junit.jupiter.api.BeforeEach;
@@ -850,6 +851,51 @@ public abstract class MessageMapperTest {
             .hasSize(5);
     }
 
+    @Test
+    void updateFlagsOnRangeShouldReturnUpdatedFlagsWithUidOrderAsc() throws MailboxException {
+        saveMessages();
+
+        Iterator<UpdatedFlags> it = messageMapper.updateFlags(benwaInboxMailbox,
+            new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE),
+            MessageRange.range(message1.getUid(), message3.getUid()));
+        List<MessageUid> updatedFlagsUids = Iterators.toStream(it)
+            .map(UpdatedFlags::getUid)
+            .collect(ImmutableList.toImmutableList());
+
+        assertThat(updatedFlagsUids)
+            .containsExactly(message1.getUid(), message2.getUid(), message3.getUid());
+    }
+
+    @Test
+    void updateFlagsWithRangeFromShouldReturnUpdatedFlagsWithUidOrderAsc() throws MailboxException {
+        saveMessages();
+
+        Iterator<UpdatedFlags> it = messageMapper.updateFlags(benwaInboxMailbox,
+            new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE),
+            MessageRange.from(message3.getUid()));
+        List<MessageUid> updatedFlagsUids = Iterators.toStream(it)
+            .map(UpdatedFlags::getUid)
+            .collect(ImmutableList.toImmutableList());
+
+        assertThat(updatedFlagsUids)
+            .containsExactly(message3.getUid(), message4.getUid(), message5.getUid());
+    }
+
+    @Test
+    void updateFlagsWithRangeAllRangeShouldReturnUpdatedFlagsWithUidOrderAsc() throws MailboxException {
+        saveMessages();
+
+        Iterator<UpdatedFlags> it = messageMapper.updateFlags(benwaInboxMailbox,
+            new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE),
+            MessageRange.all());
+        List<MessageUid> updatedFlagsUids = Iterators.toStream(it)
+            .map(UpdatedFlags::getUid)
+            .collect(ImmutableList.toImmutableList());
+
+        assertThat(updatedFlagsUids)
+            .containsExactly(message1.getUid(), message2.getUid(), message3.getUid(), message4.getUid(), message5.getUid());
+    }
+
     @Test
     void messagePropertiesShouldBeStored() throws Exception {
         PropertyBuilder propBuilder = new PropertyBuilder();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to