This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 91491b393a5e80248d8e4f150d7868dfcf825a46 Author: Benoit Tellier <[email protected]> AuthorDate: Wed Apr 29 15:16:21 2020 +0700 JAMES-3140 Better handle CassandraModSeqRetries In an empty modseq, retries did not include the initial row creation, but only its update. (Triggered because "100 mail should be well received" runs way faster with the blob store cache.) --- .../apache/james/backends/cassandra/Scenario.java | 8 ++- .../backends/cassandra/TestingSessionTest.java | 23 ++++----- .../cassandra/mail/CassandraModSeqProvider.java | 16 ++---- .../mail/CassandraModSeqProviderTest.java | 57 ++++++++++++++++++++++ 4 files changed, 79 insertions(+), 25 deletions(-) diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java index f2f3682..da3758d 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java @@ -33,10 +33,16 @@ import com.datastax.driver.core.Statement; import com.google.common.base.Preconditions; public class Scenario { + public static class InjectedFailureException extends RuntimeException { + public InjectedFailureException() { + super("Injected failure"); + } + } + @FunctionalInterface interface Behavior { Behavior THROW = (session, statement) -> { - throw new RuntimeException("Injected failure"); + throw new InjectedFailureException(); }; Behavior EXECUTE_NORMALLY = Session::executeAsync; diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java index 6c8dfc5..54fca6f 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java +++ 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java @@ -30,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import org.apache.james.backends.cassandra.Scenario.Barrier; +import org.apache.james.backends.cassandra.Scenario.InjectedFailureException; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; @@ -89,7 +90,7 @@ class TestingSessionTest { .whenQueryStartsWith("SELECT value FROM schemaVersion;")); assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); } @Test @@ -102,7 +103,7 @@ class TestingSessionTest { assertThatThrownBy(() -> new CassandraAsyncExecutor(cassandra.getConf()) .execute(select(VALUE).from(TABLE_NAME)) .block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); } @Test @@ -113,7 +114,7 @@ class TestingSessionTest { .forAllQueries()); assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); } @Test @@ -142,9 +143,9 @@ class TestingSessionTest { SoftAssertions.assertSoftly(softly -> { assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); assertThatCode(() -> dao.getCurrentSchemaVersion().block()) .doesNotThrowAnyException(); }); @@ -165,7 +166,7 @@ class TestingSessionTest { assertThatCode(() -> dao.getCurrentSchemaVersion().block()) 
.doesNotThrowAnyException(); assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); assertThatCode(() -> dao.getCurrentSchemaVersion().block()) .doesNotThrowAnyException(); }); @@ -180,11 +181,11 @@ class TestingSessionTest { SoftAssertions.assertSoftly(softly -> { assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); }); } @@ -198,7 +199,7 @@ class TestingSessionTest { dao.updateVersion(new SchemaVersion(36)).block(); assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); } @Test @@ -287,6 +288,6 @@ class TestingSessionTest { barrier.releaseCaller(); assertThatThrownBy(operation::block) - .isInstanceOf(RuntimeException.class); + .isInstanceOf(InjectedFailureException.class); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java index f3ee258..15f1a2d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java @@ -56,7 +56,6 @@ import reactor.util.retry.Retry; public class CassandraModSeqProvider implements ModSeqProvider { public static final String MOD_SEQ_CONDITION = "modSeqCondition"; - private final long maxModSeqRetries; public static class 
ExceptionRelay extends RuntimeException { private final MailboxException underlying; @@ -83,6 +82,7 @@ public class CassandraModSeqProvider implements ModSeqProvider { } private final CassandraAsyncExecutor cassandraAsyncExecutor; + private final long maxModSeqRetries; private final PreparedStatement select; private final PreparedStatement update; private final PreparedStatement insert; @@ -178,24 +178,14 @@ public class CassandraModSeqProvider implements ModSeqProvider { } public Mono<ModSeq> nextModSeq(CassandraId mailboxId) { + Duration firstBackoff = Duration.ofMillis(10); + return findHighestModSeq(mailboxId) .flatMap(maybeHighestModSeq -> maybeHighestModSeq .map(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq)) .orElseGet(() -> tryInsertModSeq(mailboxId, ModSeq.first()))) - .switchIfEmpty(handleRetries(mailboxId)); - } - - private Mono<ModSeq> handleRetries(CassandraId mailboxId) { - Duration firstBackoff = Duration.ofMillis(10); - return tryFindThenUpdateOnce(mailboxId) .single() .retryWhen(Retry.backoff(maxModSeqRetries, firstBackoff).scheduler(Schedulers.elastic())); } - private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) { - return Mono.defer(() -> findHighestModSeq(mailboxId) - .<ModSeq>handle((t, sink) -> t.ifPresent(sink::next)) - .flatMap(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq))); - } - } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java index 6c328a4..e3aed85 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java @@ -18,7 +18,13 @@ ****************************************************************/ package org.apache.james.mailbox.cassandra.mail; +import static 
org.apache.james.backends.cassandra.Scenario.Builder.awaitOn; +import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.MAILBOX_ID; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.time.Duration; import java.util.concurrent.ConcurrentSkipListSet; @@ -27,6 +33,8 @@ import java.util.stream.LongStream; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.Scenario; +import org.apache.james.backends.cassandra.Scenario.Barrier; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.core.Username; import org.apache.james.mailbox.ModSeq; @@ -40,8 +48,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import com.datastax.driver.core.querybuilder.QueryBuilder; import com.github.fge.lambdas.Throwing; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + class CassandraModSeqProviderTest { private static final CassandraId CASSANDRA_ID = new CassandraId.Factory().fromString("e22b3ac0-a80b-11e7-bb00-777268d65503"); @@ -84,6 +96,51 @@ class CassandraModSeqProviderTest { } @Test + void failedInsertsShouldBeRetried(CassandraCluster cassandra) throws Exception { + Barrier insertBarrier = new Barrier(2); + Barrier retryBarrier = new Barrier(1); + cassandra.getConf() + .registerScenario( + executeNormally() + .times(2) + .whenQueryStartsWith("SELECT nextModseq FROM modseq WHERE mailboxId=:mailboxId;"), + awaitOn(insertBarrier) + 
.thenExecuteNormally() + .times(2) + .whenQueryStartsWith("INSERT INTO modseq (nextModseq,mailboxId) VALUES (:nextModseq,:mailboxId) IF NOT EXISTS;"), + awaitOn(retryBarrier) + .thenExecuteNormally() + .times(1) + .whenQueryStartsWith("SELECT nextModseq FROM modseq WHERE mailboxId=:mailboxId;")); + + Mono<ModSeq> operation1 = modSeqProvider.nextModSeq(CASSANDRA_ID) + .subscribeOn(Schedulers.elastic()) + .cache(); + Mono<ModSeq> operation2 = modSeqProvider.nextModSeq(CASSANDRA_ID) + .subscribeOn(Schedulers.elastic()) + .cache(); + + operation1.subscribe(); + operation2.subscribe(); + + insertBarrier.awaitCaller(); + insertBarrier.releaseCaller(); + + retryBarrier.awaitCaller(); + retryBarrier.releaseCaller(); + + // Artificially fail the insert failure + cassandra.getConf() + .execute(QueryBuilder.delete().from(TABLE_NAME) + .where(QueryBuilder.eq(MAILBOX_ID, CASSANDRA_ID.asUuid()))); + + retryBarrier.releaseCaller(); + + assertThatCode(() -> operation1.block(Duration.ofSeconds(1))).doesNotThrowAnyException(); + assertThatCode(() -> operation2.block(Duration.ofSeconds(1))).doesNotThrowAnyException(); + } + + @Test void nextModSeqShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException { int nbEntries = 10; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
