This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit fa9136cdc3ad53ee564ce629324b399e4f7f761f Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 7 11:25:19 2020 +0700 JAMES-3143 Add concurrency tests for message projection corrections --- .../SolveMessageInconsistenciesServiceTest.java | 95 ++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java index 06b7054..9b60935 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.mail.task; +import static org.apache.james.backends.cassandra.Scenario.Builder.awaitOn; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static org.assertj.core.api.Assertions.assertThat; @@ -28,6 +29,7 @@ import javax.mail.Flags; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.Scenario; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.mailbox.MessageUid; @@ -47,6 +49,9 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public class SolveMessageInconsistenciesServiceTest { private static final CassandraId MAILBOX_ID = CassandraId.timeBased(); @@ -150,6 +155,65 @@ public class SolveMessageInconsistenciesServiceTest { } @Test + void shouldNotConsiderPendingMessageUpdatesAsInconsistency(CassandraCluster cassandra) throws Exception { + imapUidDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block(); + messageIdDAO.insert(MESSAGE_1).block(); + + Scenario.Barrier barrier = new Scenario.Barrier(1); + cassandra.getConf() + .registerScenario(awaitOn(barrier) + .thenExecuteNormally() + .times(1) + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;")); + + Context context = new Context(); + Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache(); + task.subscribe(); + + barrier.awaitCaller(); + messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block(); + barrier.releaseCaller(); + + task.block(); + + // Verify that no inconsistency is fixed + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedImapUidEntries(1) + .processedMessageIdEntries(1) + .build()); + } + + @Test + void shouldNotConsiderPendingMessageInsertsAsInconsistency(CassandraCluster cassandra) throws Exception { + imapUidDAO.insert(MESSAGE_1).block(); + + Scenario.Barrier barrier = new Scenario.Barrier(1); + cassandra.getConf() + .registerScenario(awaitOn(barrier) + .thenExecuteNormally() + .times(1) + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;")); + + Context context = new Context(); + Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache(); + task.subscribe(); + + barrier.awaitCaller(); + messageIdDAO.insert(MESSAGE_1).block(); + barrier.releaseCaller(); + + task.block(); + + // Verify that no inconsistency is fixed + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedImapUidEntries(1) + .processedMessageIdEntries(0) + .build()); + } + + @Test void fixMessageInconsistenciesShouldResolveInconsistentData() { imapUidDAO.insert(MESSAGE_1).block(); @@ -314,6 +378,37 @@ public class SolveMessageInconsistenciesServiceTest { } @Test + void shouldNotConsiderPendingMessageDeleteAsInconsistency(CassandraCluster cassandra) throws Exception { + messageIdDAO.insert(MESSAGE_1).block(); + + Scenario.Barrier barrier = new Scenario.Barrier(1); + cassandra.getConf() + .registerScenario(awaitOn(barrier) + .thenExecuteNormally() + .times(1) + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted," + + "flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable " + + "WHERE mailboxId=:mailboxId AND uid=:uid;")); + + Context context = new Context(); + Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache(); + task.subscribe(); + + barrier.awaitCaller(); + messageIdDAO.delete(MAILBOX_ID, MESSAGE_UID_1).block(); + barrier.releaseCaller(); + + task.block(); + + // Verify that no inconsistency is fixed + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedImapUidEntries(0) + .processedMessageIdEntries(1) + .build()); + } + + @Test void fixMessageInconsistenciesShouldResolveInconsistentData() { messageIdDAO.insert(MESSAGE_1).block(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
