This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3f390a49f6956e5f3961dd7fd441397443c61b76
Author: Matthieu Baechler <[email protected]>
AuthorDate: Mon Jul 22 16:36:49 2019 +0200

    JAMES-2813 remove inheritance between Migration and Task to avoid dealing with Result in migration
---
 .../migration/CassandraMigrationService.java       | 10 ++--
 .../backends/cassandra/migration/Migration.java    | 17 ++++++-
 .../cassandra/migration/MigrationTask.java         |  5 +-
 .../migration/CassandraMigrationServiceTest.java   | 23 ++++++----
 .../migration/AttachmentMessageIdCreation.java     | 19 +++-----
 .../mail/migration/AttachmentV2Migration.java      | 22 +++------
 .../mail/migration/MailboxPathV2Migration.java     | 48 +++++++++++---------
 .../migration/AttachmentMessageIdCreationTest.java | 23 +++++-----
 .../mail/migration/AttachmentV2MigrationTest.java  | 35 +++++++-------
 .../mail/migration/MailboxPathV2MigrationTest.java |  2 +-
 .../modules/server/CassandraRoutesModule.java      |  2 +-
 .../migration/MappingsSourcesMigration.java        | 53 ++++++++++++++--------
 .../migration/MappingsSourcesMigrationTest.java    | 27 ++++++-----
 .../CassandraMappingsSolveInconsistenciesTask.java |  4 +-
 .../webadmin/routes/CassandraMigrationRoutes.java  |  6 +--
 .../routes/CassandraMigrationRoutesTest.java       |  2 -
 16 files changed, 160 insertions(+), 138 deletions(-)

diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
index 5dcf3b9..003d42a 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.IntStream;
 
 import javax.inject.Inject;
 import javax.inject.Named;
@@ -34,6 +33,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.SchemaTransition;
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
+import org.apache.james.task.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +59,7 @@ public class CassandraMigrationService {
         return Optional.of(latestVersion);
     }
 
