This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0b01bc71a9b88ee123e64088baaa33941bfe7a84
Author: LanKhuat <khuatdang...@gmail.com>
AuthorDate: Mon Jun 8 12:07:44 2020 +0700

    JAMES-3201 Error handling using functionaljava Either
---
 mailbox/tools/indexer/pom.xml                      |   5 +
 .../mailbox/tools/indexer/ReIndexerPerformer.java  | 162 ++++++++++++++-------
 .../james/webadmin/routes/MailboxesRoutesTest.java |  11 +-
 .../webadmin/routes/UserMailboxesRoutesTest.java   |   2 +-
 4 files changed, 119 insertions(+), 61 deletions(-)

diff --git a/mailbox/tools/indexer/pom.xml b/mailbox/tools/indexer/pom.xml
index 528c4b1..e572be5 100644
--- a/mailbox/tools/indexer/pom.xml
+++ b/mailbox/tools/indexer/pom.xml
@@ -124,6 +124,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.functionaljava</groupId>
+            <artifactId>functionaljava-java8</artifactId>
+            <version>4.8.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 9dd74cb..f9e2c5e 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
 
+import fj.data.Either;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -78,6 +79,50 @@ public class ReIndexerPerformer {
         }
     }
 
+    private interface Failure {
+        void recordFailure(ReprocessingContext context);
+    }
+
+    private static class MailboxFailure implements Failure {
+        private final MailboxId mailboxId;
+
+        private MailboxFailure(MailboxId mailboxId) {
+            this.mailboxId = mailboxId;
+        }
+
+        public MailboxId getMailboxId() {
+            return mailboxId;
+        }
+
+        @Override
+        public void recordFailure(ReprocessingContext context) {
+            context.recordMailboxFailure(mailboxId);
+        }
+    }
+
+    private static class MessageFailure implements Failure {
+        private final MailboxId mailboxId;
+        private final MessageUid uid;
+
+        private MessageFailure(MailboxId mailboxId, MessageUid uid) {
+            this.mailboxId = mailboxId;
+            this.uid = uid;
+        }
+
+        public MailboxId getMailboxId() {
+            return mailboxId;
+        }
+
+        public MessageUid getUid() {
+            return uid;
+        }
+
+        @Override
+        public void recordFailure(ReprocessingContext context) {
+            context.recordFailureDetailsForMessage(mailboxId, uid);
+        }
+    }
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ReIndexerPerformer.class);
 
     private static final int SINGLE_MESSAGE = 1;
@@ -101,8 +146,8 @@ public class ReIndexerPerformer {
         MailboxSession mailboxSession = 
mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
         LOGGER.info("Starting a full reindex");
 
-        Flux<ReIndexingEntry> entriesToIndex = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
-            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, 
mailboxSession, reprocessingContext));
+        Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
+            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, 
mailboxSession));
 
         return reIndexMessages(entriesToIndex, runningOptions, 
reprocessingContext)
             .doFinally(any -> LOGGER.info("Full reindex finished"));
@@ -111,9 +156,9 @@ public class ReIndexerPerformer {
     Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext 
reprocessingContext, RunningOptions runningOptions) {
         MailboxSession mailboxSession = 
mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
 
-        Flux<ReIndexingEntry> entriesToIndex = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+        Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
             .findMailboxByIdReactive(mailboxId)
-            .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, 
mailboxSession, reprocessingContext));
+            .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, 
mailboxSession));
 
         return reIndexMessages(entriesToIndex, runningOptions, 
reprocessingContext);
     }
@@ -126,8 +171,8 @@ public class ReIndexerPerformer {
         MailboxQuery mailboxQuery = 
MailboxQuery.privateMailboxesBuilder(mailboxSession).build();
 
         try {
-            Flux<ReIndexingEntry> entriesToIndex = 
mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound())
-                .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, 
mailboxSession, reprocessingContext));
+            Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = 
mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound())
+                .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, 
mailboxSession));
 
             return reIndexMessages(entriesToIndex, runningOptions, 
reprocessingContext)
                 .doFinally(any -> LOGGER.info("User {} reindex finished", 
username.asString()));
@@ -143,7 +188,8 @@ public class ReIndexerPerformer {
         return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
             .findMailboxByIdReactive(mailboxId)
             .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, uid)
