JAMES-2272 Cassandra migration service should be located in Cassandra back-end
That will make running Cassandra migrations more generic and would be a good move toward a CLI implementation Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b87d49c7 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b87d49c7 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b87d49c7 Branch: refs/heads/master Commit: b87d49c79df4961d9975469babb94f51958dac03 Parents: 3f26590 Author: benwa <[email protected]> Authored: Wed Dec 27 11:36:45 2017 +0700 Committer: benwa <[email protected]> Committed: Thu Jan 4 15:00:43 2018 +0700 ---------------------------------------------------------------------- .../migration/CassandraMigrationService.java | 96 +++++++ .../cassandra/migration/MigrationException.java | 26 ++ .../CassandraMigrationServiceTest.java | 260 ++++++++++++++++++ .../modules/server/CassandraRoutesModule.java | 2 +- .../routes/CassandraMigrationRoutes.java | 4 +- .../service/CassandraMigrationService.java | 98 ------- .../webadmin/service/MigrationException.java | 26 -- .../routes/CassandraMigrationRoutesTest.java | 2 +- .../service/CassandraMigrationServiceTest.java | 261 ------------------- 9 files changed, 386 insertions(+), 389 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java new file mode 100644 index 0000000..0eb80aa --- /dev/null +++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java @@ -0,0 +1,96 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.backends.cassandra.migration; + +import java.util.Map; +import java.util.Optional; +import java.util.stream.IntStream; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class CassandraMigrationService { + public static final String LATEST_VERSION = "latestVersion"; + private final CassandraSchemaVersionDAO schemaVersionDAO; + private final int latestVersion; + private final Map<Integer, Migration> allMigrationClazz; + private final Logger LOG = LoggerFactory.getLogger(CassandraMigrationService.class); + + @Inject + public CassandraMigrationService(CassandraSchemaVersionDAO schemaVersionDAO, Map<Integer, Migration> allMigrationClazz, @Named(LATEST_VERSION) int latestVersion) { + Preconditions.checkArgument(latestVersion >= 0, "The latest version must be positive"); + this.schemaVersionDAO = schemaVersionDAO; + this.latestVersion = latestVersion; + this.allMigrationClazz = allMigrationClazz; + } + + public Optional<Integer> getCurrentVersion() { + return schemaVersionDAO.getCurrentSchemaVersion().join(); + } + + public Optional<Integer> getLatestVersion() { + return Optional.of(latestVersion); + } + + public synchronized void upgradeToVersion(int newVersion) { + int currentVersion = schemaVersionDAO.getCurrentSchemaVersion().join().orElse(CassandraSchemaVersionManager.DEFAULT_VERSION); + if (currentVersion >= newVersion) { + throw new IllegalStateException("Current version is already up to date"); + } + + IntStream.range(currentVersion, newVersion) + .boxed() + .forEach(this::doMigration); + } + + public void 
upgradeToLastVersion() { + upgradeToVersion(latestVersion); + } + + private void doMigration(Integer version) { + if (allMigrationClazz.containsKey(version)) { + LOG.info("Migrating to version {} ", version + 1); + Migration.Result migrationResult = allMigrationClazz.get(version).run(); + if (migrationResult == Migration.Result.COMPLETED) { + schemaVersionDAO.updateVersion(version + 1); + LOG.info("Migrating to version {} done", version + 1); + } else { + String message = String.format("Migrating to version %d partially done. " + + "Please check logs for cause of failure and re-run this migration.", + version + 1); + LOG.warn(message); + throw new MigrationException(message); + } + } else { + String message = String.format("Can not migrate to %d. No migration class registered.", version + 1); + LOG.error(message); + throw new NotImplementedException(message); + } + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java new file mode 100644 index 0000000..41aa423 --- /dev/null +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java @@ -0,0 +1,26 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.cassandra.migration; + +public class MigrationException extends RuntimeException { + public MigrationException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java new file mode 100644 index 0000000..fa2a29f --- /dev/null +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java @@ -0,0 +1,260 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.cassandra.migration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.datastax.driver.core.Session; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; + +public class CassandraMigrationServiceTest { + private static final int LATEST_VERSION = 3; + private static final int 
CURRENT_VERSION = 2; + private static final int OLDER_VERSION = 1; + private CassandraMigrationService testee; + private CassandraSchemaVersionDAO schemaVersionDAO; + private ExecutorService executorService; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private Migration successfulMigration; + + @Before + public void setUp() throws Exception { + schemaVersionDAO = mock(CassandraSchemaVersionDAO.class); + successfulMigration = mock(Migration.class); + when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED); + Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() + .put(OLDER_VERSION, successfulMigration) + .put(CURRENT_VERSION, successfulMigration) + .put(LATEST_VERSION, successfulMigration) + .build(); + testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); + executorService = Executors.newFixedThreadPool(2); + } + + @After + public void tearDown() { + executorService.shutdownNow(); + } + + @Test + public void getCurrentVersionShouldReturnCurrentVersion() throws Exception { + when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION))); + + assertThat(testee.getCurrentVersion().get()).isEqualTo(CURRENT_VERSION); + } + + @Test + public void getLatestVersionShouldReturnTheLatestVersion() throws Exception { + assertThat(testee.getLatestVersion().get()).isEqualTo(LATEST_VERSION); + } + + @Test + public void upgradeToVersionShouldThrowWhenCurrentVersionIsUpToDate() throws Exception { + expectedException.expect(IllegalStateException.class); + + when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION))); + + testee.upgradeToVersion(OLDER_VERSION); + } + + @Test + public void upgradeToVersionShouldUpdateToVersion() throws Exception { + 
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION))); + + testee.upgradeToVersion(CURRENT_VERSION); + + verify(schemaVersionDAO, times(1)).updateVersion(eq(CURRENT_VERSION)); + } + + @Test + public void upgradeToLastVersionShouldThrowWhenVersionIsUpToDate() throws Exception { + expectedException.expect(IllegalStateException.class); + + when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION))); + + testee.upgradeToLastVersion(); + } + + @Test + public void upgradeToLastVersionShouldUpdateToLatestVersion() throws Exception { + when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION))); + + testee.upgradeToLastVersion(); + + verify(schemaVersionDAO, times(1)).updateVersion(eq(LATEST_VERSION)); + } + + @Test + public void upgradeToVersionShouldThrowOnMissingVersion() throws Exception { + Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() + .put(OLDER_VERSION, successfulMigration) + .put(LATEST_VERSION, successfulMigration) + .build(); + testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); + when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION))); + + expectedException.expect(NotImplementedException.class); + + testee.upgradeToVersion(LATEST_VERSION); + } + + @Test + public void upgradeToVersionShouldUpdateIntermediarySuccessfulMigrationsInCaseOfError() throws Exception { + try { + Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() + .put(OLDER_VERSION, successfulMigration) + .put(LATEST_VERSION, successfulMigration) + .build(); + testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); + 
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION))); + + expectedException.expect(RuntimeException.class); + + testee.upgradeToVersion(LATEST_VERSION); + } finally { + verify(schemaVersionDAO).updateVersion(CURRENT_VERSION); + } + } + + @Test + public void concurrentMigrationsShouldFail() throws Exception { + // Given a stateful migration service + Migration wait1SecondMigration = mock(Migration.class); + doAnswer(invocation -> { + Thread.sleep(1000); + return Migration.Result.COMPLETED; + }).when(wait1SecondMigration).run(); + Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() + .put(OLDER_VERSION, wait1SecondMigration) + .put(CURRENT_VERSION, wait1SecondMigration) + .put(LATEST_VERSION, wait1SecondMigration) + .build(); + testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION); + + // When I perform a concurrent migration + AtomicInteger encounteredExceptionCount = new AtomicInteger(0); + executorService.submit(() -> testee.upgradeToVersion(LATEST_VERSION)); + executorService.submit(() -> { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + + try { + testee.upgradeToVersion(LATEST_VERSION); + } catch (IllegalStateException e) { + encounteredExceptionCount.incrementAndGet(); + } + }); + executorService.awaitTermination(10, TimeUnit.SECONDS); + + // Then the second migration fails + assertThat(encounteredExceptionCount.get()).isEqualTo(1); + } + + @Test + public void partialMigrationShouldThrow() throws Exception { + Migration migration1 = mock(Migration.class); + when(migration1.run()).thenReturn(Migration.Result.PARTIAL); + Migration migration2 = successfulMigration; + + Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() + .put(OLDER_VERSION, migration1) + .put(CURRENT_VERSION, migration2) + .build(); + testee = 
new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION); + + expectedException.expect(MigrationException.class); + + testee.upgradeToVersion(LATEST_VERSION); + } + + @Test + public void partialMigrationShouldAbortMigrations() throws Exception { + Migration migration1 = mock(Migration.class); + when(migration1.run()).thenReturn(Migration.Result.PARTIAL); + Migration migration2 = mock(Migration.class); + when(migration2.run()).thenReturn(Migration.Result.COMPLETED); + + Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() + .put(OLDER_VERSION, migration1) + .put(CURRENT_VERSION, migration2) + .build(); + testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION); + + expectedException.expect(MigrationException.class); + + try { + testee.upgradeToVersion(LATEST_VERSION); + } finally { + verify(migration1, times(1)).run(); + verifyNoMoreInteractions(migration1); + verifyZeroInteractions(migration2); + } + } + + public static class InMemorySchemaDAO extends CassandraSchemaVersionDAO { + private int currentVersion; + + public InMemorySchemaDAO(int currentVersion) { + super(mock(Session.class), null); + this.currentVersion = currentVersion; + } + + @Override + public CompletableFuture<Optional<Integer>> getCurrentSchemaVersion() { + return CompletableFuture.completedFuture(Optional.of(currentVersion)); + } + + @Override + public CompletableFuture<Void> updateVersion(int newVersion) { + currentVersion = newVersion; + return CompletableFuture.completedFuture(null); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java ---------------------------------------------------------------------- diff --git 
a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java index 6fe2792..16b821b 100644 --- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java +++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java @@ -19,13 +19,13 @@ package org.apache.james.modules.server; +import org.apache.james.backends.cassandra.migration.CassandraMigrationService; import org.apache.james.backends.cassandra.migration.Migration; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager; import org.apache.james.mailbox.cassandra.mail.migration.AttachmentMessageIdCreation; import org.apache.james.mailbox.cassandra.mail.migration.AttachmentV2Migration; import org.apache.james.webadmin.Routes; import org.apache.james.webadmin.routes.CassandraMigrationRoutes; -import org.apache.james.webadmin.service.CassandraMigrationService; import com.google.inject.AbstractModule; import com.google.inject.Scopes; http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java index 3a42657..dce04cf 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java +++ 
b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java @@ -21,11 +21,11 @@ package org.apache.james.webadmin.routes; import javax.inject.Inject; +import org.apache.james.backends.cassandra.migration.CassandraMigrationService; +import org.apache.james.backends.cassandra.migration.MigrationException; import org.apache.james.webadmin.Constants; import org.apache.james.webadmin.Routes; import org.apache.james.webadmin.dto.CassandraVersionRequest; -import org.apache.james.webadmin.service.CassandraMigrationService; -import org.apache.james.webadmin.service.MigrationException; import org.apache.james.webadmin.utils.ErrorResponder; import org.apache.james.webadmin.utils.ErrorResponder.ErrorType; import org.apache.james.webadmin.utils.JsonTransformer; http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java deleted file mode 100644 index 004cd60..0000000 --- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java +++ /dev/null @@ -1,98 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. 
The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.webadmin.service; - -import java.util.Map; -import java.util.Optional; -import java.util.stream.IntStream; - -import javax.inject.Inject; -import javax.inject.Named; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; -import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager; -import org.apache.james.mailbox.cassandra.mail.migration.Migration; -import org.apache.james.webadmin.dto.CassandraVersionResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -public class CassandraMigrationService { - public static final String LATEST_VERSION = "latestVersion"; - private final CassandraSchemaVersionDAO schemaVersionDAO; - private final int latestVersion; - private final Map<Integer, Migration> allMigrationClazz; - private final Logger LOG = LoggerFactory.getLogger(CassandraMigrationService.class); - - @Inject - public CassandraMigrationService(CassandraSchemaVersionDAO schemaVersionDAO, Map<Integer, Migration> allMigrationClazz, @Named(LATEST_VERSION) int latestVersion) { - Preconditions.checkArgument(latestVersion >= 0, "The latest version must be positive"); - this.schemaVersionDAO = 
schemaVersionDAO; - this.latestVersion = latestVersion; - this.allMigrationClazz = allMigrationClazz; - } - - public CassandraVersionResponse getCurrentVersion() { - return new CassandraVersionResponse(schemaVersionDAO.getCurrentSchemaVersion().join()); - } - - public CassandraVersionResponse getLatestVersion() { - return new CassandraVersionResponse(Optional.of(latestVersion)); - } - - public synchronized void upgradeToVersion(int newVersion) { - int currentVersion = schemaVersionDAO.getCurrentSchemaVersion().join().orElse(CassandraSchemaVersionManager.DEFAULT_VERSION); - if (currentVersion >= newVersion) { - throw new IllegalStateException("Current version is already up to date"); - } - - IntStream.range(currentVersion, newVersion) - .boxed() - .forEach(this::doMigration); - } - - public void upgradeToLastVersion() { - upgradeToVersion(latestVersion); - } - - private void doMigration(Integer version) { - if (allMigrationClazz.containsKey(version)) { - LOG.info("Migrating to version {} ", version + 1); - Migration.MigrationResult migrationResult = allMigrationClazz.get(version).run(); - if (migrationResult == Migration.MigrationResult.COMPLETED) { - schemaVersionDAO.updateVersion(version + 1); - LOG.info("Migrating to version {} done", version + 1); - } else { - String message = String.format("Migrating to version %d partially done. " + - "Please check logs for cause of failure and re-run this migration.", - version + 1); - LOG.warn(message); - throw new MigrationException(message); - } - } else { - String message = String.format("Can not migrate to %d. 
No migration class registered.", version + 1); - LOG.error(message); - throw new NotImplementedException(message); - } - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java deleted file mode 100644 index 7b9d74f..0000000 --- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java +++ /dev/null @@ -1,26 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.webadmin.service; - -public class MigrationException extends RuntimeException { - public MigrationException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java index 1115e87..f3fc05a 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java @@ -37,12 +37,12 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.james.backends.cassandra.migration.CassandraMigrationService; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; import org.apache.james.mailbox.cassandra.mail.migration.Migration; import org.apache.james.metrics.logger.DefaultMetricFactory; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; -import org.apache.james.webadmin.service.CassandraMigrationService; import org.apache.james.webadmin.utils.JsonTransformer; import org.eclipse.jetty.http.HttpStatus; import org.junit.After; http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java 
---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java deleted file mode 100644 index 0e7496f..0000000 --- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java +++ /dev/null @@ -1,261 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.webadmin.service; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.james.backends.cassandra.migration.Migration; -import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import com.datastax.driver.core.Session; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; - -public class CassandraMigrationServiceTest { - private static final int LATEST_VERSION = 3; - private static final int CURRENT_VERSION = 2; - private static final int OLDER_VERSION = 1; - private CassandraMigrationService testee; - private CassandraSchemaVersionDAO schemaVersionDAO; - private ExecutorService executorService; - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - private Migration successfulMigration; - - @Before - public void setUp() throws Exception { - schemaVersionDAO = mock(CassandraSchemaVersionDAO.class); - successfulMigration = mock(Migration.class); - when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED); - 
Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() - .put(OLDER_VERSION, successfulMigration) - .put(CURRENT_VERSION, successfulMigration) - .put(LATEST_VERSION, successfulMigration) - .build(); - testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); - executorService = Executors.newFixedThreadPool(2); - } - - @After - public void tearDown() { - executorService.shutdownNow(); - } - - @Test - public void getCurrentVersionShouldReturnCurrentVersion() throws Exception { - when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION))); - - assertThat(testee.getCurrentVersion().getversion().get()).isEqualTo(CURRENT_VERSION); - } - - @Test - public void getLatestVersionShouldReturnTheLatestVersion() throws Exception { - assertThat(testee.getLatestVersion().getversion().get()).isEqualTo(LATEST_VERSION); - } - - @Test - public void upgradeToVersionShouldThrowWhenCurrentVersionIsUpToDate() throws Exception { - expectedException.expect(IllegalStateException.class); - - when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION))); - - testee.upgradeToVersion(OLDER_VERSION); - } - - @Test - public void upgradeToVersionShouldUpdateToVersion() throws Exception { - when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION))); - - testee.upgradeToVersion(CURRENT_VERSION); - - verify(schemaVersionDAO, times(1)).updateVersion(eq(CURRENT_VERSION)); - } - - @Test - public void upgradeToLastVersionShouldThrowWhenVersionIsUpToDate() throws Exception { - expectedException.expect(IllegalStateException.class); - - when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION))); - - testee.upgradeToLastVersion(); - } - - @Test - public void 
upgradeToLastVersionShouldUpdateToLatestVersion() throws Exception { - when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION))); - - testee.upgradeToLastVersion(); - - verify(schemaVersionDAO, times(1)).updateVersion(eq(LATEST_VERSION)); - } - - @Test - public void upgradeToVersionShouldThrowOnMissingVersion() throws Exception { - Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() - .put(OLDER_VERSION, successfulMigration) - .put(LATEST_VERSION, successfulMigration) - .build(); - testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); - when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION))); - - expectedException.expect(NotImplementedException.class); - - testee.upgradeToVersion(LATEST_VERSION); - } - - @Test - public void upgradeToVersionShouldUpdateIntermediarySuccessfulMigrationsInCaseOfError() throws Exception { - try { - Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() - .put(OLDER_VERSION, successfulMigration) - .put(LATEST_VERSION, successfulMigration) - .build(); - testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); - when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION))); - - expectedException.expect(RuntimeException.class); - - testee.upgradeToVersion(LATEST_VERSION); - } finally { - verify(schemaVersionDAO).updateVersion(CURRENT_VERSION); - } - } - - @Test - public void concurrentMigrationsShouldFail() throws Exception { - // Given a stateful migration service - Migration wait1SecondMigration = mock(Migration.class); - doAnswer(invocation -> { - Thread.sleep(1000); - return Migration.Result.COMPLETED; - }).when(wait1SecondMigration).run(); - Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, 
Migration>builder() - .put(OLDER_VERSION, wait1SecondMigration) - .put(CURRENT_VERSION, wait1SecondMigration) - .put(LATEST_VERSION, wait1SecondMigration) - .build(); - testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION); - - // When I perform a concurrent migration - AtomicInteger encounteredExceptionCount = new AtomicInteger(0); - executorService.submit(() -> testee.upgradeToVersion(LATEST_VERSION)); - executorService.submit(() -> { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - throw Throwables.propagate(e); - } - - try { - testee.upgradeToVersion(LATEST_VERSION); - } catch (IllegalStateException e) { - encounteredExceptionCount.incrementAndGet(); - } - }); - executorService.awaitTermination(10, TimeUnit.SECONDS); - - // Then the second migration fails - assertThat(encounteredExceptionCount.get()).isEqualTo(1); - } - - @Test - public void partialMigrationShouldThrow() throws Exception { - Migration migration1 = mock(Migration.class); - when(migration1.run()).thenReturn(Migration.Result.PARTIAL); - Migration migration2 = successfulMigration; - - Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() - .put(OLDER_VERSION, migration1) - .put(CURRENT_VERSION, migration2) - .build(); - testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION); - - expectedException.expect(MigrationException.class); - - testee.upgradeToVersion(LATEST_VERSION); - } - - @Test - public void partialMigrationShouldAbortMigrations() throws Exception { - Migration migration1 = mock(Migration.class); - when(migration1.run()).thenReturn(Migration.Result.PARTIAL); - Migration migration2 = mock(Migration.class); - when(migration2.run()).thenReturn(Migration.Result.COMPLETED); - - Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder() - .put(OLDER_VERSION, migration1) - .put(CURRENT_VERSION, 
migration2) - .build(); - testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION); - - expectedException.expect(MigrationException.class); - - try { - testee.upgradeToVersion(LATEST_VERSION); - } finally { - verify(migration1, times(1)).run(); - verifyNoMoreInteractions(migration1); - verifyZeroInteractions(migration2); - } - } - - public static class InMemorySchemaDAO extends CassandraSchemaVersionDAO { - private int currentVersion; - - public InMemorySchemaDAO(int currentVersion) { - super(mock(Session.class), null); - this.currentVersion = currentVersion; - } - - @Override - public CompletableFuture<Optional<Integer>> getCurrentSchemaVersion() { - return CompletableFuture.completedFuture(Optional.of(currentVersion)); - } - - @Override - public CompletableFuture<Void> updateVersion(int newVersion) { - currentVersion = newVersion; - return CompletableFuture.completedFuture(null); - } - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
