JAMES-2272 Migration should succeed when already applied concurrentMigrationIsNotAllowed test can not be return anymore as it was relying on the non indepotent behaviour to ensure tasks was run sequentially. However, using the taskManager, we ensure task, and thus migrations can run sequentially.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e4af1a8b Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e4af1a8b Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e4af1a8b Branch: refs/heads/master Commit: e4af1a8b4e71b76685980257eca304bf7faf7d78 Parents: 955afb0 Author: benwa <[email protected]> Authored: Wed Jan 3 16:40:28 2018 +0700 Committer: benwa <[email protected]> Committed: Thu Jan 4 15:12:47 2018 +0700 ---------------------------------------------------------------------- .../migration/CassandraMigrationService.java | 10 +-- .../CassandraMigrationServiceTest.java | 14 ++-- .../WebAdminServerIntegrationTest.java | 39 ----------- .../routes/CassandraMigrationRoutesTest.java | 70 +++++++++++++------- 4 files changed, 54 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/e4af1a8b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java index 9ab1177..7ff0775 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java @@ -58,7 +58,6 @@ public class CassandraMigrationService { public Migration upgradeToVersion(SchemaVersion newVersion) { SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION); - assertMigrationNeeded(newVersion, currentVersion); Migration migrationCombination = IntStream.range(currentVersion.getValue(), newVersion.getValue()) .boxed() @@ -69,13 +68,6 @@ public class CassandraMigrationService { return new MigrationTask(migrationCombination, newVersion); } - private void assertMigrationNeeded(SchemaVersion newVersion, SchemaVersion currentVersion) { - boolean needMigration = currentVersion.isBefore(newVersion); - if (!needMigration) { - throw new IllegalStateException("Current version is already up to date"); - } - } - private SchemaVersion validateVersionNumber(SchemaVersion versionNumber) { if (!allMigrationClazz.containsKey(versionNumber)) { String message = String.format("Can not migrate to %d. No migration class registered.", versionNumber.getValue()); @@ -94,7 +86,7 @@ public class CassandraMigrationService { SchemaVersion newVersion = version.next(); SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION); if (currentVersion.isAfterOrEquals(newVersion)) { - return Migration.Result.PARTIAL; + return Migration.Result.COMPLETED; } LOG.info("Migrating to version {} ", newVersion); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4af1a8b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java index 4897800..ac1fc82 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.Executors; import org.apache.commons.lang.NotImplementedException; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; import org.apache.james.backends.cassandra.versions.SchemaVersion; +import org.apache.james.task.Task; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -91,12 +92,11 @@ public class CassandraMigrationServiceTest { } @Test - public void upgradeToVersionShouldThrowWhenCurrentVersionIsUpToDate() throws Exception { - expectedException.expect(IllegalStateException.class); - + public void upgradeToVersionShouldNotThrowWhenCurrentVersionIsUpToDate() throws Exception { when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION))); - testee.upgradeToVersion(OLDER_VERSION).run(); + assertThat(testee.upgradeToVersion(OLDER_VERSION).run()) + .isEqualTo(Task.Result.COMPLETED); } @Test @@ -109,12 +109,12 @@ public class CassandraMigrationServiceTest { } @Test - public void upgradeToLastVersionShouldThrowWhenVersionIsUpToDate() throws Exception { - expectedException.expect(IllegalStateException.class); + public void upgradeToLastVersionShouldNotThrowWhenVersionIsUpToDate() throws Exception { when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION))); - testee.upgradeToLastVersion().run(); + assertThat(testee.upgradeToLastVersion().run()) + .isEqualTo(Task.Result.COMPLETED); } @Test http://git-wip-us.apache.org/repos/asf/james-project/blob/e4af1a8b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java index fee4465..3f403bc 100644 --- a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java +++ b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java @@ -31,8 +31,6 @@ import static org.hamcrest.Matchers.is; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; import org.apache.james.CassandraJmapTestRule; import org.apache.james.DockerCassandraRule; @@ -40,8 +38,6 @@ import org.apache.james.GuiceJamesServer; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager; import org.apache.james.modules.MailboxProbeImpl; import org.apache.james.probe.DataProbe; -import org.apache.james.task.TaskManager; -import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.apache.james.utils.DataProbeImpl; import org.apache.james.utils.WebAdminGuiceProbe; import org.apache.james.webadmin.routes.DomainsRoutes; @@ -54,7 +50,6 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; -import org.testcontainers.shaded.com.google.common.collect.ImmutableList; import com.jayway.restassured.RestAssured; import com.jayway.restassured.builder.RequestSpecBuilder; @@ -276,40 +271,6 @@ public class WebAdminServerIntegrationTest { } @Test - public void concurrentMigrationIsNotAllowed() throws Exception { - ConcurrentLinkedQueue<String> taskIds = new ConcurrentLinkedQueue<>(); - int threadCount = 2; - int operationCount = 1; - new ConcurrentTestRunner(threadCount, operationCount, (a, b) -> { - String migrationId = with() - .port(webAdminGuiceProbe.getWebAdminPort()) - .post(UPGRADE_TO_LATEST_VERSION) - .jsonPath() - .get("taskId"); - taskIds.add(migrationId); - }).run() - .awaitTermination(1, TimeUnit.MINUTES); - - String id1 = taskIds.poll(); - String id2 = taskIds.poll(); - String status1 = with() - .port(webAdminGuiceProbe.getWebAdminPort()) - .get("/tasks/" + id1 + "/await") - .jsonPath() - .get("status"); - String status2 = with() - .port(webAdminGuiceProbe.getWebAdminPort()) - .get("/tasks/" + id2 + "/await") - .jsonPath() - .get("status"); - - assertThat(ImmutableList.of(status1, status2)) - .containsOnly( - TaskManager.Status.COMPLETED.getValue(), - TaskManager.Status.FAILED.getValue()); - } - - @Test public void addressGroupsEndpointShouldHandleRequests() throws Exception { dataProbe.addAddressMapping("group", "domain.com", "[email protected]"); dataProbe.addAddressMapping("group", "domain.com", "[email protected]"); http://git-wip-us.apache.org/repos/asf/james-project/blob/e4af1a8b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java index 1372c9b..d3b3046 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java @@ -201,26 +201,38 @@ public class CassandraMigrationRoutesTest { } @Test - public void postShouldNotDoMigrationWhenCurrentVersionIsNewerThan() throws Exception { + public void postShouldCreateTaskWhenCurrentVersionIsNewerThan() throws Exception { when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION))); - Map<String, Object> errors = given() + String taskId = given() .body(String.valueOf(OLDER_VERSION.getValue())) .with() .post("/upgrade") + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") .then() - .statusCode(HttpStatus.CONFLICT_409) - .contentType(ContentType.JSON) - .extract() - .body() + .body("status", is("completed")); + } + + @Test + public void postShouldNotUpdateVersionWhenCurrentVersionIsNewerThan() throws Exception { + when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION))); + + String taskId = given() + .body(String.valueOf(OLDER_VERSION.getValue())) + .with() + .post("/upgrade") .jsonPath() - .getMap("."); + .get("taskId"); - assertThat(errors) - .containsEntry("statusCode", HttpStatus.CONFLICT_409) - .containsEntry("type", "WrongState") - .containsEntry("message", "The migration requested can not be performed") - .containsEntry("cause", "Current version is already up to date"); + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion(); verifyNoMoreInteractions(schemaVersionDAO); @@ -280,24 +292,34 @@ public class CassandraMigrationRoutesTest { } @Test - public void postShouldNotDoMigrationToLatestVersionWhenItIsUpToDate() throws Exception { + public void postShouldCreateTaskWhenItIsUpToDate() throws Exception { when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION))); - Map<String, Object> errors = when() + String taskId = with() .post("/upgrade/latest") + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") .then() - .statusCode(HttpStatus.CONFLICT_409) - .contentType(ContentType.JSON) - .extract() - .body() + .body("status", is("completed")); + } + + @Test + public void postShouldNotUpdateVersionWhenItIsUpToDate() throws Exception { + when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION))); + + String taskId = with() + .post("/upgrade/latest") .jsonPath() - .getMap("."); + .get("taskId"); - assertThat(errors) - .containsEntry("statusCode", HttpStatus.CONFLICT_409) - .containsEntry("type", "WrongState") - .containsEntry("message", "The migration requested can not be performed") - .containsEntry("cause", "Current version is already up to date"); + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion(); verifyNoMoreInteractions(schemaVersionDAO); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
