This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0b01bc71a9b88ee123e64088baaa33941bfe7a84 Author: LanKhuat <khuatdang...@gmail.com> AuthorDate: Mon Jun 8 12:07:44 2020 +0700 JAMES-3201 Error handling using functionaljava Either --- mailbox/tools/indexer/pom.xml | 5 + .../mailbox/tools/indexer/ReIndexerPerformer.java | 162 ++++++++++++++------- .../james/webadmin/routes/MailboxesRoutesTest.java | 11 +- .../webadmin/routes/UserMailboxesRoutesTest.java | 2 +- 4 files changed, 119 insertions(+), 61 deletions(-) diff --git a/mailbox/tools/indexer/pom.xml b/mailbox/tools/indexer/pom.xml index 528c4b1..e572be5 100644 --- a/mailbox/tools/indexer/pom.xml +++ b/mailbox/tools/indexer/pom.xml @@ -124,6 +124,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.functionaljava</groupId> + <artifactId>functionaljava-java8</artifactId> + <version>4.8.1</version> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <scope>test</scope> diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java index 9dd74cb..f9e2c5e 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java @@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableList; +import fj.data.Either; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -78,6 +79,50 @@ public class ReIndexerPerformer { } } + private interface Failure { + void recordFailure(ReprocessingContext context); + } + + private static class MailboxFailure implements Failure { + private final MailboxId mailboxId; + + private MailboxFailure(MailboxId mailboxId) { + this.mailboxId = mailboxId; + } + + public MailboxId getMailboxId() { + return mailboxId; + } + + @Override + public void recordFailure(ReprocessingContext context) { + context.recordMailboxFailure(mailboxId); + } + } + + private static class MessageFailure implements Failure { + private final MailboxId mailboxId; + private final MessageUid uid; + + private MessageFailure(MailboxId mailboxId, MessageUid uid) { + this.mailboxId = mailboxId; + this.uid = uid; + } + + public MailboxId getMailboxId() { + return mailboxId; + } + + public MessageUid getUid() { + return uid; + } + + @Override + public void recordFailure(ReprocessingContext context) { + context.recordFailureDetailsForMessage(mailboxId, uid); + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class); private static final int SINGLE_MESSAGE = 1; @@ -101,8 +146,8 @@ public class ReIndexerPerformer { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); LOGGER.info("Starting a full reindex"); - Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() - .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext)); + Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() + .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext) .doFinally(any -> LOGGER.info("Full reindex finished")); @@ -111,9 +156,9 @@ public class ReIndexerPerformer { Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); - Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) + Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) .findMailboxByIdReactive(mailboxId) - .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext)); + .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext); } @@ -126,8 +171,8 @@ public class ReIndexerPerformer { MailboxQuery mailboxQuery = MailboxQuery.privateMailboxesBuilder(mailboxSession).build(); try { - Flux<ReIndexingEntry> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound()) - .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext)); + Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound()) + .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext) .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString())); @@ -143,7 +188,8 @@ public class ReIndexerPerformer { return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) .findMailboxByIdReactive(mailboxId) .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, uid) - .flatMap(message -> reIndexMessage(mailboxSession, mailbox, reprocessingContext, message))) + .map(message -> Either.<Failure, ReIndexingEntry>right(new ReIndexingEntry(mailbox, mailboxSession, message))) + .flatMap(entryOrFailure -> reIndexMessage(entryOrFailure, reprocessingContext))) .switchIfEmpty(Mono.just(Result.COMPLETED)); } @@ -165,24 +211,18 @@ public class ReIndexerPerformer { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession); - return Flux.merge( - reIndexMessages(Flux.fromIterable(previousReIndexingFailures.messageFailures()) - .flatMap(failure -> createReindexingEntryFromFailure(failure, reprocessingContext)), - runningOptions, - reprocessingContext), - reIndexMailboxes(Flux.fromIterable(previousReIndexingFailures.mailboxFailures()) - .flatMap(mapper::findMailboxByIdReactive), - mailboxSession, - reprocessingContext, - runningOptions)) - .reduce(Task::combine); - } - - private Mono<Result> reIndexMailboxes(Flux<Mailbox> mailboxes, MailboxSession session, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { - Flux<ReIndexingEntry> entries = mailboxes - .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, session, reprocessingContext)); + Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = Flux.merge( + Flux.fromIterable(previousReIndexingFailures.messageFailures()) + .flatMap(this::createReindexingEntryFromFailure), + Flux.fromIterable(previousReIndexingFailures.mailboxFailures()) + .flatMap(mailboxId -> mapper.findMailboxByIdReactive(mailboxId) + .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)) + .onErrorResume(e -> { + LOGGER.warn("Failed to re-index {}", mailboxId, e); + return Mono.just(Either.left(new MailboxFailure(mailboxId))); + }))); - return reIndexMessages(entries, runningOptions, reprocessingContext); + return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext); } private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) { @@ -202,64 +242,78 @@ public class ReIndexerPerformer { .next(); } - private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure, ReprocessingContext reprocessingContext) { + private Mono<Either<Failure, ReIndexingEntry>> createReindexingEntryFromFailure(ReIndexingFailure previousFailure) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) - .findMailboxByIdReactive(failure.getMailboxId()) - .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, failure.getUid()) - .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message))) + .findMailboxByIdReactive(previousFailure.getMailboxId()) + .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, previousFailure.getUid()) + .map(message -> Either.<Failure, ReIndexingEntry>right(new ReIndexingEntry(mailbox, mailboxSession, message)))) .onErrorResume(e -> { - LOGGER.warn("ReIndexing failed for {}", failure, e); - reprocessingContext.recordFailureDetailsForMessage(failure.getMailboxId(), failure.getUid()); - return Mono.empty(); + LOGGER.warn("ReIndexing failed for {}", previousFailure, e); + return Mono.just(Either.left(new MessageFailure(previousFailure.getMailboxId(), previousFailure.getUid()))); }); } - private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext) { + private Flux<Either<Failure, ReIndexingEntry>> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) { MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession); return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId()) .thenMany(messageMapper.listAllMessageUids(mailbox)) - .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, reprocessingContext, messageMapper, uid)) + .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, messageMapper, uid)) .onErrorResume(e -> { LOGGER.warn("ReIndexing failed for {}", mailbox.generateAssociatedPath(), e); - reprocessingContext.recordMailboxFailure(mailbox.getMailboxId()); - return Mono.empty(); + return Mono.just(Either.left(new MailboxFailure(mailbox.getMailboxId()))); }); } - private Flux<ReIndexingEntry> reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext, MessageMapper messageMapper, MessageUid uid) { + private Flux<Either<Failure, ReIndexingEntry>> reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, MessageMapper messageMapper, MessageUid uid) { return messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED) - .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)) + .map(message -> Either.<Failure, ReIndexingEntry>right(new ReIndexingEntry(mailbox, mailboxSession, message))) .onErrorResume(e -> { - LOGGER.warn("ReIndexing failed for {} - {}", mailbox.getMailboxId(), uid, e); - reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid); - return Mono.empty(); + LOGGER.warn("ReIndexing failed for {} {}", mailbox.getMailboxId(), uid, e); + return Mono.just(Either.left(new MessageFailure(mailbox.getMailboxId(), uid))); }); } - private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) { - return ReactorUtils.Throttler.<ReIndexingEntry, Task.Result>forOperation(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage())) + private Mono<Task.Result> reIndexMessages(Flux<Either<Failure, ReIndexingEntry>> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) { + return ReactorUtils.Throttler.<Either<Failure, ReIndexingEntry>, Task.Result>forOperation( + entry -> reIndexMessage(entry, reprocessingContext)) .window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1)) .throttle(entriesToIndex) .reduce(Task::combine) - .switchIfEmpty(Mono.fromSupplier(() -> { - if (reprocessingContext.failures().mailboxFailures().isEmpty()) { - return Result.COMPLETED; - } + .switchIfEmpty(Mono.just(Result.COMPLETED)); + } + + private Mono<Task.Result> reIndexMessage(Either<Failure, ReIndexingEntry> entryOrFailure, ReprocessingContext reprocessingContext) { + return toMono(entryOrFailure.right().map(this::index)) + .map(this::flapMapRight) + .map(either -> recordIndexingResult(reprocessingContext, either)); + } + + private Result recordIndexingResult(ReprocessingContext reprocessingContext, Either<Failure, Result> either) { + return either.either( + failure -> { + failure.recordFailure(reprocessingContext); return Result.PARTIAL; - })); + }, + result -> result.onComplete(reprocessingContext::recordSuccess)); } - private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage message) { - return Mono.fromCallable(() -> messageSearchIndex.add(mailboxSession, mailbox, message)) - .doOnNext(any -> reprocessingContext.recordSuccess()) - .thenReturn(Result.COMPLETED) + private Mono<Either<Failure, Result>> index(ReIndexingEntry entry) { + return messageSearchIndex.add(entry.getMailboxSession(), entry.getMailbox(), entry.getMessage()) + .thenReturn(Either.<Failure, Result>right(Result.COMPLETED)) .onErrorResume(e -> { - LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), message.getUid(), e); - reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), message.getUid()); - return Mono.just(Result.PARTIAL); + LOGGER.warn("ReIndexing failed for {} {}", entry.getMailbox().generateAssociatedPath(), entry.getMessage().getUid(), e); + return Mono.just(Either.left(new MessageFailure(entry.getMailbox().getMailboxId(), entry.getMessage().getUid()))); }); } + + private <X, Y> Either<X, Y> flapMapRight(Either<X, Either<X, Y>> nestedEither) { + return nestedEither.right().bind(either -> either); + } + + private <X, Y> Mono<Either<X, Y>> toMono(Either<X, Mono<Y>> either) { + return either.either(x -> Mono.just(Either.left(x)), yMono -> yMono.map(Either::right)); + } } \ No newline at end of file diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java index 05a333c..c8c29cc 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java @@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; @@ -213,7 +212,7 @@ class MailboxesRoutesTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession).getId(); - doThrow(new RuntimeException()) + doReturn(Mono.error(new RuntimeException())) .when(searchIndex) .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class)); @@ -401,7 +400,7 @@ class MailboxesRoutesTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession).getId(); - doThrow(new RuntimeException()) + doReturn(Mono.error(new RuntimeException())) .when(searchIndex) .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class)); @@ -743,7 +742,7 @@ class MailboxesRoutesTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession); - doThrow(new RuntimeException()).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class)); + doReturn(Mono.error(new RuntimeException())).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class)); String taskId = with() .post("/mailboxes?task=reIndex") @@ -787,7 +786,7 @@ class MailboxesRoutesTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession).getId(); - doThrow(new RuntimeException()) + doReturn(Mono.error(new RuntimeException())) .when(searchIndex) .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class)); @@ -835,7 +834,7 @@ class MailboxesRoutesTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession).getId(); - doThrow(new RuntimeException()).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class)); + doReturn(Mono.error(new RuntimeException())).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class)); String taskId = with() .post("/mailboxes?task=reIndex") diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java index 6d0e299..5dd021b 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java @@ -1231,7 +1231,7 @@ class UserMailboxesRoutesTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession).getId(); - doThrow(new RuntimeException()) + doReturn(Mono.error(new RuntimeException())) .when(searchIndex) .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class)); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org