This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 190d80dc10f60e99a15aba366a8c758e4a2b3a09 Author: Matthieu Baechler <[email protected]> AuthorDate: Mon Jul 22 17:38:45 2019 +0200 JAMES-2813 MigrationTask now only keep a target version and compute everything else at runtime --- .../migration/CassandraMigrationService.java | 60 +++--------- .../migration/CassandraSchemaTransitions.java | 25 +++++ .../cassandra/migration/MigrationTask.java | 101 ++++++++++++++++++--- .../migration/CassandraMigrationServiceTest.java | 31 ++++--- .../modules/server/CassandraRoutesModule.java | 4 + .../routes/CassandraMigrationRoutesTest.java | 14 +-- 6 files changed, 158 insertions(+), 77 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java index 7c71cbd..543ca3f 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java @@ -21,8 +21,6 @@ package org.apache.james.backends.cassandra.migration; import static org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager.DEFAULT_VERSION; -import java.util.List; -import java.util.Map; import java.util.Optional; import javax.inject.Inject; @@ -36,20 +34,21 @@ import org.apache.james.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.steveash.guavate.Guavate; - public class CassandraMigrationService { public static final String LATEST_VERSION = "latestVersion"; private final CassandraSchemaVersionDAO schemaVersionDAO; + private final CassandraSchemaTransitions transitions; + private final MigrationTask.Factory taskFactory; private final SchemaVersion latestVersion; - private final Map<SchemaTransition, Migration> allMigrationClazz; private final Logger logger = LoggerFactory.getLogger(CassandraMigrationService.class); @Inject - public CassandraMigrationService(CassandraSchemaVersionDAO schemaVersionDAO, Map<SchemaTransition, Migration> allMigrationClazz, @Named(LATEST_VERSION) SchemaVersion latestVersion) { + public CassandraMigrationService(CassandraSchemaVersionDAO schemaVersionDAO, CassandraSchemaTransitions transitions, + MigrationTask.Factory factory, @Named(LATEST_VERSION) SchemaVersion latestVersion) { this.schemaVersionDAO = schemaVersionDAO; + this.transitions = transitions; + this.taskFactory = factory; this.latestVersion = latestVersion; - this.allMigrationClazz = allMigrationClazz; } public Optional<SchemaVersion> getCurrentVersion() { @@ -61,53 +60,24 @@ public class CassandraMigrationService { } public Task upgradeToVersion(SchemaVersion target) { - SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION); + checkTarget(target); + return taskFactory.create(target); + } - List<Migration> migrations = currentVersion.listTransitionsForTarget(target) - .stream() - .map(this::validateTransitionExists) - .map(this::toMigration) - .collect(Guavate.toImmutableList()); - return new MigrationTask(migrations, target); + private void checkTarget(SchemaVersion target) { + getCurrentVersion().orElse(DEFAULT_VERSION).listTransitionsForTarget(target).forEach(this::checkMigration); } - private SchemaTransition validateTransitionExists(SchemaTransition transition) { - if (!allMigrationClazz.containsKey(transition)) { + private void checkMigration(SchemaTransition transition) { + transitions.findMigration(transition).orElseThrow(() -> { String message = String.format("Can not migrate from %s to %s. No migration class registered.", transition.fromAsString(), transition.toAsString()); logger.error(message); - throw new NotImplementedException(message); - } - return transition; + return new NotImplementedException(message); + }); } public Task upgradeToLastVersion() { return upgradeToVersion(latestVersion); } - private Migration toMigration(SchemaTransition transition) { - return () -> { - SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION); - SchemaVersion targetVersion = transition.to(); - if (currentVersion.isAfterOrEquals(targetVersion)) { - return; - } - - logger.info("Migrating to version {} ", transition.toAsString()); - allMigrationClazz.get(transition).asTask().run() - .onComplete(() -> schemaVersionDAO.updateVersion(transition.to()).block(), - () -> logger.info("Migrating to version {} done", transition.toAsString())) - .onFailure(() -> logger.warn(failureMessage(transition.to())), - () -> throwMigrationException(transition.to())); - }; - } - - private void throwMigrationException(SchemaVersion newVersion) { - throw new MigrationException(failureMessage(newVersion)); - } - - private String failureMessage(SchemaVersion newVersion) { - return String.format("Migrating to version %d partially done. " + - "Please check logs for cause of failure and re-run this migration.", newVersion.getValue()); - } - } diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraSchemaTransitions.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraSchemaTransitions.java new file mode 100644 index 0000000..6780c9d --- /dev/null +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraSchemaTransitions.java @@ -0,0 +1,25 @@ +package org.apache.james.backends.cassandra.migration; + +import java.util.Map; +import java.util.Optional; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.versions.SchemaTransition; + +import com.google.common.annotations.VisibleForTesting; + +public class CassandraSchemaTransitions { + + private final Map<SchemaTransition, Migration> transitions; + + @Inject + @VisibleForTesting + public CassandraSchemaTransitions(Map<SchemaTransition, Migration> transitions) { + this.transitions = transitions; + } + + public Optional<Migration> findMigration(SchemaTransition transition) { + return Optional.ofNullable(transitions.get(transition)); + } +} diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java index 62276aa..a41d768 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java @@ -19,16 +19,49 @@ package org.apache.james.backends.cassandra.migration; -import java.util.List; +import static org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager.DEFAULT_VERSION; + import java.util.Optional; +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; +import org.apache.james.backends.cassandra.versions.SchemaTransition; import org.apache.james.backends.cassandra.versions.SchemaVersion; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; +import com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; public class MigrationTask implements Task { + + public interface Factory { + MigrationTask create(SchemaVersion target); + } + + public static class Impl implements Factory { + private final CassandraSchemaVersionDAO schemaVersionDAO; + private final CassandraSchemaTransitions transitions; + + @Inject + private Impl(CassandraSchemaVersionDAO schemaVersionDAO, CassandraSchemaTransitions transitions) { + this.schemaVersionDAO = schemaVersionDAO; + this.transitions = transitions; + } + + @Override + public MigrationTask create(SchemaVersion target) { + return new MigrationTask(schemaVersionDAO, transitions, target); + } + } + + private static final Logger LOGGER = LoggerFactory.getLogger(MigrationTask.class); + public static final String CASSANDRA_MIGRATION = "CassandraMigration"; public static class Details implements TaskExecutionDetails.AdditionalInformation { @@ -43,22 +76,66 @@ public class MigrationTask implements Task { } } - private final ImmutableList<Migration> migrations; - private final SchemaVersion toVersion; + private final CassandraSchemaVersionDAO schemaVersionDAO; + private final CassandraSchemaTransitions transitions; + private final SchemaVersion target; - public MigrationTask(List<Migration> migration, SchemaVersion toVersion) { - this.migrations = ImmutableList.copyOf(migration); - this.toVersion = toVersion; + @VisibleForTesting + public MigrationTask(CassandraSchemaVersionDAO schemaVersionDAO, CassandraSchemaTransitions transitions, SchemaVersion target) { + this.schemaVersionDAO = schemaVersionDAO; + this.transitions = transitions; + this.target = target; } @Override - public Result run() throws InterruptedException { - for (Migration migration: migrations) { - migration.asTask().run(); - } + public Result run() { + getCurrentVersion().listTransitionsForTarget(target) + .stream() + .map(this::migration) + .forEach(Throwing.consumer(this::runMigration).sneakyThrow()); return Result.COMPLETED; } + private SchemaVersion getCurrentVersion() { + return schemaVersionDAO.getCurrentSchemaVersion().block().orElse(DEFAULT_VERSION); + } + + private Tuple2<SchemaTransition, Migration> migration(SchemaTransition transition) { + return Tuples.of( + transition, + transitions.findMigration(transition) + .orElseThrow(() -> new MigrationException("unable to find a required Migration for transition " + transition))); + + } + + private void runMigration(Tuple2<SchemaTransition, Migration> tuple) throws InterruptedException { + SchemaVersion currentVersion = getCurrentVersion(); + SchemaTransition transition = tuple.getT1(); + SchemaVersion targetVersion = transition.to(); + if (currentVersion.isAfterOrEquals(targetVersion)) { + return; + } + + LOGGER.info("Migrating to version {} ", transition.toAsString()); + Migration migration = tuple.getT2(); + migration.asTask().run() + .onComplete( + () -> schemaVersionDAO.updateVersion(transition.to()).block(), + () -> LOGGER.info("Migrating to version {} done", transition.toAsString())) + .onFailure( + () -> LOGGER.warn(failureMessage(transition.to())), + () -> throwMigrationException(transition.to())); + } + + private void throwMigrationException(SchemaVersion newVersion) { + throw new MigrationException(failureMessage(newVersion)); + } + + private String failureMessage(SchemaVersion newVersion) { + return String.format("Migrating to version %d partially done. " + + "Please check logs for cause of failure and re-run this migration.", newVersion.getValue()); + } + @Override public String type() { return CASSANDRA_MIGRATION; @@ -66,6 +143,6 @@ public class MigrationTask implements Task { @Override public Optional<TaskExecutionDetails.AdditionalInformation> details() { - return Optional.of(new Details(toVersion)); + return Optional.of(new Details(target)); } } diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java index 0cae5d7..0410313 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java @@ -68,16 +68,17 @@ public class CassandraMigrationServiceTest { @Before public void setUp() throws Exception { schemaVersionDAO = mock(CassandraSchemaVersionDAO.class); + when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.empty())); when(schemaVersionDAO.updateVersion(any())).thenReturn(Mono.empty()); Task successFulTask = mock(Task.class); when(successFulTask.run()).thenReturn(Task.Result.COMPLETED); successfulMigration = mock(Migration.class); when(successfulMigration.asTask()).thenReturn(successFulTask); - Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( + CassandraSchemaTransitions transitions = new CassandraSchemaTransitions(ImmutableMap.of( FROM_OLDER_TO_CURRENT, successfulMigration, - FROM_CURRENT_TO_LATEST, successfulMigration); - testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); + FROM_CURRENT_TO_LATEST, successfulMigration)); + testee = new CassandraMigrationService(schemaVersionDAO, transitions, version -> new MigrationTask(schemaVersionDAO, transitions, version), LATEST_VERSION); executorService = Executors.newFixedThreadPool(2, NamedThreadFactory.withClassName(getClass())); } @@ -137,9 +138,9 @@ public class CassandraMigrationServiceTest { @Test public void upgradeToVersionShouldThrowOnMissingVersion() throws InterruptedException { - Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of(FROM_OLDER_TO_CURRENT, successfulMigration); + CassandraSchemaTransitions transitions = new CassandraSchemaTransitions(ImmutableMap.of(FROM_OLDER_TO_CURRENT, successfulMigration)); - testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); + testee = new CassandraMigrationService(schemaVersionDAO, transitions, version -> new MigrationTask(schemaVersionDAO, transitions, version), LATEST_VERSION); when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(OLDER_VERSION))); expectedException.expect(NotImplementedException.class); @@ -150,13 +151,13 @@ public class CassandraMigrationServiceTest { @Test public void upgradeToVersionShouldUpdateIntermediarySuccessfulMigrationsInCaseOfError() throws InterruptedException { try { - Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( + CassandraSchemaTransitions transitions = new CassandraSchemaTransitions(ImmutableMap.of( FROM_OLDER_TO_CURRENT, successfulMigration, FROM_CURRENT_TO_LATEST, () -> { throw new RuntimeException(); - }); + })); - testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); + testee = new CassandraMigrationService(schemaVersionDAO, transitions, version -> new MigrationTask(schemaVersionDAO, transitions, version), LATEST_VERSION); when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(OLDER_VERSION))); expectedException.expect(RuntimeException.class); @@ -169,16 +170,17 @@ public class CassandraMigrationServiceTest { @Test public void partialMigrationShouldThrow() throws InterruptedException { + InMemorySchemaDAO schemaVersionDAO = new InMemorySchemaDAO(OLDER_VERSION); Task failingTask = mock(Task.class); when(failingTask.run()).thenThrow(MigrationException.class); Migration migration1 = failingTask::run; Migration migration2 = successfulMigration; - Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( + CassandraSchemaTransitions transitions = new CassandraSchemaTransitions(ImmutableMap.of( FROM_OLDER_TO_CURRENT, migration1, - FROM_CURRENT_TO_LATEST, migration2); + FROM_CURRENT_TO_LATEST, migration2)); - testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION); + testee = new CassandraMigrationService(schemaVersionDAO, transitions, version -> new MigrationTask(schemaVersionDAO, transitions, version), LATEST_VERSION); expectedException.expect(MigrationException.class); @@ -187,16 +189,17 @@ public class CassandraMigrationServiceTest { @Test public void partialMigrationShouldAbortMigrations() throws InterruptedException { + InMemorySchemaDAO schemaVersionDAO = new InMemorySchemaDAO(OLDER_VERSION); Task failingTask = mock(Task.class); when(failingTask.run()).thenThrow(MigrationException.class); Migration migration1 = failingTask::run; Migration migration2 = mock(Migration.class); - Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( + CassandraSchemaTransitions transitions = new CassandraSchemaTransitions(ImmutableMap.of( FROM_OLDER_TO_CURRENT, migration1, - FROM_CURRENT_TO_LATEST, migration2); + FROM_CURRENT_TO_LATEST, migration2)); - testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION); + testee = new CassandraMigrationService(schemaVersionDAO, transitions, version -> new MigrationTask(schemaVersionDAO, transitions, version), LATEST_VERSION); expectedException.expect(MigrationException.class); diff --git a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java index 628def0..a4bdd83 100644 --- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java +++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java @@ -21,6 +21,7 @@ package org.apache.james.modules.server; import org.apache.james.backends.cassandra.migration.CassandraMigrationService; import org.apache.james.backends.cassandra.migration.Migration; +import org.apache.james.backends.cassandra.migration.MigrationTask; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager; import org.apache.james.backends.cassandra.versions.SchemaTransition; import org.apache.james.backends.cassandra.versions.SchemaVersion; @@ -47,10 +48,13 @@ public class CassandraRoutesModule extends AbstractModule { @Override protected void configure() { + bind(MigrationTask.Impl.class).in(Scopes.SINGLETON); bind(CassandraRoutesModule.class).in(Scopes.SINGLETON); bind(CassandraMailboxMergingRoutes.class).in(Scopes.SINGLETON); bind(CassandraMigrationService.class).in(Scopes.SINGLETON); + bind(MigrationTask.Factory.class).to(MigrationTask.Impl.class); + Multibinder<Routes> routesMultibinder = Multibinder.newSetBinder(binder(), Routes.class); routesMultibinder.addBinding().to(CassandraMigrationRoutes.class); routesMultibinder.addBinding().to(CassandraMailboxMergingRoutes.class); diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java index f3b09ff..ff1126b 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Optional; import org.apache.james.backends.cassandra.migration.CassandraMigrationService; +import org.apache.james.backends.cassandra.migration.CassandraSchemaTransitions; import org.apache.james.backends.cassandra.migration.Migration; import org.apache.james.backends.cassandra.migration.MigrationTask; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; @@ -69,19 +70,20 @@ public class CassandraMigrationRoutesTest { private MemoryTaskManager taskManager; private void createServer() throws InterruptedException { - Migration successfulMigration = mock(Migration.class); + Migration successfulMigration = () -> { }; - Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( + CassandraSchemaTransitions transitions = new CassandraSchemaTransitions(ImmutableMap.of( FROM_OLDER_TO_CURRENT, successfulMigration, - FROM_CURRENT_TO_LATEST, successfulMigration); + FROM_CURRENT_TO_LATEST, successfulMigration)); schemaVersionDAO = mock(CassandraSchemaVersionDAO.class); + when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.empty())); when(schemaVersionDAO.updateVersion(any())).thenReturn(Mono.empty()); taskManager = new MemoryTaskManager(); JsonTransformer jsonTransformer = new JsonTransformer(); webAdminServer = WebAdminUtils.createWebAdminServer( - new CassandraMigrationRoutes(new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION), + new CassandraMigrationRoutes(new CassandraMigrationService(schemaVersionDAO, transitions, version -> new MigrationTask(schemaVersionDAO, transitions, version), LATEST_VERSION), taskManager, jsonTransformer), new TasksRoutes(taskManager, jsonTransformer)) .start(); @@ -224,7 +226,7 @@ public class CassandraMigrationRoutesTest { .basePath(TasksRoutes.BASE) .get(taskId + "/await"); - verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion(); + verify(schemaVersionDAO, atLeastOnce()).getCurrentSchemaVersion(); verifyNoMoreInteractions(schemaVersionDAO); } @@ -311,7 +313,7 @@ public class CassandraMigrationRoutesTest { .basePath(TasksRoutes.BASE) .get(taskId + "/await"); - verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion(); + verify(schemaVersionDAO, atLeastOnce()).getCurrentSchemaVersion(); verifyNoMoreInteractions(schemaVersionDAO); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
