This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3f390a49f6956e5f3961dd7fd441397443c61b76 Author: Matthieu Baechler <[email protected]> AuthorDate: Mon Jul 22 16:36:49 2019 +0200 JAMES-2813 remove inheritance between Migration and Task to avoid dealing with Result in migration --- .../migration/CassandraMigrationService.java | 10 ++-- .../backends/cassandra/migration/Migration.java | 17 ++++++- .../cassandra/migration/MigrationTask.java | 5 +- .../migration/CassandraMigrationServiceTest.java | 23 ++++++---- .../migration/AttachmentMessageIdCreation.java | 19 +++----- .../mail/migration/AttachmentV2Migration.java | 22 +++------ .../mail/migration/MailboxPathV2Migration.java | 48 +++++++++++--------- .../migration/AttachmentMessageIdCreationTest.java | 23 +++++----- .../mail/migration/AttachmentV2MigrationTest.java | 35 +++++++------- .../mail/migration/MailboxPathV2MigrationTest.java | 2 +- .../modules/server/CassandraRoutesModule.java | 2 +- .../migration/MappingsSourcesMigration.java | 53 ++++++++++++++-------- .../migration/MappingsSourcesMigrationTest.java | 27 ++++++----- .../CassandraMappingsSolveInconsistenciesTask.java | 4 +- .../webadmin/routes/CassandraMigrationRoutes.java | 6 +-- .../routes/CassandraMigrationRoutesTest.java | 2 - 16 files changed, 160 insertions(+), 138 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java index 5dcf3b9..003d42a 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.IntStream; import javax.inject.Inject; import javax.inject.Named; @@ -34,6 +33,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; import org.apache.james.backends.cassandra.versions.SchemaTransition; import org.apache.james.backends.cassandra.versions.SchemaVersion; +import org.apache.james.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +59,7 @@ public class CassandraMigrationService { return Optional.of(latestVersion); } - public Migration upgradeToVersion(SchemaVersion newVersion) { + public Task upgradeToVersion(SchemaVersion newVersion) { SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION); List<Migration> migrations = new ArrayList<>(); @@ -82,7 +82,7 @@ public class CassandraMigrationService { return transition; } - public Migration upgradeToLastVersion() { + public Task upgradeToLastVersion() { return upgradeToVersion(latestVersion); } @@ -91,11 +91,11 @@ public class CassandraMigrationService { SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION); SchemaVersion targetVersion = transition.to(); if (currentVersion.isAfterOrEquals(targetVersion)) { - return Migration.Result.COMPLETED; + return; } logger.info("Migrating to version {} ", transition.toAsString()); - return allMigrationClazz.get(transition).run() + allMigrationClazz.get(transition).asTask().run() .onComplete(() -> schemaVersionDAO.updateVersion(transition.to()).block(), () -> logger.info("Migrating to version {} done", transition.toAsString())) .onFailure(() -> logger.warn(failureMessage(transition.to())), diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java index b4f15f4..514b7ca 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java @@ -21,6 +21,21 @@ package org.apache.james.backends.cassandra.migration; import org.apache.james.task.Task; -public interface Migration extends Task { +public interface Migration { + + void apply() throws InterruptedException; + + default Task asTask() { + return this::runTask; + } + + default Task.Result runTask() throws InterruptedException { + try { + this.apply(); + return Task.Result.COMPLETED; + } catch (RuntimeException e) { + return Task.Result.PARTIAL; + } + } } diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java index c88b104..62276aa 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java @@ -23,11 +23,12 @@ import java.util.List; import java.util.Optional; import org.apache.james.backends.cassandra.versions.SchemaVersion; +import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; import com.google.common.collect.ImmutableList; -public class MigrationTask implements Migration { +public class MigrationTask implements Task { public static final String CASSANDRA_MIGRATION = "CassandraMigration"; public static class Details implements TaskExecutionDetails.AdditionalInformation { @@ -53,7 +54,7 @@ public class MigrationTask implements Migration { @Override public Result run() throws InterruptedException { for (Migration migration: migrations) { - migration.run(); + migration.asTask().run(); } return Result.COMPLETED; } diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java index e356612..0cae5d7 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java @@ -70,8 +70,10 @@ public class CassandraMigrationServiceTest { schemaVersionDAO = mock(CassandraSchemaVersionDAO.class); when(schemaVersionDAO.updateVersion(any())).thenReturn(Mono.empty()); + Task successFulTask = mock(Task.class); + when(successFulTask.run()).thenReturn(Task.Result.COMPLETED); successfulMigration = mock(Migration.class); - when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED); + when(successfulMigration.asTask()).thenReturn(successFulTask); Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( FROM_OLDER_TO_CURRENT, successfulMigration, FROM_CURRENT_TO_LATEST, successfulMigration); @@ -150,7 +152,9 @@ public class CassandraMigrationServiceTest { try { Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( FROM_OLDER_TO_CURRENT, successfulMigration, - FROM_CURRENT_TO_LATEST, () -> Migration.Result.PARTIAL); + FROM_CURRENT_TO_LATEST, () -> { + throw new RuntimeException(); + }); testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION); when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(OLDER_VERSION))); @@ -165,8 +169,9 @@ public class CassandraMigrationServiceTest { @Test public void partialMigrationShouldThrow() throws InterruptedException { - Migration migration1 = mock(Migration.class); - when(migration1.run()).thenReturn(Migration.Result.PARTIAL); + Task failingTask = mock(Task.class); + when(failingTask.run()).thenThrow(MigrationException.class); + Migration migration1 = failingTask::run; Migration migration2 = successfulMigration; Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( @@ -182,10 +187,10 @@ public class CassandraMigrationServiceTest { @Test public void partialMigrationShouldAbortMigrations() throws InterruptedException { - Migration migration1 = mock(Migration.class); - when(migration1.run()).thenReturn(Migration.Result.PARTIAL); + Task failingTask = mock(Task.class); + when(failingTask.run()).thenThrow(MigrationException.class); + Migration migration1 = failingTask::run; Migration migration2 = mock(Migration.class); - when(migration2.run()).thenReturn(Migration.Result.COMPLETED); Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( FROM_OLDER_TO_CURRENT, migration1, @@ -198,8 +203,8 @@ public class CassandraMigrationServiceTest { try { testee.upgradeToVersion(LATEST_VERSION).run(); } finally { - verify(migration1, times(1)).run(); - verifyNoMoreInteractions(migration1); + verify(failingTask, times(1)).run(); + verifyNoMoreInteractions(failingTask); verifyZeroInteractions(migration2); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java index bb65783..b70b8db 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java @@ -26,7 +26,6 @@ import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMessageIdDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO.MessageIdAttachmentIds; import org.apache.james.mailbox.model.MessageId; -import org.apache.james.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,24 +45,18 @@ public class AttachmentMessageIdCreation implements Migration { } @Override - public Result run() { - return cassandraMessageDAO.retrieveAllMessageIdAttachmentIds() + public void apply() { + cassandraMessageDAO.retrieveAllMessageIdAttachmentIds() .flatMap(this::createIndex) - .reduce(Result.COMPLETED, Task::combine) - .onErrorResume(this::errorHandling) - .block(); + .doOnError(e -> LOGGER.error("Error while creation attachmentId -> messageIds index", e)) + .blockLast(); } - private Mono<Result> createIndex(MessageIdAttachmentIds message) { + private Mono<Void> createIndex(MessageIdAttachmentIds message) { MessageId messageId = message.getMessageId(); return Flux.fromIterable(message.getAttachmentId()) .flatMap(attachmentId -> attachmentMessageIdDAO.storeAttachmentForMessageId(attachmentId, messageId)) - .then() - .thenReturn(Result.COMPLETED); + .then(); } - private Mono<Result> errorHandling(Throwable e) { - LOGGER.error("Error while creation attachmentId -> messageIds index", e); - return Mono.just(Result.PARTIAL); - } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java index 1bcb329..856afd0 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java @@ -26,7 +26,6 @@ import org.apache.james.blob.api.BlobStore; import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAO; import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2; import org.apache.james.mailbox.model.Attachment; -import org.apache.james.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,24 +47,17 @@ public class AttachmentV2Migration implements Migration { } @Override - public Result run() { - return attachmentDAOV1.retrieveAll() - .flatMap(this::migrateAttachment) - .reduce(Result.COMPLETED, Task::combine) - .onErrorResume(this::handlError) - .block(); + public void apply() { + attachmentDAOV1.retrieveAll() + .flatMap(this::migrateAttachment) + .doOnError(t -> LOGGER.error("Error while performing attachmentDAO V2 migration", t)) + .blockLast(); } - private Mono<Result> handlError(Throwable throwable) { - LOGGER.error("Error while performing attachmentDAO V2 migration", throwable); - return Mono.just(Result.PARTIAL); - } - - private Mono<Result> migrateAttachment(Attachment attachment) { + private Mono<Void> migrateAttachment(Attachment attachment) { return blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes()) .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) .flatMap(attachmentDAOV2::storeAttachment) - .then(attachmentDAOV1.deleteAttachment(attachment.getAttachmentId())) - .thenReturn(Result.COMPLETED); + .then(attachmentDAOV1.deleteAttachment(attachment.getAttachmentId())); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java index cfa5f60..a063457 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java @@ -30,7 +30,6 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAOImpl; import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV2DAO; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,38 +68,43 @@ public class MailboxPathV2Migration implements Migration { } @Override - public Result run() { - return daoV1.readAll() + public void apply() { + daoV1.readAll() .flatMap(this::migrate) - .reduce(Result.COMPLETED, Task::combine) - .onErrorResume(this::handleErrorRun) - .block(); - } - - private Mono<Result> handleErrorRun(Throwable throwable) { - LOGGER.error("Error while performing migration", throwable); - return Mono.just(Result.PARTIAL); + .doOnError(t -> LOGGER.error("Error while performing migration", t)) + .blockLast(); } - public Mono<Result> migrate(CassandraIdAndPath idAndPath) { + private Mono<Void> migrate(CassandraIdAndPath idAndPath) { return daoV2.save(idAndPath.getMailboxPath(), idAndPath.getCassandraId()) .then(daoV1.delete(idAndPath.getMailboxPath())) - .thenReturn(Result.COMPLETED) - .onErrorResume(error -> handleErrorMigrate(idAndPath, error)); + .onErrorResume(error -> handleErrorMigrate(idAndPath, error)) + .then(); } - private Mono<Result> handleErrorMigrate(CassandraIdAndPath idAndPath, Throwable throwable) { + private Mono<Void> handleErrorMigrate(CassandraIdAndPath idAndPath, Throwable throwable) { LOGGER.error("Error while performing migration for path {}", idAndPath.getMailboxPath(), throwable); - return Mono.just(Result.PARTIAL); + return Mono.empty(); } @Override - public String type() { - return "Cassandra_mailboxPathV2Migration"; + public Task asTask() { + return new Task() { + @Override + public Result run() throws InterruptedException { + return runTask(); + } + + @Override + public String type() { + return "Cassandra_mailboxPathV2Migration"; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(additionalInformation); + } + }; } - @Override - public Optional<TaskExecutionDetails.AdditionalInformation> details() { - return Optional.of(additionalInformation); - } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java index 89ad9b4..39d69fd 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java @@ -36,7 +36,6 @@ import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.CassandraRestartExtension; import org.apache.james.backends.cassandra.components.CassandraModule; -import org.apache.james.backends.cassandra.migration.Migration; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.cassandra.CassandraBlobModule; @@ -54,6 +53,7 @@ import org.apache.james.mailbox.model.MessageAttachment; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.apache.james.task.Task; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -61,7 +61,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; - import reactor.core.publisher.Flux; @ExtendWith(CassandraRestartExtension.class) @@ -102,9 +101,9 @@ class AttachmentMessageIdCreationTest { } @Test - void emptyMigrationShouldSucceed() { - assertThat(migration.run()) - .isEqualTo(Migration.Result.COMPLETED); + void emptyMigrationShouldSucceed() throws InterruptedException { + assertThat(migration.asTask().run()) + .isEqualTo(Task.Result.COMPLETED); } @Test @@ -114,8 +113,8 @@ class AttachmentMessageIdCreationTest { cassandraMessageDAO.save(message).block(); - assertThat(migration.run()) - .isEqualTo(Migration.Result.COMPLETED); + assertThat(migration.asTask().run()) + .isEqualTo(Task.Result.COMPLETED); } @Test @@ -125,8 +124,8 @@ class AttachmentMessageIdCreationTest { cassandraMessageDAO.save(message).block(); - assertThat(migration.run()) - .isEqualTo(Migration.Result.COMPLETED); + assertThat(migration.asTask().run()) + .isEqualTo(Task.Result.COMPLETED); } @Test @@ -136,7 +135,7 @@ class AttachmentMessageIdCreationTest { cassandraMessageDAO.save(message).block(); - migration.run(); + migration.apply(); assertThat(attachmentMessageIdDAO.getOwnerMessageIds(attachment.getAttachmentId()).toIterable()) .containsExactly(messageId); @@ -150,7 +149,7 @@ class AttachmentMessageIdCreationTest { when(cassandraMessageDAO.retrieveAllMessageIdAttachmentIds()).thenReturn(Flux.error(new RuntimeException("Mocked exception"))); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); } @Test @@ -167,7 +166,7 @@ class AttachmentMessageIdCreationTest { when(attachmentMessageIdDAO.storeAttachmentForMessageId(any(AttachmentId.class), any(MessageId.class))) .thenThrow(new RuntimeException()); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); } private SimpleMailboxMessage createMessage(MessageId messageId, Collection<MessageAttachment> attachments) { diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java index 29aeede..2100a37 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java @@ -41,6 +41,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2; import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule; import org.apache.james.mailbox.model.Attachment; import org.apache.james.mailbox.model.AttachmentId; +import org.apache.james.task.Task; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -89,9 +90,9 @@ class AttachmentV2MigrationTest { } @Test - void emptyMigrationShouldSucceed() { - assertThat(migration.run()) - .isEqualTo(Migration.Result.COMPLETED); + void emptyMigrationShouldSucceed() throws InterruptedException { + assertThat(migration.asTask().run()) + .isEqualTo(Task.Result.COMPLETED); } @Test @@ -99,8 +100,8 @@ class AttachmentV2MigrationTest { attachmentDAO.storeAttachment(attachment1).block(); attachmentDAO.storeAttachment(attachment2).block(); - assertThat(migration.run()) - .isEqualTo(Migration.Result.COMPLETED); + assertThat(migration.asTask().run()) + .isEqualTo(Task.Result.COMPLETED); } @Test @@ -108,7 +109,7 @@ class AttachmentV2MigrationTest { attachmentDAO.storeAttachment(attachment1).block(); attachmentDAO.storeAttachment(attachment2).block(); - migration.run(); + migration.apply(); assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID).blockOptional()) .contains(CassandraAttachmentDAOV2.from(attachment1, BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); @@ -125,7 +126,7 @@ class AttachmentV2MigrationTest { attachmentDAO.storeAttachment(attachment1).block(); attachmentDAO.storeAttachment(attachment2).block(); - migration.run(); + migration.apply(); assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID).blockOptional()) .isEmpty(); @@ -134,7 +135,7 @@ class AttachmentV2MigrationTest { } @Test - void runShouldReturnPartialWhenInitialReadFail() { + void runShouldReturnPartialWhenInitialReadFail() throws InterruptedException { CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class); CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class); CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class); @@ -142,11 +143,11 @@ class AttachmentV2MigrationTest { when(attachmentDAO.retrieveAll()).thenReturn(Flux.error(new RuntimeException())); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); } @Test - void runShouldReturnPartialWhenSavingBlobsFails() { + void runShouldReturnPartialWhenSavingBlobsFails() throws InterruptedException { CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class); CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class); CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class); @@ -157,11 +158,11 @@ class AttachmentV2MigrationTest { attachment2)); when(blobsDAO.save(any(BucketName.class), any(byte[].class))).thenThrow(new RuntimeException()); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); } @Test - void runShouldReturnPartialWhenSavingAttachmentV2Fail() { + void runShouldReturnPartialWhenSavingAttachmentV2Fail() throws InterruptedException { CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class); CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class); CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class); @@ -176,11 +177,11 @@ class AttachmentV2MigrationTest { .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); when(attachmentDAOV2.storeAttachment(any())).thenThrow(new RuntimeException()); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); } @Test - void runShouldReturnPartialWhenDeleteV1AttachmentFail() { + void runShouldReturnPartialWhenDeleteV1AttachmentFail() throws InterruptedException { CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class); CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class); CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class); @@ -196,11 +197,11 @@ class AttachmentV2MigrationTest { when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty()); when(attachmentDAO.deleteAttachment(any())).thenThrow(new RuntimeException()); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); } @Test - void runShouldReturnPartialWhenAtLeastOneAttachmentMigrationFails() { + void runShouldReturnPartialWhenAtLeastOneAttachmentMigrationFails() throws InterruptedException { CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class); CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class); CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class); @@ -216,7 +217,7 @@ class AttachmentV2MigrationTest { when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty()); when(attachmentDAO.deleteAttachment(any())).thenReturn(Mono.empty()); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java index e127898..6fce3c2 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java @@ -127,7 +127,7 @@ class MailboxPathV2MigrationTest { void migrationTaskShouldMoveDataToMostRecentDao() { daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).block(); - new MailboxPathV2Migration(daoV1, daoV2).run(); + new MailboxPathV2Migration(daoV1, daoV2).apply(); SoftAssertions softly = new SoftAssertions(); softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional()).isEmpty(); diff --git a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java index 025cbef..628def0 100644 --- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java +++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java @@ -56,7 +56,7 @@ public class CassandraRoutesModule extends AbstractModule { routesMultibinder.addBinding().to(CassandraMailboxMergingRoutes.class); MapBinder<SchemaTransition, Migration> allMigrationClazzBinder = MapBinder.newMapBinder(binder(), SchemaTransition.class, Migration.class); - allMigrationClazzBinder.addBinding(FROM_V2_TO_V3).toInstance(() -> Migration.Result.COMPLETED); + allMigrationClazzBinder.addBinding(FROM_V2_TO_V3).toInstance(() -> { }); allMigrationClazzBinder.addBinding(FROM_V3_TO_V4).to(AttachmentV2Migration.class); allMigrationClazzBinder.addBinding(FROM_V4_TO_V5).to(AttachmentMessageIdCreation.class); allMigrationClazzBinder.addBinding(FROM_V5_TO_V6).to(MailboxPathV2Migration.class); diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java index 9d3de07..990d994 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java @@ -26,6 +26,7 @@ import javax.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.migration.Migration; +import org.apache.james.backends.cassandra.migration.MigrationException; import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO; import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO; import org.apache.james.rrt.lib.Mapping; @@ -74,38 +75,52 @@ public class MappingsSourcesMigration implements Migration { } @Override - public Result run() { - return cassandraRecipientRewriteTableDAO.getAllMappings() + public void apply() { + cassandraRecipientRewriteTableDAO.getAllMappings() .flatMap(this::migrate) - .reduce(Result.COMPLETED, Task::combine) - .doOnError(e -> LOGGER.error("Error while migrating mappings sources", e)) - .onErrorResume(e -> Mono.just(Result.PARTIAL)) + .then(Mono.fromRunnable(() -> { + if (errorMappingsCount.get() > 0) { + throw new MigrationException("MappingsSourcesMigration failed"); + } + })) + .doOnError(t -> LOGGER.error("Error while migrating mappings sources", t)) .block(); } - private Mono<Result> migrate(Pair<MappingSource, Mapping> mappingEntry) { + private Mono<Void> migrate(Pair<MappingSource, Mapping> mappingEntry) { return cassandraMappingsSourcesDAO.addMapping(mappingEntry.getRight(), mappingEntry.getLeft()) - .then(Mono.fromCallable(() -> { - successfulMappingsCount.incrementAndGet(); - return Result.COMPLETED; - })) - .onErrorResume(e -> { + .then(Mono.fromCallable(successfulMappingsCount::incrementAndGet)) + .then() + .onErrorResume(t -> { LOGGER.error("Error while performing migration of mapping source: {} with mapping: {}", - mappingEntry.getLeft().asString(), mappingEntry.getRight().asString(), e); + mappingEntry.getLeft().asString(), mappingEntry.getRight().asString(), t); errorMappingsCount.incrementAndGet(); - return Mono.just(Result.PARTIAL); + return Mono.empty(); }); } @Override - public String type() { - return TYPE; + public Task asTask() { + return new Task() { + + @Override + public Result run() throws InterruptedException { + return runTask(); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(createAdditionalInformation()); + } + }; } - @Override - public Optional<TaskExecutionDetails.AdditionalInformation> details() { - return Optional.of(createAdditionalInformation()); - } + AdditionalInformation createAdditionalInformation() { return new AdditionalInformation( diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java index 64d066d..c66ce36 100644 --- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java +++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java @@ -31,7 +31,6 @@ import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; -import org.apache.james.backends.cassandra.migration.Migration; import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.core.Domain; import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO; @@ -75,22 +74,22 @@ class MappingsSourcesMigrationTest { } @Test - void emptyMigrationShouldSucceed() { - assertThat(migration.run()).isEqualTo(Migration.Result.COMPLETED); + void emptyMigrationShouldSucceed() throws InterruptedException { + assertThat(migration.asTask().run()).isEqualTo(Task.Result.COMPLETED); } @Test - void migrationShouldSucceedWithData() { + void migrationShouldSucceedWithData() throws InterruptedException { cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); - assertThat(migration.run()).isEqualTo(Task.Result.COMPLETED); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.COMPLETED); } @Test void migrationShouldCreateMappingSourceFromMapping() { cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); - migration.run(); + migration.apply(); assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block()) .containsExactly(SOURCE); @@ -106,7 +105,7 @@ class MappingsSourcesMigrationTest { cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); cassandraRecipientRewriteTableDAO.addMapping(source2, MAPPING).block(); - migration.run(); + migration.apply(); assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block()) .containsOnly(SOURCE, source2); @@ -116,20 +115,20 @@ class MappingsSourcesMigrationTest { } @Test - void migrationShouldReturnPartialWhenGetAllMappingsFromMappingsFail() { + void migrationShouldReturnPartialWhenGetAllMappingsFromMappingsFail() throws InterruptedException { CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class); CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class); migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO); when(cassandraRecipientRewriteTableDAO.getAllMappings()).thenReturn(Flux.error(new RuntimeException())); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); assertThat(migration.createAdditionalInformation().getSuccessfulMappingsCount()).isEqualTo(0); assertThat(migration.createAdditionalInformation().getErrorMappingsCount()).isEqualTo(0); } @Test - void migrationShouldReturnPartialAddMappingFails() { + void migrationShouldReturnPartialAddMappingFails() throws InterruptedException { CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class); CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class); migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO); @@ -139,13 +138,13 @@ class MappingsSourcesMigrationTest { when(cassandraMappingsSourcesDAO.addMapping(any(Mapping.class), any(MappingSource.class))) .thenReturn(Mono.error(new RuntimeException())); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); assertThat(migration.createAdditionalInformation().getSuccessfulMappingsCount()).isEqualTo(0); assertThat(migration.createAdditionalInformation().getErrorMappingsCount()).isEqualTo(1); } @Test - void migrationShouldHaveCorrectErrorCountWhenMultipleAddMappingFails() { + void migrationShouldHaveCorrectErrorCountWhenMultipleAddMappingFails() throws InterruptedException { MappingSource source2 = MappingSource.fromUser("bob", Domain.LOCALHOST); CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class); @@ -159,7 +158,7 @@ class MappingsSourcesMigrationTest { when(cassandraMappingsSourcesDAO.addMapping(any(Mapping.class), any(MappingSource.class))) .thenReturn(Mono.error(new RuntimeException())); - assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); assertThat(migration.createAdditionalInformation().getSuccessfulMappingsCount()).isEqualTo(0); assertThat(migration.createAdditionalInformation().getErrorMappingsCount()).isEqualTo(2); } @@ -171,7 +170,7 @@ class MappingsSourcesMigrationTest { .addMapping(MappingSource.parse("source" + i + "@domain"), MAPPING).block()); ConcurrentTestRunner.builder() - .operation((threadNumber, step) -> migration.run()) + .operation((threadNumber, step) -> migration.apply()) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) .runSuccessfullyWithin(Duration.ofMinutes(1)); diff --git a/server/protocols/webadmin/webadmin-cassandra-data/src/main/java/org/apache/james/webadmin/service/CassandraMappingsSolveInconsistenciesTask.java b/server/protocols/webadmin/webadmin-cassandra-data/src/main/java/org/apache/james/webadmin/service/CassandraMappingsSolveInconsistenciesTask.java index d487f34..8752e15 100644 --- a/server/protocols/webadmin/webadmin-cassandra-data/src/main/java/org/apache/james/webadmin/service/CassandraMappingsSolveInconsistenciesTask.java +++ b/server/protocols/webadmin/webadmin-cassandra-data/src/main/java/org/apache/james/webadmin/service/CassandraMappingsSolveInconsistenciesTask.java @@ -33,13 +33,13 @@ import reactor.core.publisher.Mono; public class CassandraMappingsSolveInconsistenciesTask implements Task { public static final String TYPE = "cassandraMappingsSolveInconsistencies"; - private final MappingsSourcesMigration mappingsSourcesMigration; + private final Task mappingsSourcesMigration; private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO; @Inject CassandraMappingsSolveInconsistenciesTask(MappingsSourcesMigration mappingsSourcesMigration, CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO) { - this.mappingsSourcesMigration = mappingsSourcesMigration; + this.mappingsSourcesMigration = mappingsSourcesMigration.asTask(); this.cassandraMappingsSourcesDAO = cassandraMappingsSourcesDAO; } diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java index 0cf9ec5..5effb68 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java @@ -26,7 +26,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import org.apache.james.backends.cassandra.migration.CassandraMigrationService; -import org.apache.james.backends.cassandra.migration.Migration; +import org.apache.james.task.Task; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; import org.apache.james.webadmin.Routes; @@ -107,7 +107,7 @@ public class CassandraMigrationRoutes implements Routes { }) public Object upgradeToLatest(Response response) { try { - Migration migration = cassandraMigrationService.upgradeToLastVersion(); + Task migration = cassandraMigrationService.upgradeToLastVersion(); TaskId taskId = taskManager.submit(migration); return TaskIdDto.respond(response, taskId); } catch (IllegalStateException e) { @@ -143,7 +143,7 @@ public class CassandraMigrationRoutes implements Routes { LOGGER.debug("Cassandra upgrade launched"); try { CassandraVersionRequest cassandraVersionRequest = CassandraVersionRequest.parse(request.body()); - Migration migration = cassandraMigrationService.upgradeToVersion(cassandraVersionRequest.getValue()); + Task migration = cassandraMigrationService.upgradeToVersion(cassandraVersionRequest.getValue()); TaskId taskId = taskManager.submit(migration); return TaskIdDto.from(taskId); } catch (NullPointerException | IllegalArgumentException e) { diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java index 22beef0..f3b09ff 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java @@ -54,7 +54,6 @@ import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.ImmutableMap; - import io.restassured.RestAssured; import io.restassured.http.ContentType; import reactor.core.publisher.Mono; @@ -71,7 +70,6 @@ public class CassandraMigrationRoutesTest { private void createServer() throws InterruptedException { Migration successfulMigration = mock(Migration.class); - when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED); Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of( FROM_OLDER_TO_CURRENT, successfulMigration, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