-                .flatMap(message -> reIndexMessage(mailboxSession, mailbox, 
reprocessingContext, message)))
+                .map(message -> Either.<Failure, ReIndexingEntry>right(new 
ReIndexingEntry(mailbox, mailboxSession, message)))
+                .flatMap(entryOrFailure -> reIndexMessage(entryOrFailure, 
reprocessingContext)))
             .switchIfEmpty(Mono.just(Result.COMPLETED));
     }
 
@@ -165,24 +211,18 @@ public class ReIndexerPerformer {
         MailboxSession mailboxSession = 
mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
         MailboxMapper mapper = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
 
-        return Flux.merge(
-            
reIndexMessages(Flux.fromIterable(previousReIndexingFailures.messageFailures())
-                    .flatMap(failure -> 
createReindexingEntryFromFailure(failure, reprocessingContext)),
-                runningOptions,
-                reprocessingContext),
-            
reIndexMailboxes(Flux.fromIterable(previousReIndexingFailures.mailboxFailures())
-                    .flatMap(mapper::findMailboxByIdReactive),
-                mailboxSession,
-                reprocessingContext,
-                runningOptions))
-            .reduce(Task::combine);
-    }
-
-    private Mono<Result> reIndexMailboxes(Flux<Mailbox> mailboxes, 
MailboxSession session, ReprocessingContext reprocessingContext, RunningOptions 
runningOptions) {
-        Flux<ReIndexingEntry> entries = mailboxes
-            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, session, 
reprocessingContext));
+        Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = Flux.merge(
+            Flux.fromIterable(previousReIndexingFailures.messageFailures())
+                .flatMap(this::createReindexingEntryFromFailure),
+            Flux.fromIterable(previousReIndexingFailures.mailboxFailures())
+                .flatMap(mailboxId -> mapper.findMailboxByIdReactive(mailboxId)
+                    .flatMapMany(mailbox -> 
reIndexingEntriesForMailbox(mailbox, mailboxSession))
+                    .onErrorResume(e -> {
+                        LOGGER.warn("Failed to re-index {}", mailboxId, e);
+                        return Mono.just(Either.left(new 
MailboxFailure(mailboxId)));
+                    })));
 
-        return reIndexMessages(entries, runningOptions, reprocessingContext);
+        return reIndexMessages(entriesToIndex, runningOptions, 
reprocessingContext);
     }
 
     private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession 
session) {
@@ -202,64 +242,78 @@ public class ReIndexerPerformer {
             .next();
     }
 
-    private Mono<ReIndexingEntry> 
createReindexingEntryFromFailure(ReIndexingFailure failure, ReprocessingContext 
reprocessingContext) {
+    private Mono<Either<Failure, ReIndexingEntry>> 
createReindexingEntryFromFailure(ReIndexingFailure previousFailure) {
         MailboxSession mailboxSession = 
mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
 
         return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
-            .findMailboxByIdReactive(failure.getMailboxId())
-            .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, 
failure.getUid())
-                .map(message -> new ReIndexingEntry(mailbox, mailboxSession, 
message)))
+            .findMailboxByIdReactive(previousFailure.getMailboxId())
+            .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, 
previousFailure.getUid())
+                .map(message -> Either.<Failure, ReIndexingEntry>right(new 
ReIndexingEntry(mailbox, mailboxSession, message))))
             .onErrorResume(e -> {
-                LOGGER.warn("ReIndexing failed for {}", failure, e);
-                
reprocessingContext.recordFailureDetailsForMessage(failure.getMailboxId(), 
failure.getUid());
-                return Mono.empty();
+                LOGGER.warn("ReIndexing failed for {}", previousFailure, e);
+                return Mono.just(Either.left(new 
MessageFailure(previousFailure.getMailboxId(), previousFailure.getUid())));
             });
     }
 
