This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7de356a4f2b4df51a0ff5114fe2c8c0943139329 Author: Benoit Tellier <[email protected]> AuthorDate: Thu Feb 20 13:54:26 2020 +0700 JAMES-3058 Cassandra Testing Session: barrier synchronisation This allow easy concurrency testing for Cassandra tests --- .../james/backends/cassandra/TestingSession.java | 47 +++++++++++--- .../backends/cassandra/TestingSessionTest.java | 71 ++++++++++++++++++++++ 2 files changed, 108 insertions(+), 10 deletions(-) diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java index 5e0ad9d..2e879c8 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java @@ -20,8 +20,8 @@ package org.apache.james.backends.cassandra; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiFunction; import java.util.function.Predicate; import com.datastax.driver.core.BoundStatement; @@ -36,22 +36,45 @@ import com.datastax.driver.core.Statement; import com.google.common.util.concurrent.ListenableFuture; public class TestingSession implements Session { - enum Behavior { - THROW((session, statement) -> { + @FunctionalInterface + interface Behavior { + Behavior THROW = (session, statement) -> { RuntimeException injected_failure = new RuntimeException("Injected failure"); injected_failure.printStackTrace(); throw injected_failure; - }), - EXECUTE_NORMALLY(Session::executeAsync); + }; - private final BiFunction<Session, Statement, ResultSetFuture> behaviour; + Behavior EXECUTE_NORMALLY = Session::executeAsync; - Behavior(BiFunction<Session, Statement, ResultSetFuture> behaviour) { - this.behaviour = behaviour; + static Behavior awaitOn(Barrier barrier) { + return (session, statement) -> { + barrier.call(); + return session.executeAsync(statement); + }; } - ResultSetFuture execute(Session session, Statement statement) { - return behaviour.apply(session, statement); + ResultSetFuture execute(Session session, Statement statement); + } + + public static class Barrier { + private final CountDownLatch callerLatch = new CountDownLatch(1); + private final CountDownLatch awaitCallerLatch = new CountDownLatch(1); + + void awaitCaller() throws InterruptedException { + awaitCallerLatch.await(); + } + + void releaseCaller() { + callerLatch.countDown(); + } + + void call() { + awaitCallerLatch.countDown(); + try { + callerLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } @@ -139,6 +162,10 @@ public class TestingSession implements Session { return condition -> applyCount -> () -> executionHook = new ExecutionHook(condition, Behavior.THROW, applyCount); } + public RequiresCondition awaitOn(Barrier barrier) { + return condition -> applyCount -> () -> executionHook = new ExecutionHook(condition, Behavior.awaitOn(barrier), applyCount); + } + public void resetExecutionHook() { executionHook = NO_EXECUTION_HOOK; } diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java index 1983e99..c044697 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java @@ -19,9 +19,11 @@ package org.apache.james.backends.cassandra; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import org.apache.james.backends.cassandra.TestingSession.Barrier; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.backends.cassandra.versions.SchemaVersion; @@ -30,6 +32,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + class TestingSessionTest { @RegisterExtension static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraSchemaVersionModule.MODULE); @@ -158,4 +163,70 @@ class TestingSessionTest { assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) .isInstanceOf(RuntimeException.class); } + + @Test + void statementShouldNotBeAppliedBeforeBarrierIsReleased(CassandraCluster cassandra) throws Exception { + SchemaVersion originalSchemaVersion = new SchemaVersion(32); + dao.updateVersion(originalSchemaVersion).block(); + Barrier barrier = new Barrier(); + cassandra.getConf() + .awaitOn(barrier) + .whenBoundStatementStartsWith("INSERT INTO schemaVersion") + .times(1) + .setExecutionHook(); + + dao.updateVersion(new SchemaVersion(36)).subscribeOn(Schedulers.elastic()).subscribe(); + + Thread.sleep(100); + + assertThat(dao.getCurrentSchemaVersion().block()) + .contains(originalSchemaVersion); + } + + @Test + void statementShouldBeAppliedWhenBarrierIsReleased(CassandraCluster cassandra) throws Exception { + SchemaVersion originalSchemaVersion = new SchemaVersion(32); + SchemaVersion newVersion = new SchemaVersion(36); + + dao.updateVersion(originalSchemaVersion).block(); + Barrier barrier = new Barrier(); + cassandra.getConf() + .awaitOn(barrier) + .whenBoundStatementStartsWith("INSERT INTO schemaVersion") + .times(1) + .setExecutionHook(); + + Mono<Void> operation = dao.updateVersion(newVersion).cache(); + + operation.subscribeOn(Schedulers.elastic()).subscribe(); + barrier.releaseCaller(); + operation.block(); + + assertThat(dao.getCurrentSchemaVersion().block()) + .contains(newVersion); + } + + @Test + void testShouldBeAbleToAwaitCaller(CassandraCluster cassandra) throws Exception { + SchemaVersion originalSchemaVersion = new SchemaVersion(32); + SchemaVersion newVersion = new SchemaVersion(36); + + dao.updateVersion(originalSchemaVersion).block(); + Barrier barrier = new Barrier(); + cassandra.getConf() + .awaitOn(barrier) + .whenBoundStatementStartsWith("INSERT INTO schemaVersion") + .times(1) + .setExecutionHook(); + + Mono<Void> operation = dao.updateVersion(newVersion).cache(); + + operation.subscribeOn(Schedulers.elastic()).subscribe(); + barrier.awaitCaller(); + barrier.releaseCaller(); + operation.block(); + + assertThat(dao.getCurrentSchemaVersion().block()) + .contains(newVersion); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
