This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 01cb8d1af106486bf60924ed1dfa622880cfe74f Author: LanKhuat <khuatdang...@gmail.com> AuthorDate: Fri Apr 17 01:57:50 2020 +0700 JAMES-3143 Add context & objects describing inconsistencies --- .../task/SolveMessageInconsistenciesService.java | 446 +++++++++++++++++---- .../SolveMessageInconsistenciesServiceTest.java | 265 +++++++++--- 2 files changed, 593 insertions(+), 118 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java index f67ff73..8518f1e 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java @@ -19,7 +19,11 @@ package org.apache.james.mailbox.cassandra.mail.task; +import java.util.Collection; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; @@ -27,105 +31,413 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO; +import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class SolveMessageInconsistenciesService { + + @FunctionalInterface + interface Inconsistency { + Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO); + } + + private static Inconsistency NO_INCONSISTENCY = (context, imapUidDAO, messageIdDAO) -> Mono.just(Task.Result.COMPLETED); + + private static class FailedToRetrieveRecord implements Inconsistency { + private final ComposedMessageIdWithMetaData message; + + private FailedToRetrieveRecord(ComposedMessageIdWithMetaData message) { + this.message = message; + } + + @Override + public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO) { + context.addErrors(message.getComposedMessageId()); + LOGGER.error("Failed to retrieve record: {}", message.getComposedMessageId()); + return Mono.just(Task.Result.PARTIAL); + } + } + + private static class OrphanImapUidEntry implements Inconsistency { + private final ComposedMessageIdWithMetaData message; + + private OrphanImapUidEntry(ComposedMessageIdWithMetaData message) { + this.message = message; + } + + @Override + public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO) { + return messageIdDAO.insert(message) + .doOnSuccess(any -> notifySuccess(context)) + .thenReturn(Task.Result.COMPLETED) + .onErrorResume(error -> { + notifyFailure(context); + return Mono.just(Task.Result.PARTIAL); + }); + } + + private void notifyFailure(Context context) { + context.addErrors(message.getComposedMessageId()); + LOGGER.error("Failed to fix inconsistency for orphan message in ImapUid: {}", message.getComposedMessageId()); + } + + private void notifySuccess(Context context) { + LOGGER.info("Inconsistency fixed for orphan message in ImapUid: {}", message.getComposedMessageId()); + context.incrementAddedMessageIdEntries(); + context.addFixedInconsistency(message.getComposedMessageId()); + } + } + + private static class OutdatedMessageIdEntry implements Inconsistency { + private final ComposedMessageIdWithMetaData messageFromMessageId; + private final ComposedMessageIdWithMetaData messageFromImapUid; + + private OutdatedMessageIdEntry(ComposedMessageIdWithMetaData message, ComposedMessageIdWithMetaData messageFromImapUid) { + this.messageFromMessageId = message; + this.messageFromImapUid = messageFromImapUid; + } + + @Override + public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO) { + return messageIdDAO.updateMetadata(messageFromImapUid) + .doOnSuccess(any -> notifySuccess(context)) + .thenReturn(Task.Result.COMPLETED) + .onErrorResume(error -> { + notifyFailure(context); + return Mono.just(Task.Result.PARTIAL); + }); + } + + private void notifyFailure(Context context) { + context.addErrors(messageFromMessageId.getComposedMessageId()); + LOGGER.error("Failed to fix inconsistency for outdated message in MessageId: {}", messageFromMessageId.getComposedMessageId()); + } + + private void notifySuccess(Context context) { + LOGGER.info("Inconsistency fixed for outdated message in MessageId: {}", messageFromMessageId.getComposedMessageId()); + context.incrementUpdatedMessageIdEntries(); + context.addFixedInconsistency(messageFromMessageId.getComposedMessageId()); + } + } + + private static class OrphanMessageIdEntry implements Inconsistency { + private final ComposedMessageIdWithMetaData message; + + private OrphanMessageIdEntry(ComposedMessageIdWithMetaData message) { + this.message = message; + } + + @Override + public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO) { + return messageIdDAO.delete((CassandraId) message.getComposedMessageId().getMailboxId(), message.getComposedMessageId().getUid()) + .doOnSuccess(any -> notifySuccess(context)) + .thenReturn(Task.Result.COMPLETED) + .onErrorResume(error -> { + notifyFailure(context); + return Mono.just(Task.Result.PARTIAL); + }); + } + + private void notifyFailure(Context context) { + context.addErrors(message.getComposedMessageId()); + LOGGER.error("Failed to fix inconsistency for orphan message in MessageId: {}", message.getComposedMessageId()); + } + + private void notifySuccess(Context context) { + LOGGER.info("Inconsistency fixed for orphan message in MessageId: {}", message.getComposedMessageId()); + context.incrementRemovedMessageIdEntries(); + context.addFixedInconsistency(message.getComposedMessageId()); + } + } + + static class Context { + static class Snapshot { + public static Builder builder() { + return new Builder(); + } + + static class Builder { + private Optional<Long> processedImapUidEntries; + private Optional<Long> processedMessageIdEntries; + private Optional<Long> addedMessageIdEntries; + private Optional<Long> updatedMessageIdEntries; + private Optional<Long> removedMessageIdEntries; + private ImmutableList.Builder<ComposedMessageId> fixedInconsistencies; + private ImmutableList.Builder<ComposedMessageId> errors; + + Builder() { + processedImapUidEntries = Optional.empty(); + processedMessageIdEntries = Optional.empty(); + addedMessageIdEntries = Optional.empty(); + updatedMessageIdEntries = Optional.empty(); + removedMessageIdEntries = Optional.empty(); + fixedInconsistencies = ImmutableList.builder(); + errors = ImmutableList.builder(); + } + + public Builder processedImapUidEntries(long count) { + processedImapUidEntries = Optional.of(count); + return this; + } + + public Builder processedMessageIdEntries(long count) { + processedMessageIdEntries = Optional.of(count); + return this; + } + + public Builder addedMessageIdEntries(long count) { + addedMessageIdEntries = Optional.of(count); + return this; + } + + public Builder updatedMessageIdEntries(long count) { + updatedMessageIdEntries = Optional.of(count); + return this; + } + + public Builder removedMessageIdEntries(long count) { + removedMessageIdEntries = Optional.of(count); + return this; + } + + public Builder addFixedInconsistencies(ComposedMessageId composedMessageId) { + fixedInconsistencies.add(composedMessageId); + return this; + } + + public Builder errors(ComposedMessageId composedMessageId) { + errors.add(composedMessageId); + return this; + } + + public SolveMessageInconsistenciesService.Context.Snapshot build() { + return new SolveMessageInconsistenciesService.Context.Snapshot( + processedImapUidEntries.orElse(0L), + processedMessageIdEntries.orElse(0L), + addedMessageIdEntries.orElse(0L), + updatedMessageIdEntries.orElse(0L), + removedMessageIdEntries.orElse(0L), + fixedInconsistencies.build(), + errors.build()); + } + } + + private final long processedImapUidEntries; + private final long processedMessageIdEntries; + private final long addedMessageIdEntries; + private final long updatedMessageIdEntries; + private final long removedMessageIdEntries; + private final ImmutableList<ComposedMessageId> fixedInconsistencies; + private final ImmutableList<ComposedMessageId> errors; + + private Snapshot(long processedImapUidEntries, long processedMessageIdEntries, + long addedMessageIdEntries, long updatedMessageIdEntries, + long removedMessageIdEntries, + ImmutableList<ComposedMessageId> fixedInconsistencies, + ImmutableList<ComposedMessageId> errors) { + this.processedImapUidEntries = processedImapUidEntries; + this.processedMessageIdEntries = processedMessageIdEntries; + this.addedMessageIdEntries = addedMessageIdEntries; + this.updatedMessageIdEntries = updatedMessageIdEntries; + this.removedMessageIdEntries = removedMessageIdEntries; + this.fixedInconsistencies = fixedInconsistencies; + this.errors = errors; + } + + public long getProcessedImapUidEntries() { + return processedImapUidEntries; + } + + public long getProcessedMessageIdEntries() { + return processedMessageIdEntries; + } + + public long getAddedMessageIdEntries() { + return addedMessageIdEntries; + } + + public long getUpdatedMessageIdEntries() { + return updatedMessageIdEntries; + } + + public long getRemovedMessageIdEntries() { + return removedMessageIdEntries; + } + + public ImmutableList<ComposedMessageId> getFixedInconsistencies() { + return fixedInconsistencies; + } + + public ImmutableList<ComposedMessageId> getErrors() { + return errors; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof Snapshot) { + Snapshot snapshot = (Snapshot) o; + + return Objects.equals(this.processedImapUidEntries, snapshot.processedImapUidEntries) + && Objects.equals(this.processedMessageIdEntries, snapshot.processedMessageIdEntries) + && Objects.equals(this.addedMessageIdEntries, snapshot.addedMessageIdEntries) + && Objects.equals(this.updatedMessageIdEntries, snapshot.updatedMessageIdEntries) + && Objects.equals(this.removedMessageIdEntries, snapshot.removedMessageIdEntries) + && Objects.equals(this.errors, snapshot.errors) + && Objects.equals(this.fixedInconsistencies, snapshot.fixedInconsistencies); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(processedImapUidEntries, processedMessageIdEntries, addedMessageIdEntries, updatedMessageIdEntries, removedMessageIdEntries, fixedInconsistencies, errors); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("processedImapUidEntries", processedImapUidEntries) + .add("processedMessageIdEntries", processedMessageIdEntries) + .add("addedMessageIdEntries", addedMessageIdEntries) + .add("updatedMessageIdEntries", updatedMessageIdEntries) + .add("removedMessageIdEntries", removedMessageIdEntries) + .add("fixedInconsistencies", fixedInconsistencies) + .add("errors", errors) + .toString(); + } + } + + private final AtomicLong processedImapUidEntries; + private final AtomicLong processedMessageIdEntries; + private final AtomicLong addedMessageIdEntries; + private final AtomicLong updatedMessageIdEntries; + private final AtomicLong removedMessageIdEntries; + private final ConcurrentLinkedDeque<ComposedMessageId> fixedInconsistencies; + private final ConcurrentLinkedDeque<ComposedMessageId> errors; + + Context() { + this(new AtomicLong(), new AtomicLong(), new AtomicLong(), new AtomicLong(), new AtomicLong(), ImmutableList.of(), ImmutableList.of()); + } + + private Context(AtomicLong processedImapUidEntries, AtomicLong processedMessageIdEntries, AtomicLong addedMessageIdEntries, + AtomicLong updatedMessageIdEntries, AtomicLong removedMessageIdEntries, Collection<ComposedMessageId> fixedInconsistencies, + Collection<ComposedMessageId> errors) { + this.processedImapUidEntries = processedImapUidEntries; + this.processedMessageIdEntries = processedMessageIdEntries; + this.addedMessageIdEntries = addedMessageIdEntries; + this.updatedMessageIdEntries = updatedMessageIdEntries; + this.removedMessageIdEntries = removedMessageIdEntries; + this.fixedInconsistencies = new ConcurrentLinkedDeque<>(fixedInconsistencies); + this.errors = new ConcurrentLinkedDeque<>(errors); + } + + void incrementProcessedImapUidEntries() { + processedImapUidEntries.incrementAndGet(); + } + + void incrementMessageIdEntries() { + processedMessageIdEntries.incrementAndGet(); + } + + void incrementAddedMessageIdEntries() { + addedMessageIdEntries.incrementAndGet(); + } + + void incrementUpdatedMessageIdEntries() { + updatedMessageIdEntries.incrementAndGet(); + } + + void incrementRemovedMessageIdEntries() { + removedMessageIdEntries.incrementAndGet(); + } + + void addFixedInconsistency(ComposedMessageId messageId) { + fixedInconsistencies.add(messageId); + } + + void addErrors(ComposedMessageId messageId) { + errors.add(messageId); + } + + Snapshot snapshot() { + return new Snapshot( + processedImapUidEntries.get(), + processedMessageIdEntries.get(), + addedMessageIdEntries.get(), + updatedMessageIdEntries.get(), + removedMessageIdEntries.get(), + ImmutableList.copyOf(fixedInconsistencies), + ImmutableList.copyOf(errors)); + } + } + public static final Logger LOGGER = LoggerFactory.getLogger(SolveMessageInconsistenciesService.class); - private final CassandraMessageIdToImapUidDAO idToImapUidDAO; + private final CassandraMessageIdToImapUidDAO messageIdToImapUidDAO; private final CassandraMessageIdDAO messageIdDAO; @Inject - SolveMessageInconsistenciesService(CassandraMessageIdToImapUidDAO idToImapUidDAO, CassandraMessageIdDAO messageIdDAO) { - this.idToImapUidDAO = idToImapUidDAO; + SolveMessageInconsistenciesService(CassandraMessageIdToImapUidDAO messageIdToImapUidDAO, CassandraMessageIdDAO messageIdDAO) { + this.messageIdToImapUidDAO = messageIdToImapUidDAO; this.messageIdDAO = messageIdDAO; } - Mono<Task.Result> fixMessageInconsistencies() { + Mono<Task.Result> fixMessageInconsistencies(Context context) { return Flux.concat( - fixMessageIdInconsistencies(), - fixImapUidInconsistencies()) + fixInconsistenciesInMessageId(context), + fixInconsistenciesInImapUid(context)) .reduce(Task.Result.COMPLETED, Task::combine); } - private Mono<Task.Result> fixMessageIdInconsistencies() { - return idToImapUidDAO.retrieveAllMessages() - .concatMap(this::fetchAndFixMessageId) - .reduce(Task.Result.COMPLETED, Task::combine); + private Flux<Task.Result> fixInconsistenciesInImapUid(Context context) { + return messageIdToImapUidDAO.retrieveAllMessages() + .doOnNext(any -> context.incrementProcessedImapUidEntries()) + .concatMap(this::detectInconsistencyInImapUid) + .concatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO)); } - private Mono<Task.Result> fetchAndFixMessageId(ComposedMessageIdWithMetaData message) { - return idToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId())) - .single() - .flatMap(upToDateMessage -> messageIdDAO.retrieve((CassandraId) upToDateMessage.getComposedMessageId().getMailboxId(), upToDateMessage.getComposedMessageId().getUid()) - .flatMap(Mono::justOrEmpty) - .flatMap(fetchedFromMessageId -> fixWhenMessageFoundInMessageId(upToDateMessage, fetchedFromMessageId))) - .switchIfEmpty(fixWhenMessageNotFoundInMessageId(message)); + private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) { + return messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId())) + .next() + .flatMap(this::compareWithMessageIdRecord) + .onErrorResume(error -> Mono.just(new FailedToRetrieveRecord(message))); } - private Mono<Task.Result> fixWhenMessageFoundInMessageId(ComposedMessageIdWithMetaData messageFromImapUid, ComposedMessageIdWithMetaData messageFromMessageId) { - return Mono.fromCallable(() -> messageFromImapUid.equals(messageFromMessageId)) - .flatMap(isEqual -> { - if (isEqual) { - return Mono.just(Task.Result.COMPLETED); + private Mono<Inconsistency> compareWithMessageIdRecord(ComposedMessageIdWithMetaData upToDateMessageFromImapUid) { + return messageIdDAO.retrieve((CassandraId) upToDateMessageFromImapUid.getComposedMessageId().getMailboxId(), upToDateMessageFromImapUid.getComposedMessageId().getUid()) + .flatMap(Mono::justOrEmpty) + .map(messageIdRecord -> { + if (messageIdRecord.equals(upToDateMessageFromImapUid)) { + return NO_INCONSISTENCY; } - - return messageIdDAO.updateMetadata(messageFromImapUid) - .then(Mono.just(Task.Result.COMPLETED)) - .onErrorResume(error -> { - LOGGER.error("Error when fixing inconsistency for message: {}", messageFromImapUid, error); - return Mono.just(Task.Result.PARTIAL); - }); - }); - } - - private Mono<Task.Result> fixWhenMessageNotFoundInMessageId(ComposedMessageIdWithMetaData message) { - return messageIdDAO.insert(message) - .then(Mono.just(Task.Result.COMPLETED)) - .onErrorResume(error -> { - LOGGER.error("Error when fixing inconsistency for message: {}", message, error); - return Mono.just(Task.Result.PARTIAL); - }); + return new OutdatedMessageIdEntry(messageIdRecord, upToDateMessageFromImapUid); + }) + .switchIfEmpty(Mono.just(new OrphanImapUidEntry(upToDateMessageFromImapUid))); } - @VisibleForTesting - Mono<Task.Result> fixImapUidInconsistencies() { + private Flux<Task.Result> fixInconsistenciesInMessageId(Context context) { return messageIdDAO.retrieveAllMessages() - .concatMap(message -> process(message)) - .reduce(Task.Result.COMPLETED, Task::combine); + .doOnNext(any -> context.incrementMessageIdEntries()) + .concatMap(this::detectInconsistencyInMessageId) + .concatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO)); } - private Mono<Task.Result> process(ComposedMessageIdWithMetaData message) { + private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) { return messageIdDAO.retrieve((CassandraId) message.getComposedMessageId().getMailboxId(), message.getComposedMessageId().getUid()) .flatMap(Mono::justOrEmpty) - .flatMap(this::fixWhenMessageFound) - .switchIfEmpty(Mono.just(Task.Result.COMPLETED)); - } - - private Mono<Task.Result> fixWhenMessageFound(ComposedMessageIdWithMetaData message) { - return idToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId())) - .flatMap(uidRecord -> { - if (uidRecord.equals(message)) { - return Mono.just(Task.Result.COMPLETED); - } - - return messageIdDAO.updateMetadata(uidRecord) - .then(Mono.just(Task.Result.COMPLETED)); - }) - .switchIfEmpty(messageIdDAO.delete((CassandraId) message.getComposedMessageId().getMailboxId(), message.getComposedMessageId().getUid()) - .then(Mono.just(Task.Result.COMPLETED))) - .single() - .onErrorResume(error -> { - LOGGER.error("Error when fixing inconsistency for message {}", message, error); - return Mono.just(Task.Result.PARTIAL); - }); + .flatMap(upToDateMessage -> messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId())) + .map(uidRecord -> NO_INCONSISTENCY) + .switchIfEmpty(Mono.just(new OrphanMessageIdEntry(message))) + .next()) + .onErrorResume(error -> Mono.just(new FailedToRetrieveRecord(message))); } -} +} \ No newline at end of file diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java index 8bb845b..06b7054 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java @@ -21,7 +21,6 @@ package org.apache.james.mailbox.cassandra.mail.task; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.doReturn; import java.util.Optional; @@ -37,6 +36,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO; +import org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.Context; import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; @@ -47,8 +47,6 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import reactor.core.publisher.Mono; - public class SolveMessageInconsistenciesServiceTest { private static final CassandraId MAILBOX_ID = CassandraId.timeBased(); @@ -102,7 +100,7 @@ public class SolveMessageInconsistenciesServiceTest { @Test void fixMessageInconsistenciesShouldReturnCompletedWhenNoData() { - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -111,13 +109,13 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1).block(); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.COMPLETED); } @Test void fixMailboxInconsistenciesShouldNotAlterStateWhenEmpty() { - testee.fixMessageInconsistencies().block(); + testee.fixMessageInconsistencies(new Context()).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()).isEmpty(); @@ -130,7 +128,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies().block(); + testee.fixMessageInconsistencies(new Context()).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()) @@ -147,7 +145,7 @@ public class SolveMessageInconsistenciesServiceTest { void fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() { imapUidDAO.insert(MESSAGE_1).block(); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -155,7 +153,7 @@ public class SolveMessageInconsistenciesServiceTest { void fixMessageInconsistenciesShouldResolveInconsistentData() { imapUidDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies().block(); + testee.fixMessageInconsistencies(new Context()).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block()) @@ -170,7 +168,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block(); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -179,7 +177,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block(); - testee.fixMessageInconsistencies().block(); + testee.fixMessageInconsistencies(new Context()).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block()) @@ -194,7 +192,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block(); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -203,7 +201,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block(); - testee.fixMessageInconsistencies().block(); + testee.fixMessageInconsistencies(new Context()).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block()) @@ -224,7 +222,7 @@ public class SolveMessageInconsistenciesServiceTest { .forever() .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)")); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.PARTIAL); } @@ -238,7 +236,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)")); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.PARTIAL); } @@ -252,7 +250,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,d2bee791-7e63-11ea-883c-95b84008f979,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)")); - testee.fixMessageInconsistencies().block(); + testee.fixMessageInconsistencies(new Context()).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_2, Optional.of(MAILBOX_ID)).collectList().block()) @@ -261,6 +259,46 @@ public class SolveMessageInconsistenciesServiceTest { .isEqualTo(MESSAGE_2); }); } + + @Test + void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) { + Context context = new Context(); + + imapUidDAO.insert(MESSAGE_1).block(); + + cassandra.getConf() + .registerScenario(fail() + .times(1) + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId")); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedImapUidEntries(1) + .errors(MESSAGE_1.getComposedMessageId()) + .build()); + } + + @Test + void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) { + Context context = new Context(); + + imapUidDAO.insert(MESSAGE_1).block(); + + cassandra.getConf() + .registerScenario(fail() + .times(1) + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid")); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedImapUidEntries(1) + .errors(MESSAGE_1.getComposedMessageId()) + .build()); + } } } @@ -271,7 +309,7 @@ public class SolveMessageInconsistenciesServiceTest { void fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() { messageIdDAO.insert(MESSAGE_1).block(); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -279,7 +317,7 @@ public class SolveMessageInconsistenciesServiceTest { void fixMessageInconsistenciesShouldResolveInconsistentData() { messageIdDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies().block(); + testee.fixMessageInconsistencies(new Context()).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()) @@ -296,7 +334,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -307,35 +345,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies().block(); - - SoftAssertions.assertSoftly(softly -> { - softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()) - .containsExactly(MESSAGE_1); - softly.assertThat(messageIdDAO.retrieveAllMessages().collectList().block()) - .containsExactly(MESSAGE_1); - }); - } - - @Test - void fixImapUidInconsistenciesShouldCompleteWhenInconsistent() { - messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block(); - - imapUidDAO.insert(MESSAGE_1).block(); - - testee.fixMessageInconsistencies().block(); - - assertThat(testee.fixImapUidInconsistencies().block()) - .isEqualTo(Task.Result.COMPLETED); - } - - @Test - void fixImapUidInconsistenciesShouldResolveInconsistent() { - messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block(); - - imapUidDAO.insert(MESSAGE_1).block(); - - testee.fixMessageInconsistencies().block(); + testee.fixMessageInconsistencies(new Context()).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()) @@ -356,7 +366,7 @@ public class SolveMessageInconsistenciesServiceTest { .forever() .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid")); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.PARTIAL); } @@ -370,7 +380,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid")); - assertThat(testee.fixMessageInconsistencies().block()) + assertThat(testee.fixMessageInconsistencies(new Context()).block()) .isEqualTo(Task.Result.PARTIAL); } @@ -384,7 +394,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;")); - testee.fixMessageInconsistencies().block(); + testee.fixMessageInconsistencies(new Context()).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()) @@ -393,6 +403,159 @@ public class SolveMessageInconsistenciesServiceTest { .containsExactly(MESSAGE_1); }); } + + @Test + void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) { + Context context = new Context(); + + messageIdDAO.insert(MESSAGE_1).block(); + + cassandra.getConf() + .registerScenario(fail() + .times(1) + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid")); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedMessageIdEntries(1) + .errors(MESSAGE_1.getComposedMessageId()) + .build()); + } + + @Test + void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) { + Context context = new Context(); + + messageIdDAO.insert(MESSAGE_1).block(); + + cassandra.getConf() + .registerScenario(fail() + .times(1) + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId")); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedMessageIdEntries(1) + .errors(MESSAGE_1.getComposedMessageId()) + .build()); + } } } + + @Test + void fixMailboxInconsistenciesShouldNotUpdateContextWhenNoData() { + Context context = new Context(); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()).isEqualToComparingFieldByFieldRecursively(new Context().snapshot()); + } + + @Test + void fixMessageInconsistenciesShouldUpdateContextWhenConsistentData() { + Context context = new Context(); + + imapUidDAO.insert(MESSAGE_1).block(); + messageIdDAO.insert(MESSAGE_1).block(); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedImapUidEntries(1) + .processedMessageIdEntries(1) + .build()); + } + + @Test + void fixMessageInconsistenciesShouldUpdateContextWhenOrphanImapUidMessage() { + Context context = new Context(); + + imapUidDAO.insert(MESSAGE_1).block(); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedImapUidEntries(1) + .addedMessageIdEntries(1) + .addFixedInconsistencies(MESSAGE_1.getComposedMessageId()) + .build()); + } + + @Test + void fixMailboxInconsistenciesShouldUpdateContextWhenInconsistentModSeq() { + Context context = new Context(); + + imapUidDAO.insert(MESSAGE_1).block(); + messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block(); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedImapUidEntries(1) + .processedMessageIdEntries(1) + .updatedMessageIdEntries(1) + .addFixedInconsistencies(MESSAGE_1.getComposedMessageId()) + .build()); + } + + @Test + void fixMailboxInconsistenciesShouldUpdateContextWhenInconsistentFlags() { + Context context = new Context(); + + imapUidDAO.insert(MESSAGE_1).block(); + messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block(); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedImapUidEntries(1) + .processedMessageIdEntries(1) + .updatedMessageIdEntries(1) + .addFixedInconsistencies(MESSAGE_1.getComposedMessageId()) + .build()); + } + + @Test + void fixMailboxInconsistenciesShouldUpdateContextWhenOrphanMessageIdMessage() { + Context context = new Context(); + + messageIdDAO.insert(MESSAGE_1).block(); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedMessageIdEntries(1) + .removedMessageIdEntries(1) + .addFixedInconsistencies(MESSAGE_1.getComposedMessageId()) + .build()); + } + + @Test + void fixMailboxInconsistenciesShouldUpdateContextWhenDeleteError(CassandraCluster cassandra) { + Context context = new Context(); + + messageIdDAO.insert(MESSAGE_1).block(); + + cassandra.getConf() + .registerScenario(fail() + .times(1) + .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;")); + + testee.fixMessageInconsistencies(context).block(); + + assertThat(context.snapshot()) + .isEqualTo(Context.Snapshot.builder() + .processedMessageIdEntries(1) + .errors(MESSAGE_1.getComposedMessageId()) + .build()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org