-    private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, 
MailboxSession mailboxSession, ReprocessingContext reprocessingContext) {
+    private Flux<Either<Failure, ReIndexingEntry>> 
reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) {
         MessageMapper messageMapper = 
mailboxSessionMapperFactory.getMessageMapper(mailboxSession);
 
         return messageSearchIndex.deleteAll(mailboxSession, 
mailbox.getMailboxId())
             .thenMany(messageMapper.listAllMessageUids(mailbox))
-            .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, 
reprocessingContext, messageMapper, uid))
+            .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, 
messageMapper, uid))
             .onErrorResume(e -> {
                 LOGGER.warn("ReIndexing failed for {}", 
mailbox.generateAssociatedPath(), e);
-                
reprocessingContext.recordMailboxFailure(mailbox.getMailboxId());
-                return Mono.empty();
+                return Mono.just(Either.left(new 
MailboxFailure(mailbox.getMailboxId())));
             });
     }
 
-    private Flux<ReIndexingEntry> reIndexingEntryForUid(Mailbox mailbox, 
MailboxSession mailboxSession, ReprocessingContext reprocessingContext, 
MessageMapper messageMapper, MessageUid uid) {
+    private Flux<Either<Failure, ReIndexingEntry>> 
reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, 
MessageMapper messageMapper, MessageUid uid) {
         return messageMapper.findInMailboxReactive(mailbox, 
MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED)
-            .map(message -> new ReIndexingEntry(mailbox, mailboxSession, 
message))
+            .map(message -> Either.<Failure, ReIndexingEntry>right(new 
ReIndexingEntry(mailbox, mailboxSession, message)))
             .onErrorResume(e -> {
-                LOGGER.warn("ReIndexing failed for {} - {}", 
mailbox.getMailboxId(), uid, e);
-                
reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
-                return Mono.empty();
+                LOGGER.warn("ReIndexing failed for {} {}", 
mailbox.getMailboxId(), uid, e);
+                return Mono.just(Either.left(new 
MessageFailure(mailbox.getMailboxId(), uid)));
             });
     }
 
-    private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> 
entriesToIndex, RunningOptions runningOptions, ReprocessingContext 
reprocessingContext) {
-        return ReactorUtils.Throttler.<ReIndexingEntry, 
Task.Result>forOperation(entry -> reIndexMessage(entry.getMailboxSession(), 
entry.getMailbox(), reprocessingContext, entry.getMessage()))
+    private Mono<Task.Result> reIndexMessages(Flux<Either<Failure, 
ReIndexingEntry>> entriesToIndex, RunningOptions runningOptions, 
ReprocessingContext reprocessingContext) {
+        return ReactorUtils.Throttler.<Either<Failure, ReIndexingEntry>, 
Task.Result>forOperation(
+                entry -> reIndexMessage(entry, reprocessingContext))
             .window(runningOptions.getMessagesPerSecond(), 
Duration.ofSeconds(1))
             .throttle(entriesToIndex)
             .reduce(Task::combine)
-            .switchIfEmpty(Mono.fromSupplier(() -> {
-                if 
(reprocessingContext.failures().mailboxFailures().isEmpty()) {
-                    return Result.COMPLETED;
-                }
+            .switchIfEmpty(Mono.just(Result.COMPLETED));
+    }
+
+    private Mono<Task.Result> reIndexMessage(Either<Failure, ReIndexingEntry> 
entryOrFailure, ReprocessingContext reprocessingContext) {
+        return toMono(entryOrFailure.right().map(this::index))
+            .map(this::flapMapRight)
+            .map(either -> recordIndexingResult(reprocessingContext, either));
+    }
+
+    private Result recordIndexingResult(ReprocessingContext 
reprocessingContext, Either<Failure, Result> either) {
+        return either.either(
+            failure -> {
+                failure.recordFailure(reprocessingContext);
                 return Result.PARTIAL;
-            }));
+            },
+            result -> result.onComplete(reprocessingContext::recordSuccess));
     }
 
-    private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, 
Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage 
message) {
-        return Mono.fromCallable(() -> messageSearchIndex.add(mailboxSession, 
mailbox, message))
-            .doOnNext(any -> reprocessingContext.recordSuccess())
-            .thenReturn(Result.COMPLETED)
+    private Mono<Either<Failure, Result>> index(ReIndexingEntry entry) {
+        return messageSearchIndex.add(entry.getMailboxSession(), 
entry.getMailbox(), entry.getMessage())
+            .thenReturn(Either.<Failure, Result>right(Result.COMPLETED))
             .onErrorResume(e -> {
-                LOGGER.warn("ReIndexing failed for {} {}", 
mailbox.generateAssociatedPath(), message.getUid(), e);
-                
reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), 
message.getUid());
-                return Mono.just(Result.PARTIAL);
+                LOGGER.warn("ReIndexing failed for {} {}", 
entry.getMailbox().generateAssociatedPath(), entry.getMessage().getUid(), e);
+                return Mono.just(Either.left(new 
MessageFailure(entry.getMailbox().getMailboxId(), 
entry.getMessage().getUid())));
             });
     }
