This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e1a4ab80a0144bb3cf4abd6ba50ed932ca6fc4d1 Author: Benoit Tellier <[email protected]> AuthorDate: Tue Jul 7 12:38:24 2020 +0700 JAMES-3265 Flags updates should not infinite loop on denormalization issues Use the table of truth upon retries; this prevents the flags operation from failing indefinitely. Note that the initial operation uses the projection to speed up range reads. --- .../cassandra/mail/CassandraMessageMapper.java | 9 +++----- .../mail/utils/FlagsUpdateStageResult.java | 14 ++++++------ .../cassandra/mail/CassandraMessageMapperTest.java | 26 ++++++++++++++++++++++ .../mail/utils/FlagsUpdateStageResultTest.java | 20 +++++++++++++---- 4 files changed, 52 insertions(+), 17 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 9fbf32c..fb94eff 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -19,8 +19,6 @@ package org.apache.james.mailbox.cassandra.mail; -import static org.apache.james.util.ReactorUtils.publishIfPresent; - import java.time.Duration; import java.util.Comparator; import java.util.Iterator; @@ -333,11 +331,10 @@ public class CassandraMessageMapper implements MessageMapper { return globalResult; } - private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) { + private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<ComposedMessageId> failed) { if (!failed.isEmpty()) { Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed) - .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid)) - .handle(publishIfPresent()); + .flatMap(ids -> 
imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()))); return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator); } else { return Mono.empty(); @@ -443,7 +440,7 @@ public class CassandraMessageMapper implements MessageMapper { .newFlags(newFlags) .build()); } else { - return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId().getUid()); + return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId()); } }); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java index df2858f..ba3eafa 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java @@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.mail.utils; import java.util.List; import java.util.Objects; -import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.UpdatedFlags; import com.google.common.annotations.VisibleForTesting; @@ -33,24 +33,24 @@ public class FlagsUpdateStageResult { return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of(updatedFlags)); } - public static FlagsUpdateStageResult fail(MessageUid uid) { - return new FlagsUpdateStageResult(ImmutableList.of(uid), ImmutableList.of()); + public static FlagsUpdateStageResult fail(ComposedMessageId ids) { + return new FlagsUpdateStageResult(ImmutableList.of(ids), ImmutableList.of()); } public static FlagsUpdateStageResult none() { return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of()); } - private final ImmutableList<MessageUid> failed; + private final ImmutableList<ComposedMessageId> failed; private final 
ImmutableList<UpdatedFlags> succeeded; @VisibleForTesting - FlagsUpdateStageResult(ImmutableList<MessageUid> failed, ImmutableList<UpdatedFlags> succeeded) { + FlagsUpdateStageResult(ImmutableList<ComposedMessageId> failed, ImmutableList<UpdatedFlags> succeeded) { this.failed = failed; this.succeeded = succeeded; } - public List<MessageUid> getFailed() { + public List<ComposedMessageId> getFailed() { return failed; } @@ -60,7 +60,7 @@ public class FlagsUpdateStageResult { public FlagsUpdateStageResult merge(FlagsUpdateStageResult other) { return new FlagsUpdateStageResult( - ImmutableList.<MessageUid>builder() + ImmutableList.<ComposedMessageId>builder() .addAll(this.failed) .addAll(other.failed) .build(), diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java index 0e17312..75e9146 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java @@ -64,6 +64,32 @@ class CassandraMessageMapperTest extends MessageMapperTest { @Nested class StatementLimitationTests { @Test + void updateFlagsShouldNotRetryOnDeletedMessages(CassandraCluster cassandra) throws MailboxException { + saveMessages(); + + cassandra.getConf().printStatements(); + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;")); + try { + messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid())); + } catch (Exception e) { + // expected + } + + StatementRecorder statementRecorder = new StatementRecorder(); + cassandra.getConf().recordStatements(statementRecorder); + + FlagsUpdateCalculator markAsRead = new 
FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.ADD); + messageMapper.updateFlags(benwaInboxMailbox, markAsRead, MessageRange.all()); + + assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement( + "UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;"))) + .hasSize(2); + } + + @Test void deleteMessagesShouldGroupMessageReads(CassandraCluster cassandra) throws MailboxException { saveMessages(); diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java index 3317b6b..923a9a1 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java @@ -21,29 +21,41 @@ package org.apache.james.mailbox.cassandra.mail.utils; import static org.assertj.core.api.Assertions.assertThat; +import java.util.UUID; + import javax.mail.Flags; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.ModSeq; +import org.apache.james.mailbox.cassandra.ids.CassandraId; +import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; +import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.UpdatedFlags; import org.junit.jupiter.api.Test; +import com.datastax.driver.core.utils.UUIDs; import com.google.common.collect.ImmutableList; import nl.jqno.equalsverifier.EqualsVerifier; class FlagsUpdateStageResultTest { - private static final MessageUid UID = MessageUid.of(1L); - private static final MessageUid OTHER_UID = MessageUid.of(2L); + private static final ComposedMessageId UID = new ComposedMessageId( + CassandraId.of(UUID.fromString("464765a0-e4e7-11e4-aba4-710c1de3782b")), + new 
CassandraMessageId.Factory().of(UUIDs.timeBased()), + MessageUid.of(1L)); + private static final ComposedMessageId OTHER_UID = new ComposedMessageId( + CassandraId.of(UUID.fromString("464765a0-e4e7-11e4-aba4-710c1de3782b")), + new CassandraMessageId.Factory().of(UUIDs.timeBased()), + MessageUid.of(2L)); private static final UpdatedFlags UPDATED_FLAGS = UpdatedFlags.builder() - .uid(UID) + .uid(UID.getUid()) .modSeq(ModSeq.of(18)) .oldFlags(new Flags()) .newFlags(new Flags(Flags.Flag.SEEN)) .build(); private static final UpdatedFlags OTHER_UPDATED_FLAGS = UpdatedFlags.builder() - .uid(OTHER_UID) + .uid(OTHER_UID.getUid()) .modSeq(ModSeq.of(18)) .oldFlags(new Flags()) .newFlags(new Flags(Flags.Flag.SEEN)) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
