This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/postgresql by this push:
new ae536b8dbc JAMES-2586 Fix sequential issue with updating flags in the
reactive pipeline
ae536b8dbc is described below
commit ae536b8dbc8ab2f56d6eea5610c9a0e175a4703e
Author: Rene Cordier <[email protected]>
AuthorDate: Fri May 24 16:49:02 2024 +0700
JAMES-2586 Fix sequential issue with updating flags in the reactive pipeline
---
.../postgres/mail/PostgresMessageMapper.java | 3 +-
.../store/mail/model/MessageMapperTest.java | 46 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
index ff00ee4e2f..5112324b10 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
@@ -20,6 +20,7 @@
package org.apache.james.mailbox.postgres.mail;
import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
import java.io.IOException;
import java.io.InputStream;
@@ -290,7 +291,7 @@ public class PostgresMessageMapper implements MessageMapper {
FlagsUpdateCalculator
flagsUpdateCalculator) {
return modSeqProvider.nextModSeqReactive(mailbox.getMailboxId())
.flatMapMany(newModSeq -> Flux.fromIterable(listMessagesMetaData)
- .flatMap(messageMetaData -> updateFlags(messageMetaData,
flagsUpdateCalculator, newModSeq)));
+ .flatMapSequential(messageMetaData ->
updateFlags(messageMetaData, flagsUpdateCalculator, newModSeq),
DEFAULT_CONCURRENCY));
}
private Mono<UpdatedFlags> updateFlags(ComposedMessageIdWithMetaData
currentMetaData,
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index 469b74e0b5..ae012ea0e6 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -60,6 +60,7 @@ import org.apache.james.mailbox.store.mail.model.MapperProvider.Capabilities;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.apache.james.util.streams.Iterators;
import org.apache.james.utils.UpdatableTickingClock;
import org.junit.Assume;
import org.junit.jupiter.api.BeforeEach;
@@ -850,6 +851,51 @@ public abstract class MessageMapperTest {
.hasSize(5);
}
+ @Test
+ void updateFlagsOnRangeShouldReturnUpdatedFlagsWithUidOrderAsc() throws
MailboxException {
+ saveMessages();
+
+ Iterator<UpdatedFlags> it =
messageMapper.updateFlags(benwaInboxMailbox,
+ new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN),
FlagsUpdateMode.REPLACE),
+ MessageRange.range(message1.getUid(), message3.getUid()));
+ List<MessageUid> updatedFlagsUids = Iterators.toStream(it)
+ .map(UpdatedFlags::getUid)
+ .collect(ImmutableList.toImmutableList());
+
+ assertThat(updatedFlagsUids)
+ .containsExactly(message1.getUid(), message2.getUid(),
message3.getUid());
+ }
+
+ @Test
+ void updateFlagsWithRangeFromShouldReturnUpdatedFlagsWithUidOrderAsc()
throws MailboxException {
+ saveMessages();
+
+ Iterator<UpdatedFlags> it =
messageMapper.updateFlags(benwaInboxMailbox,
+ new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN),
FlagsUpdateMode.REPLACE),
+ MessageRange.from(message3.getUid()));
+ List<MessageUid> updatedFlagsUids = Iterators.toStream(it)
+ .map(UpdatedFlags::getUid)
+ .collect(ImmutableList.toImmutableList());
+
+ assertThat(updatedFlagsUids)
+ .containsExactly(message3.getUid(), message4.getUid(),
message5.getUid());
+ }
+
+ @Test
+ void updateFlagsWithRangeAllRangeShouldReturnUpdatedFlagsWithUidOrderAsc()
throws MailboxException {
+ saveMessages();
+
+ Iterator<UpdatedFlags> it =
messageMapper.updateFlags(benwaInboxMailbox,
+ new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN),
FlagsUpdateMode.REPLACE),
+ MessageRange.all());
+ List<MessageUid> updatedFlagsUids = Iterators.toStream(it)
+ .map(UpdatedFlags::getUid)
+ .collect(ImmutableList.toImmutableList());
+
+ assertThat(updatedFlagsUids)
+ .containsExactly(message1.getUid(), message2.getUid(),
message3.getUid(), message4.getUid(), message5.getUid());
+ }
+
@Test
void messagePropertiesShouldBeStored() throws Exception {
PropertyBuilder propBuilder = new PropertyBuilder();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]