+
+    private <X, Y> Either<X, Y> flapMapRight(Either<X, Either<X, Y>> 
nestedEither) {
+        return nestedEither.right().bind(either -> either);
+    }
+
+    private <X, Y> Mono<Either<X, Y>> toMono(Either<X, Mono<Y>> either) {
+        return either.either(x -> Mono.just(Either.left(x)), yMono -> 
yMono.map(Either::right));
+    }
 }
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index 05a333c..c8c29cc 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
@@ -213,7 +212,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: 
value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new RuntimeException())
+                doReturn(Mono.error(new RuntimeException()))
                     .when(searchIndex)
                     .add(any(MailboxSession.class), any(Mailbox.class), 
any(MailboxMessage.class));
 
@@ -401,7 +400,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: 
value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new RuntimeException())
+                doReturn(Mono.error(new RuntimeException()))
                     .when(searchIndex)
                     .add(any(MailboxSession.class), any(Mailbox.class), 
any(MailboxMessage.class));
 
@@ -743,7 +742,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: 
value\r\n\r\nbody"),
                         systemSession);
 
-                doThrow(new 
RuntimeException()).when(searchIndex).add(any(MailboxSession.class), 
any(Mailbox.class), any(MailboxMessage.class));
+                doReturn(Mono.error(new 
RuntimeException())).when(searchIndex).add(any(MailboxSession.class), 
any(Mailbox.class), any(MailboxMessage.class));
 
                 String taskId = with()
                     .post("/mailboxes?task=reIndex")
@@ -787,7 +786,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: 
value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new RuntimeException())
+                doReturn(Mono.error(new RuntimeException()))
                     .when(searchIndex)
                     .add(any(MailboxSession.class), any(Mailbox.class), 
any(MailboxMessage.class));
 
@@ -835,7 +834,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: 
value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new 
RuntimeException()).when(searchIndex).add(any(MailboxSession.class), 
any(Mailbox.class), any(MailboxMessage.class));
+                doReturn(Mono.error(new 
RuntimeException())).when(searchIndex).add(any(MailboxSession.class), 
any(Mailbox.class), any(MailboxMessage.class));
 
                 String taskId = with()
                     .post("/mailboxes?task=reIndex")
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
index 6d0e299..5dd021b 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
@@ -1231,7 +1231,7 @@ class UserMailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: 
value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new RuntimeException())
+                doReturn(Mono.error(new RuntimeException()))
                     .when(searchIndex)
                     .add(any(MailboxSession.class), any(Mailbox.class), 
any(MailboxMessage.class));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to