This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d82a3aa4324c5f56fd14effbaa0c25230614395e Author: LanKhuat <khuatdang...@gmail.com> AuthorDate: Wed Jun 3 17:40:00 2020 +0700 JAMES-3201 Add details about mailbox failures for indexing tasks --- .../indexer/ReIndexingExecutionFailures.java | 30 ++- .../tools/indexer/ErrorRecoveryIndexationTask.java | 14 +- .../indexer/ErrorRecoveryIndexationTaskDTO.java | 2 +- .../mailbox/tools/indexer/ReIndexerPerformer.java | 68 ++++-- .../mailbox/tools/indexer/ReprocessingContext.java | 8 +- .../indexer/ReprocessingContextInformationDTO.java | 2 +- .../SerializableReIndexingExecutionFailures.java | 2 +- .../tools/indexer/CassandraReIndexerImplTest.java | 243 ++++++++++++++++++++- 8 files changed, 339 insertions(+), 30 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java index 429c9a4..97446cb 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java @@ -25,6 +25,7 @@ import java.util.Objects; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.model.MailboxId; +import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; public class ReIndexingExecutionFailures { @@ -61,16 +62,29 @@ public class ReIndexingExecutionFailures { return Objects.hash(mailboxId, uid); } + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("mailboxId", mailboxId) + .add("uid", uid) + .toString(); + } } - private final List<ReIndexingFailure> failures; + private final List<ReIndexingFailure> messageFailures; + private final List<MailboxId> mailboxFailures; - public ReIndexingExecutionFailures(List<ReIndexingFailure> failures) { - this.failures = failures; + public ReIndexingExecutionFailures(List<ReIndexingFailure> messageFailures, List<MailboxId> mailboxFailures) { + this.messageFailures = messageFailures; + this.mailboxFailures = mailboxFailures; } - public List<ReIndexingFailure> failures() { - return ImmutableList.copyOf(failures); + public List<ReIndexingFailure> messageFailures() { + return ImmutableList.copyOf(messageFailures); + } + + public List<MailboxId> mailboxFailures() { + return ImmutableList.copyOf(mailboxFailures); } @Override @@ -78,14 +92,14 @@ public class ReIndexingExecutionFailures { if (o instanceof ReIndexingExecutionFailures) { ReIndexingExecutionFailures that = (ReIndexingExecutionFailures) o; - return Objects.equals(this.failures, that.failures); + return Objects.equals(this.messageFailures, that.messageFailures) + && Objects.equals(this.mailboxFailures, that.mailboxFailures); } return false; } @Override public final int hashCode() { - return Objects.hash(failures); + return Objects.hash(messageFailures, mailboxFailures); } - } diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java index df20874..91dfbe4 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java @@ -49,8 +49,8 @@ public class ErrorRecoveryIndexationTask implements Task { this.mailboxIdFactory = mailboxIdFactory; } - private List<ReIndexingExecutionFailures.ReIndexingFailure> failuresFromDTO(List<ErrorRecoveryIndexationTaskDTO.ReindexingFailureDTO> failureDTOs) { - return failureDTOs + private List<ReIndexingExecutionFailures.ReIndexingFailure> messageFailuresFromDTO(List<ErrorRecoveryIndexationTaskDTO.ReindexingFailureDTO> messageFailures) { + return messageFailures .stream() .flatMap(dto -> dto.getUids() .stream() @@ -58,9 +58,17 @@ public class ErrorRecoveryIndexationTask implements Task { .collect(Guavate.toImmutableList()); } + private List<MailboxId> mailboxFailuresFromDTO(Optional<List<String>> mailboxFailures) { + return mailboxFailures.map(mailboxIdList -> + mailboxIdList.stream() + .map(mailboxIdFactory::fromString) + .collect(Guavate.toImmutableList())) + .orElse(ImmutableList.of()); + } + public ErrorRecoveryIndexationTask create(ErrorRecoveryIndexationTaskDTO dto) { return new ErrorRecoveryIndexationTask(reIndexerPerformer, - new ReIndexingExecutionFailures(failuresFromDTO(dto.getPreviousFailures())), + new ReIndexingExecutionFailures(messageFailuresFromDTO(dto.getPreviousFailures())), dto.getRunningOptions() .map(RunningOptionsDTO::toDomainObject) .orElse(RunningOptions.DEFAULT)); diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java index 1385686..2ce86ba 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java @@ -49,7 +49,7 @@ public class ErrorRecoveryIndexationTaskDTO implements TaskDTO { public static ErrorRecoveryIndexationTaskDTO of(ErrorRecoveryIndexationTask task, String type) { Multimap<MailboxId, ReIndexingExecutionFailures.ReIndexingFailure> failuresByMailboxId = task.getPreviousFailures() - .failures() + .messageFailures() .stream() .collect(Guavate.toImmutableListMultimap(ReIndexingExecutionFailures.ReIndexingFailure::getMailboxId, Function.identity())); diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java index e9b08d3..9dd74cb 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java @@ -102,7 +102,7 @@ public class ReIndexerPerformer { LOGGER.info("Starting a full reindex"); Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() - .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); + .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext)); return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext) .doFinally(any -> LOGGER.info("Full reindex finished")); @@ -113,7 +113,7 @@ public class ReIndexerPerformer { Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) .findMailboxByIdReactive(mailboxId) - .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); + .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext)); return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext); } @@ -127,7 +127,7 @@ public class ReIndexerPerformer { try { Flux<ReIndexingEntry> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound()) - .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); + .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext)); return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext) .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString())); @@ -162,11 +162,27 @@ public class ReIndexerPerformer { } Mono<Result> reIndexErrors(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) { - return reIndexMessages( - Flux.fromIterable(previousReIndexingFailures.failures()) - .flatMap(this::createReindexingEntryFromFailure), - runningOptions, - reprocessingContext); + MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); + MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession); + + return Flux.merge( + reIndexMessages(Flux.fromIterable(previousReIndexingFailures.messageFailures()) + .flatMap(failure -> createReindexingEntryFromFailure(failure, reprocessingContext)), + runningOptions, + reprocessingContext), + reIndexMailboxes(Flux.fromIterable(previousReIndexingFailures.mailboxFailures()) + .flatMap(mapper::findMailboxByIdReactive), + mailboxSession, + reprocessingContext, + runningOptions)) + .reduce(Task::combine); + } + + private Mono<Result> reIndexMailboxes(Flux<Mailbox> mailboxes, MailboxSession session, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { + Flux<ReIndexingEntry> entries = mailboxes + .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, session, reprocessingContext)); + + return reIndexMessages(entries, runningOptions, reprocessingContext); } private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) { @@ -186,22 +202,41 @@ public class ReIndexerPerformer { .next(); } - private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure) { + private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure, ReprocessingContext reprocessingContext) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) .findMailboxByIdReactive(failure.getMailboxId()) .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, failure.getUid()) - .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message))); + .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message))) + .onErrorResume(e -> { + LOGGER.warn("ReIndexing failed for {}", failure, e); + reprocessingContext.recordFailureDetailsForMessage(failure.getMailboxId(), failure.getUid()); + return Mono.empty(); + }); } - private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) { + private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext) { MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession); return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId()) .thenMany(messageMapper.listAllMessageUids(mailbox)) - .flatMap(uid -> messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED)) - .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)); + .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, reprocessingContext, messageMapper, uid)) + .onErrorResume(e -> { + LOGGER.warn("ReIndexing failed for {}", mailbox.generateAssociatedPath(), e); + reprocessingContext.recordMailboxFailure(mailbox.getMailboxId()); + return Mono.empty(); + }); + } + + private Flux<ReIndexingEntry> reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext, MessageMapper messageMapper, MessageUid uid) { + return messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED) + .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)) + .onErrorResume(e -> { + LOGGER.warn("ReIndexing failed for {} - {}", mailbox.getMailboxId(), uid, e); + reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid); + return Mono.empty(); + }); } private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) { @@ -209,7 +244,12 @@ public class ReIndexerPerformer { .window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1)) .throttle(entriesToIndex) .reduce(Task::combine) - .switchIfEmpty(Mono.just(Result.COMPLETED)); + .switchIfEmpty(Mono.fromSupplier(() -> { + if (reprocessingContext.failures().mailboxFailures().isEmpty()) { + return Result.COMPLETED; + } + return Result.PARTIAL; + })); } private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage message) { diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java index c0c3eb7..bc4facb 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java @@ -32,11 +32,13 @@ class ReprocessingContext { private final AtomicInteger successfullyReprocessedMails; private final AtomicInteger failedReprocessingMails; private final ConcurrentLinkedDeque<ReIndexingExecutionFailures.ReIndexingFailure> failures; + private final ConcurrentLinkedDeque<MailboxId> mailboxFailures; ReprocessingContext() { failedReprocessingMails = new AtomicInteger(0); successfullyReprocessedMails = new AtomicInteger(0); failures = new ConcurrentLinkedDeque<>(); + mailboxFailures = new ConcurrentLinkedDeque<>(); } void recordFailureDetailsForMessage(MailboxId mailboxId, MessageUid uid) { @@ -48,6 +50,10 @@ class ReprocessingContext { successfullyReprocessedMails.incrementAndGet(); } + void recordMailboxFailure(MailboxId mailboxId) { + mailboxFailures.add(mailboxId); + } + int successfullyReprocessedMailCount() { return successfullyReprocessedMails.get(); } @@ -57,6 +63,6 @@ class ReprocessingContext { } ReIndexingExecutionFailures failures() { - return new ReIndexingExecutionFailures(ImmutableList.copyOf(failures)); + return new ReIndexingExecutionFailures(ImmutableList.copyOf(failures), ImmutableList.copyOf(mailboxFailures)); } } diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java index 446b387..5bf65dc 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java @@ -169,7 +169,7 @@ public class ReprocessingContextInformationDTO implements AdditionalInformationD } static List<ReindexingFailureDTO> serializeFailures(ReIndexingExecutionFailures failures) { - ImmutableListMultimap<MailboxId, ReIndexingExecutionFailures.ReIndexingFailure> failuresByMailbox = failures.failures() + ImmutableListMultimap<MailboxId, ReIndexingExecutionFailures.ReIndexingFailure> failuresByMailbox = failures.messageFailures() .stream() .collect(Guavate.toImmutableListMultimap(ReIndexingExecutionFailures.ReIndexingFailure::getMailboxId)); diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java index 3b3515a..5c7c489 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java @@ -52,7 +52,7 @@ public class SerializableReIndexingExecutionFailures { public static SerializableReIndexingExecutionFailures from(ReIndexingExecutionFailures reIndexingExecutionFailures) { return new SerializableReIndexingExecutionFailures( - reIndexingExecutionFailures.failures() + reIndexingExecutionFailures.messageFailures() .stream() .map(failure -> new SerializableReIndexingExecutionFailures.SerializableReIndexingFailure(failure.getMailboxId(), failure.getUid())) .collect(Guavate.toImmutableList())); diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java index 55ad6ed..bdd950e 100644 --- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java +++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java @@ -19,6 +19,8 @@ package org.apache.mailbox.tools.indexer; +import static org.apache.james.backends.cassandra.Scenario.Builder.fail; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -34,10 +36,14 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.core.Username; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageManager; +import org.apache.james.mailbox.MessageManager.AppendCommand; +import org.apache.james.mailbox.MessageManager.AppendResult; import org.apache.james.mailbox.cassandra.CassandraMailboxManager; import org.apache.james.mailbox.cassandra.CassandraMailboxManagerProvider; import org.apache.james.mailbox.cassandra.mail.MailboxAggregateModule; import org.apache.james.mailbox.indexer.ReIndexer; +import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures; +import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures.ReIndexingFailure; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxPath; @@ -45,10 +51,15 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory; import org.apache.james.mailbox.store.PreDeletionHooks; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; +import org.apache.james.task.Task; import org.apache.james.util.concurrency.ConcurrentTestRunner; +import org.apache.mailbox.tools.indexer.ReprocessingContextInformationDTO.ReprocessingContextInformationForErrorRecoveryIndexationTask; +import org.apache.mailbox.tools.indexer.ReprocessingContextInformationDTO.ReprocessingContextInformationForFullReindexingTask; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; import com.google.common.base.Strings; @@ -90,7 +101,7 @@ public class CassandraReIndexerImplTest { ConcurrentTestRunner.builder() .operation((a, b) -> mailbox .appendMessage( - MessageManager.AppendCommand.builder().build(bigBody), + AppendCommand.builder().build(bigBody), systemSession)) .threadCount(threadCount) .operationCount(operationCount) @@ -105,4 +116,234 @@ public class CassandraReIndexerImplTest { .add(any(MailboxSession.class), any(Mailbox.class),any(MailboxMessage.class)); verifyNoMoreInteractions(messageSearchIndex); } + + @Nested + class FailureTesting { + @Test + void fullReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;")); + + Task task = reIndexer.reIndex(ReIndexer.RunningOptions.DEFAULT); + Task.Result result = task.run(); + + assertThat(result).isEqualTo(Task.Result.PARTIAL); + } + + @Test + void fullReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;")); + + Task task = reIndexer.reIndex(ReIndexer.RunningOptions.DEFAULT); + task.run(); + + ReprocessingContextInformationForFullReindexingTask information = (ReprocessingContextInformationForFullReindexingTask) task.details().get(); + assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId()); + } + + @Test + void singleMailboxReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;")); + + Task task = reIndexer.reIndex(mailbox.getId(), ReIndexer.RunningOptions.DEFAULT); + Task.Result result = task.run(); + + assertThat(result).isEqualTo(Task.Result.PARTIAL); + } + + @Test + void singleMailboxReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;")); + + Task task = reIndexer.reIndex(mailbox.getId(), ReIndexer.RunningOptions.DEFAULT); + task.run(); + + SingleMailboxReindexingTask.AdditionalInformation information = (SingleMailboxReindexingTask.AdditionalInformation) task.details().get(); + assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId()); + } + + @Test + void userMailboxReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;")); + + Task task = reIndexer.reIndex(USERNAME, ReIndexer.RunningOptions.DEFAULT); + Task.Result result = task.run(); + + assertThat(result).isEqualTo(Task.Result.PARTIAL); + } + + @Test + void userMailboxReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;")); + + Task task = reIndexer.reIndex(USERNAME, ReIndexer.RunningOptions.DEFAULT); + task.run(); + + UserReindexingTask.AdditionalInformation information = (UserReindexingTask.AdditionalInformation) task.details().get(); + assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId()); + } + + @Test + void errorReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;")); + + Task task = reIndexer.reIndex(new ReIndexingExecutionFailures( + ImmutableList.of(new ReIndexingFailure(mailbox.getId(), + appendResult.getId().getUid())), + ImmutableList.of(mailbox.getId())), + ReIndexer.RunningOptions.DEFAULT); + Task.Result result = task.run(); + + assertThat(result).isEqualTo(Task.Result.PARTIAL); + } + + @Test + void errorReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;")); + + Task task = reIndexer.reIndex(new ReIndexingExecutionFailures( + ImmutableList.of(new ReIndexingFailure(mailbox.getId(), + appendResult.getId().getUid())), + ImmutableList.of(mailbox.getId())), + ReIndexer.RunningOptions.DEFAULT); + task.run(); + + ReprocessingContextInformationForErrorRecoveryIndexationTask information = (ReprocessingContextInformationForErrorRecoveryIndexationTask) task.details().get(); + assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId()); + } + + @Test + void errorReindexingShouldUpdateDetailsUponReadingMailboxError(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT id,mailboxbase,uidvalidity,name FROM mailbox WHERE id=:id;")); + + Task task = reIndexer.reIndex(new ReIndexingExecutionFailures( + ImmutableList.of(new ReIndexingFailure(mailbox.getId(), + appendResult.getId().getUid())), + ImmutableList.of()), + ReIndexer.RunningOptions.DEFAULT); + task.run(); + + ReprocessingContextInformationForErrorRecoveryIndexationTask information = (ReprocessingContextInformationForErrorRecoveryIndexationTask) task.details().get(); + assertThat(information.failures().messageFailures()).containsExactly(new ReIndexingFailure(mailbox.getId(), appendResult.getId().getUid())); + } + + @Test + void fullReindexingShouldUpdateDetailsUponSingleMessageFullReadError(CassandraCluster cassandra) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + cassandra.getConf() + .registerScenario(fail() + .forever() + .whenQueryStartsWith("SELECT messageId,internalDate,bodyStartOctet,fullContentOctets,bodyOctets,bodyContent,headerContent,textualLineCount,properties,attachments FROM messageV2 WHERE messageId=:messageId;")); + + Task task = reIndexer.reIndex(ReIndexer.RunningOptions.DEFAULT); + task.run(); + + ReprocessingContextInformationForFullReindexingTask information = (ReprocessingContextInformationForFullReindexingTask) task.details().get(); + assertThat(information.failures().messageFailures()).containsExactly(new ReIndexingFailure(mailbox.getId(), appendResult.getId().getUid())); + } + } + + @Test + void errorReindexingShouldReindexPreviouslyFailedMailbox() throws Exception { + MailboxSession session = mailboxManager.createSystemSession(USERNAME); + mailboxManager.createMailbox(INBOX, session); + + MessageManager mailbox = mailboxManager.getMailbox(INBOX, session); + mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session); + + Task task = reIndexer.reIndex(new ReIndexingExecutionFailures( + ImmutableList.of(), + ImmutableList.of(mailbox.getId())), + ReIndexer.RunningOptions.DEFAULT); + task.run(); + + verify(messageSearchIndex).deleteAll(any(MailboxSession.class), any(MailboxId.class)); + verify(messageSearchIndex, times(1)) + .add(any(MailboxSession.class), any(Mailbox.class),any(MailboxMessage.class)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org