This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 01cb8d1af106486bf60924ed1dfa622880cfe74f
Author: LanKhuat <khuatdang...@gmail.com>
AuthorDate: Fri Apr 17 01:57:50 2020 +0700

    JAMES-3143 Add context & objects describing inconsistencies
---
 .../task/SolveMessageInconsistenciesService.java   | 446 +++++++++++++++++----
 .../SolveMessageInconsistenciesServiceTest.java    | 265 +++++++++---
 2 files changed, 593 insertions(+), 118 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index f67ff73..8518f1e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -19,7 +19,11 @@
 
 package org.apache.james.mailbox.cassandra.mail.task;
 
+import java.util.Collection;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
 
@@ -27,105 +31,413 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
+import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.task.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class SolveMessageInconsistenciesService {
+
+    @FunctionalInterface
+    interface Inconsistency {
+        Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO 
imapUidDAO, CassandraMessageIdDAO messageIdDAO);
+    }
+
+    private static Inconsistency NO_INCONSISTENCY = (context, imapUidDAO, 
messageIdDAO) -> Mono.just(Task.Result.COMPLETED);
+
+    private static class FailedToRetrieveRecord implements Inconsistency {
+        private final ComposedMessageIdWithMetaData message;
+
+        private FailedToRetrieveRecord(ComposedMessageIdWithMetaData message) {
+            this.message = message;
+        }
+
+        @Override
+        public Mono<Task.Result> fix(Context context, 
CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO) {
+            context.addErrors(message.getComposedMessageId());
+            LOGGER.error("Failed to retrieve record: {}", 
message.getComposedMessageId());
+            return Mono.just(Task.Result.PARTIAL);
+        }
+    }
+
+    private static class OrphanImapUidEntry implements Inconsistency {
+        private final ComposedMessageIdWithMetaData message;
+
+        private OrphanImapUidEntry(ComposedMessageIdWithMetaData message) {
+            this.message = message;
+        }
+
+        @Override
+        public Mono<Task.Result> fix(Context context, 
CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO) {
+            return messageIdDAO.insert(message)
+                .doOnSuccess(any -> notifySuccess(context))
+                .thenReturn(Task.Result.COMPLETED)
+                .onErrorResume(error -> {
+                    notifyFailure(context);
+                    return Mono.just(Task.Result.PARTIAL);
+                });
+        }
+
+        private void notifyFailure(Context context) {
+            context.addErrors(message.getComposedMessageId());
+            LOGGER.error("Failed to fix inconsistency for orphan message in 
ImapUid: {}", message.getComposedMessageId());
+        }
+
+        private void notifySuccess(Context context) {
+            LOGGER.info("Inconsistency fixed for orphan message in ImapUid: 
{}", message.getComposedMessageId());
+            context.incrementAddedMessageIdEntries();
+            context.addFixedInconsistency(message.getComposedMessageId());
+        }
+    }
+
+    private static class OutdatedMessageIdEntry implements Inconsistency {
+        private final ComposedMessageIdWithMetaData messageFromMessageId;
+        private final ComposedMessageIdWithMetaData messageFromImapUid;
+
+        private OutdatedMessageIdEntry(ComposedMessageIdWithMetaData message, 
ComposedMessageIdWithMetaData messageFromImapUid) {
+            this.messageFromMessageId = message;
+            this.messageFromImapUid = messageFromImapUid;
+        }
+
+        @Override
+        public Mono<Task.Result> fix(Context context, 
CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO) {
+            return messageIdDAO.updateMetadata(messageFromImapUid)
+                .doOnSuccess(any -> notifySuccess(context))
+                .thenReturn(Task.Result.COMPLETED)
+                .onErrorResume(error -> {
+                    notifyFailure(context);
+                    return Mono.just(Task.Result.PARTIAL);
+                });
+        }
+
+        private void notifyFailure(Context context) {
+            context.addErrors(messageFromMessageId.getComposedMessageId());
+            LOGGER.error("Failed to fix inconsistency for outdated message in 
MessageId: {}", messageFromMessageId.getComposedMessageId());
+        }
+
+        private void notifySuccess(Context context) {
+            LOGGER.info("Inconsistency fixed for outdated message in 
MessageId: {}", messageFromMessageId.getComposedMessageId());
+            context.incrementUpdatedMessageIdEntries();
+            
context.addFixedInconsistency(messageFromMessageId.getComposedMessageId());
+        }
+    }
+
+    private static class OrphanMessageIdEntry implements Inconsistency {
+        private final ComposedMessageIdWithMetaData message;
+
+        private OrphanMessageIdEntry(ComposedMessageIdWithMetaData message) {
+            this.message = message;
+        }
+
+        @Override
+        public Mono<Task.Result> fix(Context context, 
CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO) {
+            return messageIdDAO.delete((CassandraId) 
message.getComposedMessageId().getMailboxId(), 
message.getComposedMessageId().getUid())
+                .doOnSuccess(any -> notifySuccess(context))
+                .thenReturn(Task.Result.COMPLETED)
+                .onErrorResume(error -> {
+                    notifyFailure(context);
+                    return Mono.just(Task.Result.PARTIAL);
+                });
+        }
+
+        private void notifyFailure(Context context) {
+            context.addErrors(message.getComposedMessageId());
+            LOGGER.error("Failed to fix inconsistency for orphan message in 
MessageId: {}", message.getComposedMessageId());
+        }
+
+        private void notifySuccess(Context context) {
+            LOGGER.info("Inconsistency fixed for orphan message in MessageId: 
{}", message.getComposedMessageId());
+            context.incrementRemovedMessageIdEntries();
+            context.addFixedInconsistency(message.getComposedMessageId());
+        }
+    }
+
+    static class Context {
+        static class Snapshot {
+            public static Builder builder() {
+                return new Builder();
+            }
+
+            static class Builder {
+                private Optional<Long> processedImapUidEntries;
+                private Optional<Long> processedMessageIdEntries;
+                private Optional<Long> addedMessageIdEntries;
+                private Optional<Long> updatedMessageIdEntries;
+                private Optional<Long> removedMessageIdEntries;
+                private ImmutableList.Builder<ComposedMessageId> 
fixedInconsistencies;
+                private ImmutableList.Builder<ComposedMessageId> errors;
+
+                Builder() {
+                    processedImapUidEntries = Optional.empty();
+                    processedMessageIdEntries = Optional.empty();
+                    addedMessageIdEntries = Optional.empty();
+                    updatedMessageIdEntries = Optional.empty();
+                    removedMessageIdEntries = Optional.empty();
+                    fixedInconsistencies = ImmutableList.builder();
+                    errors = ImmutableList.builder();
+                }
+
+                public Builder processedImapUidEntries(long count) {
+                    processedImapUidEntries = Optional.of(count);
+                    return this;
+                }
+
+                public Builder processedMessageIdEntries(long count) {
+                    processedMessageIdEntries = Optional.of(count);
+                    return this;
+                }
+
+                public Builder addedMessageIdEntries(long count) {
+                    addedMessageIdEntries = Optional.of(count);
+                    return this;
+                }
+
+                public Builder updatedMessageIdEntries(long count) {
+                    updatedMessageIdEntries = Optional.of(count);
+                    return this;
+                }
+
+                public Builder removedMessageIdEntries(long count) {
+                    removedMessageIdEntries = Optional.of(count);
+                    return this;
+                }
+
+                public Builder addFixedInconsistencies(ComposedMessageId 
composedMessageId) {
+                    fixedInconsistencies.add(composedMessageId);
+                    return this;
+                }
+
+                public Builder errors(ComposedMessageId composedMessageId) {
+                    errors.add(composedMessageId);
+                    return this;
+                }
+
+                public SolveMessageInconsistenciesService.Context.Snapshot 
build() {
+                    return new 
SolveMessageInconsistenciesService.Context.Snapshot(
+                        processedImapUidEntries.orElse(0L),
+                        processedMessageIdEntries.orElse(0L),
+                        addedMessageIdEntries.orElse(0L),
+                        updatedMessageIdEntries.orElse(0L),
+                        removedMessageIdEntries.orElse(0L),
+                        fixedInconsistencies.build(),
+                        errors.build());
+                }
+            }
+
+            private final long processedImapUidEntries;
+            private final long processedMessageIdEntries;
+            private final long addedMessageIdEntries;
+            private final long updatedMessageIdEntries;
+            private final long removedMessageIdEntries;
+            private final ImmutableList<ComposedMessageId> 
fixedInconsistencies;
+            private final ImmutableList<ComposedMessageId> errors;
+
+            private Snapshot(long processedImapUidEntries, long 
processedMessageIdEntries,
+                             long addedMessageIdEntries, long 
updatedMessageIdEntries,
+                             long removedMessageIdEntries,
+                             ImmutableList<ComposedMessageId> 
fixedInconsistencies,
+                             ImmutableList<ComposedMessageId> errors) {
+                this.processedImapUidEntries = processedImapUidEntries;
+                this.processedMessageIdEntries = processedMessageIdEntries;
+                this.addedMessageIdEntries = addedMessageIdEntries;
+                this.updatedMessageIdEntries = updatedMessageIdEntries;
+                this.removedMessageIdEntries = removedMessageIdEntries;
+                this.fixedInconsistencies = fixedInconsistencies;
+                this.errors = errors;
+            }
+
+            public long getProcessedImapUidEntries() {
+                return processedImapUidEntries;
+            }
+
+            public long getProcessedMessageIdEntries() {
+                return processedMessageIdEntries;
+            }
+
+            public long getAddedMessageIdEntries() {
+                return addedMessageIdEntries;
+            }
+
+            public long getUpdatedMessageIdEntries() {
+                return updatedMessageIdEntries;
+            }
+
+            public long getRemovedMessageIdEntries() {
+                return removedMessageIdEntries;
+            }
+
+            public ImmutableList<ComposedMessageId> getFixedInconsistencies() {
+                return fixedInconsistencies;
+            }
+
+            public ImmutableList<ComposedMessageId> getErrors() {
+                return errors;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot snapshot = (Snapshot) o;
+
+                    return Objects.equals(this.processedImapUidEntries, 
snapshot.processedImapUidEntries)
+                        && Objects.equals(this.processedMessageIdEntries, 
snapshot.processedMessageIdEntries)
+                        && Objects.equals(this.addedMessageIdEntries, 
snapshot.addedMessageIdEntries)
+                        && Objects.equals(this.updatedMessageIdEntries, 
snapshot.updatedMessageIdEntries)
+                        && Objects.equals(this.removedMessageIdEntries, 
snapshot.removedMessageIdEntries)
+                        && Objects.equals(this.errors, snapshot.errors)
+                        && Objects.equals(this.fixedInconsistencies, 
snapshot.fixedInconsistencies);
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(processedImapUidEntries, 
processedMessageIdEntries, addedMessageIdEntries, updatedMessageIdEntries, 
removedMessageIdEntries, fixedInconsistencies, errors);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("processedImapUidEntries", processedImapUidEntries)
+                    .add("processedMessageIdEntries", 
processedMessageIdEntries)
+                    .add("addedMessageIdEntries", addedMessageIdEntries)
+                    .add("updatedMessageIdEntries", updatedMessageIdEntries)
+                    .add("removedMessageIdEntries", removedMessageIdEntries)
+                    .add("fixedInconsistencies", fixedInconsistencies)
+                    .add("errors", errors)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong processedImapUidEntries;
+        private final AtomicLong processedMessageIdEntries;
+        private final AtomicLong addedMessageIdEntries;
+        private final AtomicLong updatedMessageIdEntries;
+        private final AtomicLong removedMessageIdEntries;
+        private final ConcurrentLinkedDeque<ComposedMessageId> 
fixedInconsistencies;
+        private final ConcurrentLinkedDeque<ComposedMessageId> errors;
+
+        Context() {
+            this(new AtomicLong(), new AtomicLong(), new AtomicLong(), new 
AtomicLong(), new AtomicLong(), ImmutableList.of(), ImmutableList.of());
+        }
+
+        private Context(AtomicLong processedImapUidEntries, AtomicLong 
processedMessageIdEntries, AtomicLong addedMessageIdEntries,
+                        AtomicLong updatedMessageIdEntries, AtomicLong 
removedMessageIdEntries, Collection<ComposedMessageId> fixedInconsistencies,
+                        Collection<ComposedMessageId> errors) {
+            this.processedImapUidEntries = processedImapUidEntries;
+            this.processedMessageIdEntries = processedMessageIdEntries;
+            this.addedMessageIdEntries = addedMessageIdEntries;
+            this.updatedMessageIdEntries = updatedMessageIdEntries;
+            this.removedMessageIdEntries = removedMessageIdEntries;
+            this.fixedInconsistencies = new 
ConcurrentLinkedDeque<>(fixedInconsistencies);
+            this.errors = new ConcurrentLinkedDeque<>(errors);
+        }
+
+        void incrementProcessedImapUidEntries() {
+            processedImapUidEntries.incrementAndGet();
+        }
+
+        void incrementMessageIdEntries() {
+            processedMessageIdEntries.incrementAndGet();
+        }
+
+        void incrementAddedMessageIdEntries() {
+            addedMessageIdEntries.incrementAndGet();
+        }
+
+        void incrementUpdatedMessageIdEntries() {
+            updatedMessageIdEntries.incrementAndGet();
+        }
+
+        void incrementRemovedMessageIdEntries() {
+            removedMessageIdEntries.incrementAndGet();
+        }
+
+        void addFixedInconsistency(ComposedMessageId messageId) {
+            fixedInconsistencies.add(messageId);
+        }
+
+        void addErrors(ComposedMessageId messageId) {
+            errors.add(messageId);
+        }
+
+        Snapshot snapshot() {
+            return new Snapshot(
+                processedImapUidEntries.get(),
+                processedMessageIdEntries.get(),
+                addedMessageIdEntries.get(),
+                updatedMessageIdEntries.get(),
+                removedMessageIdEntries.get(),
+                ImmutableList.copyOf(fixedInconsistencies),
+                ImmutableList.copyOf(errors));
+        }
+    }
+
     public static final Logger LOGGER = 
LoggerFactory.getLogger(SolveMessageInconsistenciesService.class);
 
-    private final CassandraMessageIdToImapUidDAO idToImapUidDAO;
+    private final CassandraMessageIdToImapUidDAO messageIdToImapUidDAO;
     private final CassandraMessageIdDAO messageIdDAO;
 
     @Inject
-    SolveMessageInconsistenciesService(CassandraMessageIdToImapUidDAO 
idToImapUidDAO, CassandraMessageIdDAO messageIdDAO) {
-        this.idToImapUidDAO = idToImapUidDAO;
+    SolveMessageInconsistenciesService(CassandraMessageIdToImapUidDAO 
messageIdToImapUidDAO, CassandraMessageIdDAO messageIdDAO) {
+        this.messageIdToImapUidDAO = messageIdToImapUidDAO;
         this.messageIdDAO = messageIdDAO;
     }
 
-    Mono<Task.Result> fixMessageInconsistencies() {
+    Mono<Task.Result> fixMessageInconsistencies(Context context) {
         return Flux.concat(
-            fixMessageIdInconsistencies(),
-            fixImapUidInconsistencies())
+            fixInconsistenciesInMessageId(context),
+            fixInconsistenciesInImapUid(context))
             .reduce(Task.Result.COMPLETED, Task::combine);
     }
 
-    private Mono<Task.Result> fixMessageIdInconsistencies() {
-        return idToImapUidDAO.retrieveAllMessages()
-            .concatMap(this::fetchAndFixMessageId)
-            .reduce(Task.Result.COMPLETED, Task::combine);
+    private Flux<Task.Result> fixInconsistenciesInImapUid(Context context) {
+        return messageIdToImapUidDAO.retrieveAllMessages()
+            .doOnNext(any -> context.incrementProcessedImapUidEntries())
+            .concatMap(this::detectInconsistencyInImapUid)
+            .concatMap(inconsistency -> inconsistency.fix(context, 
messageIdToImapUidDAO, messageIdDAO));
     }
 
-    private Mono<Task.Result> 
fetchAndFixMessageId(ComposedMessageIdWithMetaData message) {
-        return idToImapUidDAO.retrieve((CassandraMessageId) 
message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) 
message.getComposedMessageId().getMailboxId()))
-            .single()
-            .flatMap(upToDateMessage -> messageIdDAO.retrieve((CassandraId) 
upToDateMessage.getComposedMessageId().getMailboxId(), 
upToDateMessage.getComposedMessageId().getUid())
-                .flatMap(Mono::justOrEmpty)
-                .flatMap(fetchedFromMessageId -> 
fixWhenMessageFoundInMessageId(upToDateMessage, fetchedFromMessageId)))
-            .switchIfEmpty(fixWhenMessageNotFoundInMessageId(message));
+    private Mono<Inconsistency> 
detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) {
+        return messageIdToImapUidDAO.retrieve((CassandraMessageId) 
message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) 
message.getComposedMessageId().getMailboxId()))
+            .next()
+            .flatMap(this::compareWithMessageIdRecord)
+            .onErrorResume(error -> Mono.just(new 
FailedToRetrieveRecord(message)));
     }
 
-    private Mono<Task.Result> 
fixWhenMessageFoundInMessageId(ComposedMessageIdWithMetaData 
messageFromImapUid, ComposedMessageIdWithMetaData messageFromMessageId) {
-        return Mono.fromCallable(() -> 
messageFromImapUid.equals(messageFromMessageId))
-            .flatMap(isEqual -> {
-                if (isEqual) {
-                    return Mono.just(Task.Result.COMPLETED);
+    private Mono<Inconsistency> 
compareWithMessageIdRecord(ComposedMessageIdWithMetaData 
upToDateMessageFromImapUid) {
+        return messageIdDAO.retrieve((CassandraId) 
upToDateMessageFromImapUid.getComposedMessageId().getMailboxId(), 
upToDateMessageFromImapUid.getComposedMessageId().getUid())
+            .flatMap(Mono::justOrEmpty)
+            .map(messageIdRecord -> {
+                if (messageIdRecord.equals(upToDateMessageFromImapUid)) {
+                    return NO_INCONSISTENCY;
                 }
-
-                return messageIdDAO.updateMetadata(messageFromImapUid)
-                    .then(Mono.just(Task.Result.COMPLETED))
-                    .onErrorResume(error -> {
-                        LOGGER.error("Error when fixing inconsistency for 
message: {}", messageFromImapUid, error);
-                        return Mono.just(Task.Result.PARTIAL);
-                    });
-            });
-    }
-
-    private Mono<Task.Result> 
fixWhenMessageNotFoundInMessageId(ComposedMessageIdWithMetaData message) {
-        return messageIdDAO.insert(message)
-            .then(Mono.just(Task.Result.COMPLETED))
-            .onErrorResume(error -> {
-                LOGGER.error("Error when fixing inconsistency for message: 
{}", message, error);
-                return Mono.just(Task.Result.PARTIAL);
-            });
+                return new OutdatedMessageIdEntry(messageIdRecord, 
upToDateMessageFromImapUid);
+            })
+            .switchIfEmpty(Mono.just(new 
OrphanImapUidEntry(upToDateMessageFromImapUid)));
     }
 
-    @VisibleForTesting
-    Mono<Task.Result> fixImapUidInconsistencies() {
+    private Flux<Task.Result> fixInconsistenciesInMessageId(Context context) {
         return messageIdDAO.retrieveAllMessages()
-            .concatMap(message -> process(message))
-            .reduce(Task.Result.COMPLETED, Task::combine);
+            .doOnNext(any -> context.incrementMessageIdEntries())
+            .concatMap(this::detectInconsistencyInMessageId)
+            .concatMap(inconsistency -> inconsistency.fix(context, 
messageIdToImapUidDAO, messageIdDAO));
     }
 
-    private Mono<Task.Result> process(ComposedMessageIdWithMetaData message) {
+    private Mono<Inconsistency> 
detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
         return messageIdDAO.retrieve((CassandraId) 
message.getComposedMessageId().getMailboxId(), 
message.getComposedMessageId().getUid())
             .flatMap(Mono::justOrEmpty)
-            .flatMap(this::fixWhenMessageFound)
-            .switchIfEmpty(Mono.just(Task.Result.COMPLETED));
-    }
-
-    private Mono<Task.Result> 
fixWhenMessageFound(ComposedMessageIdWithMetaData message) {
-        return idToImapUidDAO.retrieve((CassandraMessageId) 
message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) 
message.getComposedMessageId().getMailboxId()))
-            .flatMap(uidRecord -> {
-                if (uidRecord.equals(message)) {
-                    return Mono.just(Task.Result.COMPLETED);
-                }
-
-                return messageIdDAO.updateMetadata(uidRecord)
-                    .then(Mono.just(Task.Result.COMPLETED));
-            })
-            .switchIfEmpty(messageIdDAO.delete((CassandraId) 
message.getComposedMessageId().getMailboxId(), 
message.getComposedMessageId().getUid())
-                .then(Mono.just(Task.Result.COMPLETED)))
-            .single()
-            .onErrorResume(error -> {
-                LOGGER.error("Error when fixing inconsistency for message {}", 
message, error);
-                return Mono.just(Task.Result.PARTIAL);
-            });
+            .flatMap(upToDateMessage -> 
messageIdToImapUidDAO.retrieve((CassandraMessageId) 
message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) 
message.getComposedMessageId().getMailboxId()))
+                .map(uidRecord -> NO_INCONSISTENCY)
+                .switchIfEmpty(Mono.just(new OrphanMessageIdEntry(message)))
+                .next())
+            .onErrorResume(error -> Mono.just(new 
FailedToRetrieveRecord(message)));
     }
-}
+}
\ No newline at end of file
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java
index 8bb845b..06b7054 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java
@@ -21,7 +21,6 @@ package org.apache.james.mailbox.cassandra.mail.task;
 
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.doReturn;
 
 import java.util.Optional;
 
@@ -37,6 +36,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
+import 
org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.Context;
 import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
@@ -47,8 +47,6 @@ import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import reactor.core.publisher.Mono;
-
 public class SolveMessageInconsistenciesServiceTest {
 
     private static final CassandraId MAILBOX_ID = CassandraId.timeBased();
@@ -102,7 +100,7 @@ public class SolveMessageInconsistenciesServiceTest {
 
     @Test
     void fixMessageInconsistenciesShouldReturnCompletedWhenNoData() {
-        assertThat(testee.fixMessageInconsistencies().block())
+        assertThat(testee.fixMessageInconsistencies(new Context()).block())
             .isEqualTo(Task.Result.COMPLETED);
     }
 
@@ -111,13 +109,13 @@ public class SolveMessageInconsistenciesServiceTest {
         imapUidDAO.insert(MESSAGE_1).block();
         messageIdDAO.insert(MESSAGE_1).block();
 
-        assertThat(testee.fixMessageInconsistencies().block())
+        assertThat(testee.fixMessageInconsistencies(new Context()).block())
             .isEqualTo(Task.Result.COMPLETED);
     }
 
     @Test
     void fixMailboxInconsistenciesShouldNotAlterStateWhenEmpty() {
-        testee.fixMessageInconsistencies().block();
+        testee.fixMessageInconsistencies(new Context()).block();
 
         SoftAssertions.assertSoftly(softly -> {
             
softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()).isEmpty();
@@ -130,7 +128,7 @@ public class SolveMessageInconsistenciesServiceTest {
         imapUidDAO.insert(MESSAGE_1).block();
         messageIdDAO.insert(MESSAGE_1).block();
 
-        testee.fixMessageInconsistencies().block();
+        testee.fixMessageInconsistencies(new Context()).block();
 
         SoftAssertions.assertSoftly(softly -> {
             
softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -147,7 +145,7 @@ public class SolveMessageInconsistenciesServiceTest {
         void 
fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() {
             imapUidDAO.insert(MESSAGE_1).block();
 
-            assertThat(testee.fixMessageInconsistencies().block())
+            assertThat(testee.fixMessageInconsistencies(new Context()).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -155,7 +153,7 @@ public class SolveMessageInconsistenciesServiceTest {
         void fixMessageInconsistenciesShouldResolveInconsistentData() {
             imapUidDAO.insert(MESSAGE_1).block();
 
-            testee.fixMessageInconsistencies().block();
+            testee.fixMessageInconsistencies(new Context()).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, 
Optional.of(MAILBOX_ID)).collectList().block())
@@ -170,7 +168,7 @@ public class SolveMessageInconsistenciesServiceTest {
             imapUidDAO.insert(MESSAGE_1).block();
             messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block();
 
-            assertThat(testee.fixMessageInconsistencies().block())
+            assertThat(testee.fixMessageInconsistencies(new Context()).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -179,7 +177,7 @@ public class SolveMessageInconsistenciesServiceTest {
             imapUidDAO.insert(MESSAGE_1).block();
             messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block();
 
-            testee.fixMessageInconsistencies().block();
+            testee.fixMessageInconsistencies(new Context()).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, 
Optional.of(MAILBOX_ID)).collectList().block())
@@ -194,7 +192,7 @@ public class SolveMessageInconsistenciesServiceTest {
             imapUidDAO.insert(MESSAGE_1).block();
             messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
 
-            assertThat(testee.fixMessageInconsistencies().block())
+            assertThat(testee.fixMessageInconsistencies(new Context()).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -203,7 +201,7 @@ public class SolveMessageInconsistenciesServiceTest {
             imapUidDAO.insert(MESSAGE_1).block();
             messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
 
-            testee.fixMessageInconsistencies().block();
+            testee.fixMessageInconsistencies(new Context()).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, 
Optional.of(MAILBOX_ID)).collectList().block())
@@ -224,7 +222,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .forever()
                         .whenQueryStartsWith("INSERT INTO messageIdTable 
(mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags)
 VALUES 
(:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)"));
 
-                assertThat(testee.fixMessageInconsistencies().block())
+                assertThat(testee.fixMessageInconsistencies(new 
Context()).block())
                     .isEqualTo(Task.Result.PARTIAL);
             }
 
@@ -238,7 +236,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)"));
 
-                assertThat(testee.fixMessageInconsistencies().block())
+                assertThat(testee.fixMessageInconsistencies(new Context()).block())
                     .isEqualTo(Task.Result.PARTIAL);
             }
 
@@ -252,7 +250,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,d2bee791-7e63-11ea-883c-95b84008f979,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)"));
 
-                testee.fixMessageInconsistencies().block();
+                testee.fixMessageInconsistencies(new Context()).block();
 
                 SoftAssertions.assertSoftly(softly -> {
                     softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_2, Optional.of(MAILBOX_ID)).collectList().block())
@@ -261,6 +259,46 @@ public class SolveMessageInconsistenciesServiceTest {
                         .isEqualTo(MESSAGE_2);
                 });
             }
+
+            @Test
+            void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) {
+                Context context = new Context();
+
+                imapUidDAO.insert(MESSAGE_1).block();
+
+                cassandra.getConf()
+                    .registerScenario(fail()
+                        .times(1)
+                        .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId"));
+
+                testee.fixMessageInconsistencies(context).block();
+
+                assertThat(context.snapshot())
+                    .isEqualTo(Context.Snapshot.builder()
+                        .processedImapUidEntries(1)
+                        .errors(MESSAGE_1.getComposedMessageId())
+                        .build());
+            }
+
+            @Test
+            void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) {
+                Context context = new Context();
+
+                imapUidDAO.insert(MESSAGE_1).block();
+
+                cassandra.getConf()
+                    .registerScenario(fail()
+                        .times(1)
+                        .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
+
+                testee.fixMessageInconsistencies(context).block();
+
+                assertThat(context.snapshot())
+                    .isEqualTo(Context.Snapshot.builder()
+                        .processedImapUidEntries(1)
+                        .errors(MESSAGE_1.getComposedMessageId())
+                        .build());
+            }
         }
     }
 
@@ -271,7 +309,7 @@ public class SolveMessageInconsistenciesServiceTest {
         void fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() {
             messageIdDAO.insert(MESSAGE_1).block();
 
-            assertThat(testee.fixMessageInconsistencies().block())
+            assertThat(testee.fixMessageInconsistencies(new Context()).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -279,7 +317,7 @@ public class SolveMessageInconsistenciesServiceTest {
         void fixMessageInconsistenciesShouldResolveInconsistentData() {
             messageIdDAO.insert(MESSAGE_1).block();
 
-            testee.fixMessageInconsistencies().block();
+            testee.fixMessageInconsistencies(new Context()).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -296,7 +334,7 @@ public class SolveMessageInconsistenciesServiceTest {
 
             imapUidDAO.insert(MESSAGE_1).block();
 
-            assertThat(testee.fixMessageInconsistencies().block())
+            assertThat(testee.fixMessageInconsistencies(new Context()).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -307,35 +345,7 @@ public class SolveMessageInconsistenciesServiceTest {
 
             imapUidDAO.insert(MESSAGE_1).block();
 
-            testee.fixMessageInconsistencies().block();
-
-            SoftAssertions.assertSoftly(softly -> {
-                softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
-                    .containsExactly(MESSAGE_1);
-                softly.assertThat(messageIdDAO.retrieveAllMessages().collectList().block())
-                    .containsExactly(MESSAGE_1);
-            });
-        }
-
-        @Test
-        void fixImapUidInconsistenciesShouldCompleteWhenInconsistent() {
-            messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
-
-            imapUidDAO.insert(MESSAGE_1).block();
-
-            testee.fixMessageInconsistencies().block();
-
-            assertThat(testee.fixImapUidInconsistencies().block())
-                .isEqualTo(Task.Result.COMPLETED);
-        }
-
-        @Test
-        void fixImapUidInconsistenciesShouldResolveInconsistent() {
-            messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
-
-            imapUidDAO.insert(MESSAGE_1).block();
-
-            testee.fixMessageInconsistencies().block();
+            testee.fixMessageInconsistencies(new Context()).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -356,7 +366,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .forever()
                         .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
 
-                assertThat(testee.fixMessageInconsistencies().block())
+                assertThat(testee.fixMessageInconsistencies(new Context()).block())
                     .isEqualTo(Task.Result.PARTIAL);
             }
 
@@ -370,7 +380,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
 
-                assertThat(testee.fixMessageInconsistencies().block())
+                assertThat(testee.fixMessageInconsistencies(new Context()).block())
                     .isEqualTo(Task.Result.PARTIAL);
             }
 
@@ -384,7 +394,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
 
-                testee.fixMessageInconsistencies().block();
+                testee.fixMessageInconsistencies(new Context()).block();
 
                 SoftAssertions.assertSoftly(softly -> {
                     softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -393,6 +403,159 @@ public class SolveMessageInconsistenciesServiceTest {
                         .containsExactly(MESSAGE_1);
                 });
             }
+
+            @Test
+            void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) {
+                Context context = new Context();
+
+                messageIdDAO.insert(MESSAGE_1).block();
+
+                cassandra.getConf()
+                    .registerScenario(fail()
+                        .times(1)
+                        .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
+
+                testee.fixMessageInconsistencies(context).block();
+
+                assertThat(context.snapshot())
+                    .isEqualTo(Context.Snapshot.builder()
+                        .processedMessageIdEntries(1)
+                        .errors(MESSAGE_1.getComposedMessageId())
+                        .build());
+            }
+
+            @Test
+            void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) {
+                Context context = new Context();
+
+                messageIdDAO.insert(MESSAGE_1).block();
+
+                cassandra.getConf()
+                    .registerScenario(fail()
+                        .times(1)
+                        .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId"));
+
+                testee.fixMessageInconsistencies(context).block();
+
+                assertThat(context.snapshot())
+                    .isEqualTo(Context.Snapshot.builder()
+                        .processedMessageIdEntries(1)
+                        .errors(MESSAGE_1.getComposedMessageId())
+                        .build());
+            }
         }
     }
+
+    @Test
+    void fixMailboxInconsistenciesShouldNotUpdateContextWhenNoData() {
+        Context context = new Context();
+
+        testee.fixMessageInconsistencies(context).block();
+
+        assertThat(context.snapshot()).isEqualToComparingFieldByFieldRecursively(new Context().snapshot());
+    }
+
+    @Test
+    void fixMessageInconsistenciesShouldUpdateContextWhenConsistentData() {
+        Context context = new Context();
+
+        imapUidDAO.insert(MESSAGE_1).block();
+        messageIdDAO.insert(MESSAGE_1).block();
+
+        testee.fixMessageInconsistencies(context).block();
+
+        assertThat(context.snapshot())
+            .isEqualTo(Context.Snapshot.builder()
+                .processedImapUidEntries(1)
+                .processedMessageIdEntries(1)
+                .build());
+    }
+
+    @Test
+    void fixMessageInconsistenciesShouldUpdateContextWhenOrphanImapUidMessage() {
+        Context context = new Context();
+
+        imapUidDAO.insert(MESSAGE_1).block();
+
+        testee.fixMessageInconsistencies(context).block();
+
+        assertThat(context.snapshot())
+            .isEqualTo(Context.Snapshot.builder()
+                .processedImapUidEntries(1)
+                .addedMessageIdEntries(1)
+                .addFixedInconsistencies(MESSAGE_1.getComposedMessageId())
+                .build());
+    }
+
+    @Test
+    void fixMailboxInconsistenciesShouldUpdateContextWhenInconsistentModSeq() {
+        Context context = new Context();
+
+        imapUidDAO.insert(MESSAGE_1).block();
+        messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
+
+        testee.fixMessageInconsistencies(context).block();
+
+        assertThat(context.snapshot())
+            .isEqualTo(Context.Snapshot.builder()
+                .processedImapUidEntries(1)
+                .processedMessageIdEntries(1)
+                .updatedMessageIdEntries(1)
+                .addFixedInconsistencies(MESSAGE_1.getComposedMessageId())
+                .build());
+    }
+
+    @Test
+    void fixMailboxInconsistenciesShouldUpdateContextWhenInconsistentFlags() {
+        Context context = new Context();
+
+        imapUidDAO.insert(MESSAGE_1).block();
+        messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block();
+
+        testee.fixMessageInconsistencies(context).block();
+
+        assertThat(context.snapshot())
+            .isEqualTo(Context.Snapshot.builder()
+                .processedImapUidEntries(1)
+                .processedMessageIdEntries(1)
+                .updatedMessageIdEntries(1)
+                .addFixedInconsistencies(MESSAGE_1.getComposedMessageId())
+                .build());
+    }
+
+    @Test
+    void fixMailboxInconsistenciesShouldUpdateContextWhenOrphanMessageIdMessage() {
+        Context context = new Context();
+
+        messageIdDAO.insert(MESSAGE_1).block();
+
+        testee.fixMessageInconsistencies(context).block();
+
+        assertThat(context.snapshot())
+            .isEqualTo(Context.Snapshot.builder()
+                .processedMessageIdEntries(1)
+                .removedMessageIdEntries(1)
+                .addFixedInconsistencies(MESSAGE_1.getComposedMessageId())
+                .build());
+    }
+
+    @Test
+    void fixMailboxInconsistenciesShouldUpdateContextWhenDeleteError(CassandraCluster cassandra) {
+        Context context = new Context();
+
+        messageIdDAO.insert(MESSAGE_1).block();
+
+        cassandra.getConf()
+            .registerScenario(fail()
+                .times(1)
+                .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
+
+        testee.fixMessageInconsistencies(context).block();
+
+        assertThat(context.snapshot())
+            .isEqualTo(Context.Snapshot.builder()
+                .processedMessageIdEntries(1)
+                .errors(MESSAGE_1.getComposedMessageId())
+                .build());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to