-    public Migration upgradeToVersion(SchemaVersion newVersion) {
+    public Task upgradeToVersion(SchemaVersion newVersion) {
         SchemaVersion currentVersion = 
getCurrentVersion().orElse(DEFAULT_VERSION);
 
         List<Migration> migrations = new ArrayList<>();
@@ -82,7 +82,7 @@ public class CassandraMigrationService {
         return transition;
     }
 
-    public Migration upgradeToLastVersion() {
+    public Task upgradeToLastVersion() {
         return upgradeToVersion(latestVersion);
     }
 
@@ -91,11 +91,11 @@ public class CassandraMigrationService {
             SchemaVersion currentVersion = 
getCurrentVersion().orElse(DEFAULT_VERSION);
             SchemaVersion targetVersion = transition.to();
             if (currentVersion.isAfterOrEquals(targetVersion)) {
-                return Migration.Result.COMPLETED;
+                return;
             }
 
             logger.info("Migrating to version {} ", transition.toAsString());
-            return allMigrationClazz.get(transition).run()
+            allMigrationClazz.get(transition).asTask().run()
                 .onComplete(() -> 
schemaVersionDAO.updateVersion(transition.to()).block(),
                     () -> logger.info("Migrating to version {} done", 
transition.toAsString()))
                 .onFailure(() -> logger.warn(failureMessage(transition.to())),
diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java
index b4f15f4..514b7ca 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java
@@ -21,6 +21,21 @@ package org.apache.james.backends.cassandra.migration;
 
 import org.apache.james.task.Task;
 
-public interface Migration extends Task {
+public interface Migration {
+
+    void apply() throws InterruptedException;
+
+    default Task asTask() {
+        return this::runTask;
+    }
+
+    default Task.Result runTask() throws InterruptedException {
+        try {
+            this.apply();
+            return Task.Result.COMPLETED;
+        } catch (RuntimeException e) {
+            return Task.Result.PARTIAL;
+        }
+    }
 
 }
diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java
index c88b104..62276aa 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java
@@ -23,11 +23,12 @@ import java.util.List;
 import java.util.Optional;
 
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
+import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 
 import com.google.common.collect.ImmutableList;
 
-public class MigrationTask implements Migration {
+public class MigrationTask implements Task {
     public static final String CASSANDRA_MIGRATION = "CassandraMigration";
 
     public static class Details implements 
TaskExecutionDetails.AdditionalInformation {
@@ -53,7 +54,7 @@ public class MigrationTask implements Migration {
     @Override
     public Result run() throws InterruptedException {
         for (Migration migration: migrations) {
-            migration.run();
+            migration.asTask().run();
         }
         return Result.COMPLETED;
     }
diff --git 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
index e356612..0cae5d7 100644
--- 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
+++ 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
@@ -70,8 +70,10 @@ public class CassandraMigrationServiceTest {
         schemaVersionDAO = mock(CassandraSchemaVersionDAO.class);
         when(schemaVersionDAO.updateVersion(any())).thenReturn(Mono.empty());
 
+        Task successFulTask = mock(Task.class);
+        when(successFulTask.run()).thenReturn(Task.Result.COMPLETED);
         successfulMigration = mock(Migration.class);
-        when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED);
+        when(successfulMigration.asTask()).thenReturn(successFulTask);
         Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of(
             FROM_OLDER_TO_CURRENT, successfulMigration,
             FROM_CURRENT_TO_LATEST, successfulMigration);
@@ -150,7 +152,9 @@ public class CassandraMigrationServiceTest {
         try {
             Map<SchemaTransition, Migration> allMigrationClazz = 
ImmutableMap.of(
                 FROM_OLDER_TO_CURRENT, successfulMigration,
-                FROM_CURRENT_TO_LATEST, () -> Migration.Result.PARTIAL);
+                FROM_CURRENT_TO_LATEST, () -> {
+                    throw new RuntimeException();
+                });
 
             testee = new CassandraMigrationService(schemaVersionDAO, 
allMigrationClazz, LATEST_VERSION);
             
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(OLDER_VERSION)));
@@ -165,8 +169,9 @@ public class CassandraMigrationServiceTest {
 
     @Test
     public void partialMigrationShouldThrow() throws InterruptedException {
-        Migration migration1 = mock(Migration.class);
-        when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
+        Task failingTask = mock(Task.class);
+        when(failingTask.run()).thenThrow(MigrationException.class);
+        Migration migration1 = failingTask::run;
         Migration migration2 = successfulMigration;
 
         Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of(
@@ -182,10 +187,10 @@ public class CassandraMigrationServiceTest {
 
     @Test
     public void partialMigrationShouldAbortMigrations() throws 
InterruptedException {
-        Migration migration1 = mock(Migration.class);
-        when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
+        Task failingTask = mock(Task.class);
+        when(failingTask.run()).thenThrow(MigrationException.class);
+        Migration migration1 = failingTask::run;
         Migration migration2 = mock(Migration.class);
-        when(migration2.run()).thenReturn(Migration.Result.COMPLETED);
 
         Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of(
                 FROM_OLDER_TO_CURRENT, migration1,
@@ -198,8 +203,8 @@ public class CassandraMigrationServiceTest {
         try {
             testee.upgradeToVersion(LATEST_VERSION).run();
         } finally {
-            verify(migration1, times(1)).run();
-            verifyNoMoreInteractions(migration1);
+            verify(failingTask, times(1)).run();
+            verifyNoMoreInteractions(failingTask);
             verifyZeroInteractions(migration2);
         }
     }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
index bb65783..b70b8db 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
@@ -26,7 +26,6 @@ import 
org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import 
org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO.MessageIdAttachmentIds;
 import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.task.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,24 +45,18 @@ public class AttachmentMessageIdCreation implements 
Migration {
     }
 
     @Override
-    public Result run() {
-        return cassandraMessageDAO.retrieveAllMessageIdAttachmentIds()
+    public void apply() {
+        cassandraMessageDAO.retrieveAllMessageIdAttachmentIds()
             .flatMap(this::createIndex)
-            .reduce(Result.COMPLETED, Task::combine)
-            .onErrorResume(this::errorHandling)
-            .block();
+            .doOnError(e -> LOGGER.error("Error while creation attachmentId -> 
messageIds index", e))
+            .blockLast();
     }
 
-    private Mono<Result> createIndex(MessageIdAttachmentIds message) {
+    private Mono<Void> createIndex(MessageIdAttachmentIds message) {
         MessageId messageId = message.getMessageId();
         return Flux.fromIterable(message.getAttachmentId())
             .flatMap(attachmentId -> 
attachmentMessageIdDAO.storeAttachmentForMessageId(attachmentId, messageId))
-            .then()
-            .thenReturn(Result.COMPLETED);
+            .then();
     }
 
-    private Mono<Result> errorHandling(Throwable e) {
-        LOGGER.error("Error while creation attachmentId -> messageIds index", 
e);
-        return Mono.just(Result.PARTIAL);
-    }
 }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
index 1bcb329..856afd0 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
@@ -26,7 +26,6 @@ import org.apache.james.blob.api.BlobStore;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2;
 import org.apache.james.mailbox.model.Attachment;
-import org.apache.james.task.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,24 +47,17 @@ public class AttachmentV2Migration implements Migration {
     }
 
     @Override
-    public Result run() {
-        return attachmentDAOV1.retrieveAll()
-            .flatMap(this::migrateAttachment)
-            .reduce(Result.COMPLETED, Task::combine)
-            .onErrorResume(this::handlError)
-            .block();
+    public void apply() {
+        attachmentDAOV1.retrieveAll()
+                .flatMap(this::migrateAttachment)
+                .doOnError(t -> LOGGER.error("Error while performing 
attachmentDAO V2 migration", t))
+                .blockLast();
     }
 
-    private Mono<Result> handlError(Throwable throwable) {
-        LOGGER.error("Error while performing attachmentDAO V2 migration", 
throwable);
-        return Mono.just(Result.PARTIAL);
-    }
-
-    private Mono<Result> migrateAttachment(Attachment attachment) {
+    private Mono<Void> migrateAttachment(Attachment attachment) {
         return blobStore.save(blobStore.getDefaultBucketName(), 
attachment.getBytes())
             .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
             .flatMap(attachmentDAOV2::storeAttachment)
-            
.then(attachmentDAOV1.deleteAttachment(attachment.getAttachmentId()))
-            .thenReturn(Result.COMPLETED);
+            
.then(attachmentDAOV1.deleteAttachment(attachment.getAttachmentId()));
     }
 }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
index cfa5f60..a063457 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
@@ -30,7 +30,6 @@ import 
org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAOImpl;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV2DAO;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,38 +68,43 @@ public class MailboxPathV2Migration implements Migration {
     }
 
     @Override
-    public Result run() {
-        return daoV1.readAll()
+    public void apply() {
+        daoV1.readAll()
             .flatMap(this::migrate)
-            .reduce(Result.COMPLETED, Task::combine)
-            .onErrorResume(this::handleErrorRun)
-            .block();
-    }
-
-    private Mono<Result> handleErrorRun(Throwable throwable) {
-            LOGGER.error("Error while performing migration", throwable);
-            return Mono.just(Result.PARTIAL);
+            .doOnError(t -> LOGGER.error("Error while performing migration", 
t))
+            .blockLast();
     }
 
-    public Mono<Result> migrate(CassandraIdAndPath idAndPath) {
+    private Mono<Void> migrate(CassandraIdAndPath idAndPath) {
         return daoV2.save(idAndPath.getMailboxPath(), 
idAndPath.getCassandraId())
             .then(daoV1.delete(idAndPath.getMailboxPath()))
-            .thenReturn(Result.COMPLETED)
-            .onErrorResume(error -> handleErrorMigrate(idAndPath, error));
+            .onErrorResume(error -> handleErrorMigrate(idAndPath, error))
+            .then();
     }
 
-    private Mono<Result> handleErrorMigrate(CassandraIdAndPath idAndPath, 
Throwable throwable) {
+    private Mono<Void> handleErrorMigrate(CassandraIdAndPath idAndPath, 
Throwable throwable) {
         LOGGER.error("Error while performing migration for path {}", 
idAndPath.getMailboxPath(), throwable);
-        return Mono.just(Result.PARTIAL);
+        return Mono.empty();
     }
 
     @Override
-    public String type() {
-        return "Cassandra_mailboxPathV2Migration";
+    public Task asTask() {
+        return new Task() {
+            @Override
+            public Result run() throws InterruptedException {
+                return runTask();
+            }
+
+            @Override
+            public String type() {
+                return "Cassandra_mailboxPathV2Migration";
+            }
+
+            @Override
+            public Optional<TaskExecutionDetails.AdditionalInformation> 
details() {
+                return Optional.of(additionalInformation);
+            }
+        };
     }
 
-    @Override
-    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
-        return Optional.of(additionalInformation);
-    }
 }
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
index 89ad9b4..39d69fd 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
@@ -36,7 +36,6 @@ import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.CassandraRestartExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.cassandra.migration.Migration;
 import 
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.cassandra.CassandraBlobModule;
@@ -54,6 +53,7 @@ import org.apache.james.mailbox.model.MessageAttachment;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.task.Task;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -61,7 +61,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-
 import reactor.core.publisher.Flux;
 
 @ExtendWith(CassandraRestartExtension.class)
@@ -102,9 +101,9 @@ class AttachmentMessageIdCreationTest {
     }
 
     @Test
-    void emptyMigrationShouldSucceed() {
-        assertThat(migration.run())
-            .isEqualTo(Migration.Result.COMPLETED);
+    void emptyMigrationShouldSucceed() throws InterruptedException {
+        assertThat(migration.asTask().run())
+            .isEqualTo(Task.Result.COMPLETED);
     }
 
     @Test
@@ -114,8 +113,8 @@ class AttachmentMessageIdCreationTest {
 
         cassandraMessageDAO.save(message).block();
 
-        assertThat(migration.run())
-            .isEqualTo(Migration.Result.COMPLETED);
+        assertThat(migration.asTask().run())
+            .isEqualTo(Task.Result.COMPLETED);
     }
 
     @Test
@@ -125,8 +124,8 @@ class AttachmentMessageIdCreationTest {
 
         cassandraMessageDAO.save(message).block();
 
-        assertThat(migration.run())
-            .isEqualTo(Migration.Result.COMPLETED);
+        assertThat(migration.asTask().run())
+            .isEqualTo(Task.Result.COMPLETED);
     }
 
     @Test
@@ -136,7 +135,7 @@ class AttachmentMessageIdCreationTest {
 
         cassandraMessageDAO.save(message).block();
 
-        migration.run();
+        migration.apply();
 
         
assertThat(attachmentMessageIdDAO.getOwnerMessageIds(attachment.getAttachmentId()).toIterable())
             .containsExactly(messageId);
@@ -150,7 +149,7 @@ class AttachmentMessageIdCreationTest {
 
         
when(cassandraMessageDAO.retrieveAllMessageIdAttachmentIds()).thenReturn(Flux.error(new
 RuntimeException("Mocked exception")));
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
     }
 
     @Test
@@ -167,7 +166,7 @@ class AttachmentMessageIdCreationTest {
         
when(attachmentMessageIdDAO.storeAttachmentForMessageId(any(AttachmentId.class),
 any(MessageId.class)))
             .thenThrow(new RuntimeException());
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
     }
 
     private SimpleMailboxMessage createMessage(MessageId messageId, 
Collection<MessageAttachment> attachments) {
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
index 29aeede..2100a37 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
@@ -41,6 +41,7 @@ import 
org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2;
 import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
 import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.AttachmentId;
+import org.apache.james.task.Task;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -89,9 +90,9 @@ class AttachmentV2MigrationTest {
     }
 
     @Test
-    void emptyMigrationShouldSucceed() {
-        assertThat(migration.run())
-            .isEqualTo(Migration.Result.COMPLETED);
+    void emptyMigrationShouldSucceed() throws InterruptedException {
+        assertThat(migration.asTask().run())
+            .isEqualTo(Task.Result.COMPLETED);
     }
 
     @Test
@@ -99,8 +100,8 @@ class AttachmentV2MigrationTest {
         attachmentDAO.storeAttachment(attachment1).block();
         attachmentDAO.storeAttachment(attachment2).block();
 
-        assertThat(migration.run())
-            .isEqualTo(Migration.Result.COMPLETED);
+        assertThat(migration.asTask().run())
+            .isEqualTo(Task.Result.COMPLETED);
     }
 
     @Test
@@ -108,7 +109,7 @@ class AttachmentV2MigrationTest {
         attachmentDAO.storeAttachment(attachment1).block();
         attachmentDAO.storeAttachment(attachment2).block();
 
-        migration.run();
+        migration.apply();
 
         
assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID).blockOptional())
             .contains(CassandraAttachmentDAOV2.from(attachment1, 
BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
@@ -125,7 +126,7 @@ class AttachmentV2MigrationTest {
         attachmentDAO.storeAttachment(attachment1).block();
         attachmentDAO.storeAttachment(attachment2).block();
 
-        migration.run();
+        migration.apply();
 
         assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID).blockOptional())
             .isEmpty();
@@ -134,7 +135,7 @@ class AttachmentV2MigrationTest {
     }
 
     @Test
-    void runShouldReturnPartialWhenInitialReadFail() {
+    void runShouldReturnPartialWhenInitialReadFail() throws 
InterruptedException {
         CassandraAttachmentDAO attachmentDAO = 
mock(CassandraAttachmentDAO.class);
         CassandraAttachmentDAOV2 attachmentDAOV2 = 
mock(CassandraAttachmentDAOV2.class);
         CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
@@ -142,11 +143,11 @@ class AttachmentV2MigrationTest {
 
         when(attachmentDAO.retrieveAll()).thenReturn(Flux.error(new 
RuntimeException()));
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
     }
 
     @Test
-    void runShouldReturnPartialWhenSavingBlobsFails() {
+    void runShouldReturnPartialWhenSavingBlobsFails() throws 
InterruptedException {
         CassandraAttachmentDAO attachmentDAO = 
mock(CassandraAttachmentDAO.class);
         CassandraAttachmentDAOV2 attachmentDAOV2 = 
mock(CassandraAttachmentDAOV2.class);
         CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
@@ -157,11 +158,11 @@ class AttachmentV2MigrationTest {
             attachment2));
         when(blobsDAO.save(any(BucketName.class), 
any(byte[].class))).thenThrow(new RuntimeException());
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
     }
 
     @Test
-    void runShouldReturnPartialWhenSavingAttachmentV2Fail() {
+    void runShouldReturnPartialWhenSavingAttachmentV2Fail() throws 
InterruptedException {
         CassandraAttachmentDAO attachmentDAO = 
mock(CassandraAttachmentDAO.class);
         CassandraAttachmentDAOV2 attachmentDAOV2 = 
mock(CassandraAttachmentDAOV2.class);
         CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
@@ -176,11 +177,11 @@ class AttachmentV2MigrationTest {
             
.thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
         when(attachmentDAOV2.storeAttachment(any())).thenThrow(new 
RuntimeException());
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
     }
 
     @Test
-    void runShouldReturnPartialWhenDeleteV1AttachmentFail() {
+    void runShouldReturnPartialWhenDeleteV1AttachmentFail() throws 
InterruptedException {
         CassandraAttachmentDAO attachmentDAO = 
mock(CassandraAttachmentDAO.class);
         CassandraAttachmentDAOV2 attachmentDAOV2 = 
mock(CassandraAttachmentDAOV2.class);
         CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
@@ -196,11 +197,11 @@ class AttachmentV2MigrationTest {
         when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty());
         when(attachmentDAO.deleteAttachment(any())).thenThrow(new 
RuntimeException());
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
     }
 
     @Test
-    void runShouldReturnPartialWhenAtLeastOneAttachmentMigrationFails() {
+    void runShouldReturnPartialWhenAtLeastOneAttachmentMigrationFails() throws 
InterruptedException {
         CassandraAttachmentDAO attachmentDAO = 
mock(CassandraAttachmentDAO.class);
         CassandraAttachmentDAOV2 attachmentDAOV2 = 
mock(CassandraAttachmentDAOV2.class);
         CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
@@ -216,7 +217,7 @@ class AttachmentV2MigrationTest {
         when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty());
         when(attachmentDAO.deleteAttachment(any())).thenReturn(Mono.empty());
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
     }
 
 }
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
index e127898..6fce3c2 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
@@ -127,7 +127,7 @@ class MailboxPathV2MigrationTest {
     void migrationTaskShouldMoveDataToMostRecentDao() {
         daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).block();
 
-        new MailboxPathV2Migration(daoV1, daoV2).run();
+        new MailboxPathV2Migration(daoV1, daoV2).apply();
 
         SoftAssertions softly = new SoftAssertions();
         
softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional()).isEmpty();
diff --git 
a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
 
b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
index 025cbef..628def0 100644
--- 
a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
+++ 
b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
@@ -56,7 +56,7 @@ public class CassandraRoutesModule extends AbstractModule {
         routesMultibinder.addBinding().to(CassandraMailboxMergingRoutes.class);
 
         MapBinder<SchemaTransition, Migration> allMigrationClazzBinder = 
MapBinder.newMapBinder(binder(), SchemaTransition.class, Migration.class);
-        allMigrationClazzBinder.addBinding(FROM_V2_TO_V3).toInstance(() -> 
Migration.Result.COMPLETED);
+        allMigrationClazzBinder.addBinding(FROM_V2_TO_V3).toInstance(() -> { 
});
         
allMigrationClazzBinder.addBinding(FROM_V3_TO_V4).to(AttachmentV2Migration.class);
         
allMigrationClazzBinder.addBinding(FROM_V4_TO_V5).to(AttachmentMessageIdCreation.class);
         
allMigrationClazzBinder.addBinding(FROM_V5_TO_V6).to(MailboxPathV2Migration.class);
diff --git 
a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
 
b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
index 9d3de07..990d994 100644
--- 
a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
+++ 
b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
@@ -26,6 +26,7 @@ import javax.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.backends.cassandra.migration.MigrationException;
 import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
 import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO;
 import org.apache.james.rrt.lib.Mapping;
@@ -74,38 +75,52 @@ public class MappingsSourcesMigration implements Migration {
     }
 
     @Override
-    public Result run() {
-        return cassandraRecipientRewriteTableDAO.getAllMappings()
+    public void apply() {
+        cassandraRecipientRewriteTableDAO.getAllMappings()
             .flatMap(this::migrate)
-            .reduce(Result.COMPLETED, Task::combine)
-            .doOnError(e -> LOGGER.error("Error while migrating mappings sources", e))
-            .onErrorResume(e -> Mono.just(Result.PARTIAL))
+            .then(Mono.fromRunnable(() -> {
+                if (errorMappingsCount.get() > 0) {
+                    throw new MigrationException("MappingsSourcesMigration failed");
+                }
+            }))
+            .doOnError(t -> LOGGER.error("Error while migrating mappings sources", t))
             .block();
     }
 
-    private Mono<Result> migrate(Pair<MappingSource, Mapping> mappingEntry) {
+    private Mono<Void> migrate(Pair<MappingSource, Mapping> mappingEntry) {
         return cassandraMappingsSourcesDAO.addMapping(mappingEntry.getRight(), mappingEntry.getLeft())
-            .then(Mono.fromCallable(() -> {
-                successfulMappingsCount.incrementAndGet();
-                return Result.COMPLETED;
-            }))
-            .onErrorResume(e -> {
+            .then(Mono.fromCallable(successfulMappingsCount::incrementAndGet))
+            .then()
+            .onErrorResume(t -> {
                 LOGGER.error("Error while performing migration of mapping source: {} with mapping: {}",
-                    mappingEntry.getLeft().asString(), mappingEntry.getRight().asString(), e);
+                        mappingEntry.getLeft().asString(), mappingEntry.getRight().asString(), t);
                 errorMappingsCount.incrementAndGet();
-                return Mono.just(Result.PARTIAL);
+                return Mono.empty();
             });
     }
 
     @Override
-    public String type() {
-        return TYPE;
+    public Task asTask() {
+        return new Task() {
+
+            @Override
+            public Result run() throws InterruptedException {
+                return runTask();
+            }
+
+            @Override
+            public String type() {
+                return TYPE;
+            }
+
+            @Override
+            public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+                return Optional.of(createAdditionalInformation());
+            }
+        };
     }
 
-    @Override
-    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
-        return Optional.of(createAdditionalInformation());
-    }
+
 
     AdditionalInformation createAdditionalInformation() {
         return new AdditionalInformation(
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
index 64d066d..c66ce36 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
@@ -31,7 +31,6 @@ import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.migration.Migration;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Domain;
 import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
@@ -75,22 +74,22 @@ class MappingsSourcesMigrationTest {
     }
 
     @Test
-    void emptyMigrationShouldSucceed() {
-        assertThat(migration.run()).isEqualTo(Migration.Result.COMPLETED);
+    void emptyMigrationShouldSucceed() throws InterruptedException {
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.COMPLETED);
     }
 
     @Test
-    void migrationShouldSucceedWithData() {
+    void migrationShouldSucceedWithData() throws InterruptedException {
         cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
 
-        assertThat(migration.run()).isEqualTo(Task.Result.COMPLETED);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.COMPLETED);
     }
 
     @Test
     void migrationShouldCreateMappingSourceFromMapping() {
         cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
 
-        migration.run();
+        migration.apply();
 
         assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
             .containsExactly(SOURCE);
@@ -106,7 +105,7 @@ class MappingsSourcesMigrationTest {
         cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
         cassandraRecipientRewriteTableDAO.addMapping(source2, MAPPING).block();
 
-        migration.run();
+        migration.apply();
 
         assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
             .containsOnly(SOURCE, source2);
@@ -116,20 +115,20 @@ class MappingsSourcesMigrationTest {
     }
 
     @Test
-    void migrationShouldReturnPartialWhenGetAllMappingsFromMappingsFail() {
+    void migrationShouldReturnPartialWhenGetAllMappingsFromMappingsFail() throws InterruptedException {
         CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class);
         CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class);
         migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
 
         when(cassandraRecipientRewriteTableDAO.getAllMappings()).thenReturn(Flux.error(new RuntimeException()));
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
         assertThat(migration.createAdditionalInformation().getSuccessfulMappingsCount()).isEqualTo(0);
         assertThat(migration.createAdditionalInformation().getErrorMappingsCount()).isEqualTo(0);
     }
 
     @Test
-    void migrationShouldReturnPartialAddMappingFails() {
+    void migrationShouldReturnPartialAddMappingFails() throws InterruptedException {
         CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class);
         CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class);
         migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
@@ -139,13 +138,13 @@ class MappingsSourcesMigrationTest {
         when(cassandraMappingsSourcesDAO.addMapping(any(Mapping.class), any(MappingSource.class)))
             .thenReturn(Mono.error(new RuntimeException()));
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
         assertThat(migration.createAdditionalInformation().getSuccessfulMappingsCount()).isEqualTo(0);
         assertThat(migration.createAdditionalInformation().getErrorMappingsCount()).isEqualTo(1);
     }
 
     @Test
-    void migrationShouldHaveCorrectErrorCountWhenMultipleAddMappingFails() {
+    void migrationShouldHaveCorrectErrorCountWhenMultipleAddMappingFails() throws InterruptedException {
         MappingSource source2 = MappingSource.fromUser("bob", Domain.LOCALHOST);
 
         CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class);
@@ -159,7 +158,7 @@ class MappingsSourcesMigrationTest {
         when(cassandraMappingsSourcesDAO.addMapping(any(Mapping.class), any(MappingSource.class)))
             .thenReturn(Mono.error(new RuntimeException()));
 
-        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+        assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL);
         assertThat(migration.createAdditionalInformation().getSuccessfulMappingsCount()).isEqualTo(0);
         assertThat(migration.createAdditionalInformation().getErrorMappingsCount()).isEqualTo(2);
     }
@@ -171,7 +170,7 @@ class MappingsSourcesMigrationTest {
                 .addMapping(MappingSource.parse("source" + i + "@domain"), MAPPING).block());
 
         ConcurrentTestRunner.builder()
-            .operation((threadNumber, step) -> migration.run())
+            .operation((threadNumber, step) -> migration.apply())
             .threadCount(THREAD_COUNT)
             .operationCount(OPERATION_COUNT)
             .runSuccessfullyWithin(Duration.ofMinutes(1));
diff --git a/server/protocols/webadmin/webadmin-cassandra-data/src/main/java/org/apache/james/webadmin/service/CassandraMappingsSolveInconsistenciesTask.java b/server/protocols/webadmin/webadmin-cassandra-data/src/main/java/org/apache/james/webadmin/service/CassandraMappingsSolveInconsistenciesTask.java
index d487f34..8752e15 100644
--- a/server/protocols/webadmin/webadmin-cassandra-data/src/main/java/org/apache/james/webadmin/service/CassandraMappingsSolveInconsistenciesTask.java
+++ b/server/protocols/webadmin/webadmin-cassandra-data/src/main/java/org/apache/james/webadmin/service/CassandraMappingsSolveInconsistenciesTask.java
@@ -33,13 +33,13 @@ import reactor.core.publisher.Mono;
 public class CassandraMappingsSolveInconsistenciesTask implements Task {
     public static final String TYPE = "cassandraMappingsSolveInconsistencies";
 
-    private final MappingsSourcesMigration mappingsSourcesMigration;
+    private final Task mappingsSourcesMigration;
     private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
 
     @Inject
     CassandraMappingsSolveInconsistenciesTask(MappingsSourcesMigration mappingsSourcesMigration,
                                               CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO) {
-        this.mappingsSourcesMigration = mappingsSourcesMigration;
+        this.mappingsSourcesMigration = mappingsSourcesMigration.asTask();
         this.cassandraMappingsSourcesDAO = cassandraMappingsSourcesDAO;
     }
 
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
index 0cf9ec5..5effb68 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
@@ -26,7 +26,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 
 import org.apache.james.backends.cassandra.migration.CassandraMigrationService;
-import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.task.Task;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.webadmin.Routes;
@@ -107,7 +107,7 @@ public class CassandraMigrationRoutes implements Routes {
     })
     public Object upgradeToLatest(Response response) {
         try {
-            Migration migration = cassandraMigrationService.upgradeToLastVersion();
+            Task migration = cassandraMigrationService.upgradeToLastVersion();
             TaskId taskId = taskManager.submit(migration);
             return TaskIdDto.respond(response, taskId);
         } catch (IllegalStateException e) {
@@ -143,7 +143,7 @@ public class CassandraMigrationRoutes implements Routes {
         LOGGER.debug("Cassandra upgrade launched");
         try {
             CassandraVersionRequest cassandraVersionRequest = CassandraVersionRequest.parse(request.body());
-            Migration migration = cassandraMigrationService.upgradeToVersion(cassandraVersionRequest.getValue());
+            Task migration = cassandraMigrationService.upgradeToVersion(cassandraVersionRequest.getValue());
             TaskId taskId = taskManager.submit(migration);
             return TaskIdDto.from(taskId);
         } catch (NullPointerException | IllegalArgumentException e) {
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
index 22beef0..f3b09ff 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
@@ -54,7 +54,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableMap;
-
 import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
 import reactor.core.publisher.Mono;
@@ -71,7 +70,6 @@ public class CassandraMigrationRoutesTest {
 
     private void createServer() throws InterruptedException {
         Migration successfulMigration = mock(Migration.class);
-        when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED);
 
         Map<SchemaTransition, Migration> allMigrationClazz = ImmutableMap.of(
             FROM_OLDER_TO_CURRENT, successfulMigration,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to