This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit af594f294a6ed1167d91817177a43605dac24810
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu May 7 11:26:35 2020 +0700

    JAMES-3143 SolveMessageInconsistenciesService: Delay confirmation read
    
    We don't need to perform the confirmation if there is no inconsistencies,
    enhencing run performances
---
 .../task/SolveMessageInconsistenciesService.java   | 58 +++++++++++++++-------
 1 file changed, 41 insertions(+), 17 deletions(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index f0ec09b..a052681 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
 
+import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
@@ -395,8 +396,8 @@ public class SolveMessageInconsistenciesService {
 
     Mono<Task.Result> fixMessageInconsistencies(Context context) {
         return Flux.concat(
-            fixInconsistenciesInMessageId(context),
-            fixInconsistenciesInImapUid(context))
+                fixInconsistenciesInMessageId(context),
+                fixInconsistenciesInImapUid(context))
             .reduce(Task.Result.COMPLETED, Task::combine);
     }
 
@@ -408,22 +409,40 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Mono<Inconsistency> 
detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) {
-        return messageIdToImapUidDAO.retrieve((CassandraMessageId) 
message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) 
message.getComposedMessageId().getMailboxId()))
-            .next()
-            .flatMap(this::compareWithMessageIdRecord)
+        return compareWithMessageIdRecord(message)
             .onErrorResume(error -> Mono.just(new 
FailedToRetrieveRecord(message)));
     }
 
-    private Mono<Inconsistency> 
compareWithMessageIdRecord(ComposedMessageIdWithMetaData 
upToDateMessageFromImapUid) {
-        return messageIdDAO.retrieve((CassandraId) 
upToDateMessageFromImapUid.getComposedMessageId().getMailboxId(), 
upToDateMessageFromImapUid.getComposedMessageId().getUid())
+    private Mono<Inconsistency> 
compareWithMessageIdRecord(ComposedMessageIdWithMetaData messageFromImapUid) {
+        CassandraId mailboxId = (CassandraId) 
messageFromImapUid.getComposedMessageId().getMailboxId();
+        MessageUid uid = messageFromImapUid.getComposedMessageId().getUid();
+        CassandraMessageId messageId = (CassandraMessageId) 
messageFromImapUid.getComposedMessageId().getMessageId();
+
+        return messageIdDAO.retrieve(mailboxId, uid)
             .handle(publishIfPresent())
-            .map(messageIdRecord -> {
-                if (messageIdRecord.equals(upToDateMessageFromImapUid)) {
-                    return NO_INCONSISTENCY;
+            .flatMap(messageIdRecord -> {
+                if (messageIdRecord.equals(messageFromImapUid)) {
+                    return Mono.just(NO_INCONSISTENCY);
                 }
-                return new OutdatedMessageIdEntry(messageIdRecord, 
upToDateMessageFromImapUid);
+                return detectOutdatedMessageIdEntry(mailboxId, messageId, 
messageIdRecord);
             })
-            .switchIfEmpty(Mono.just(new 
OrphanImapUidEntry(upToDateMessageFromImapUid)));
+            .switchIfEmpty(
+                detectOrphanImapUidEntry(messageFromImapUid, mailboxId, 
messageId));
+    }
+
+    private Mono<Inconsistency> detectOutdatedMessageIdEntry(CassandraId 
mailboxId, CassandraMessageId messageId, ComposedMessageIdWithMetaData 
messageIdRecord) {
+        return messageIdToImapUidDAO.retrieve(messageId, 
Optional.of(mailboxId))
+            .filter(upToDateMessageFromImapUid -> 
!upToDateMessageFromImapUid.equals(messageIdRecord))
+            .<Inconsistency>map(upToDateMessageFromImapUid -> new 
OutdatedMessageIdEntry(messageIdRecord, upToDateMessageFromImapUid))
+            .next()
+            .switchIfEmpty(Mono.just(NO_INCONSISTENCY));
+    }
+
+    private Mono<Inconsistency> 
detectOrphanImapUidEntry(ComposedMessageIdWithMetaData messageFromImapUid, 
CassandraId mailboxId, CassandraMessageId messageId) {
+        return messageIdToImapUidDAO.retrieve(messageId, 
Optional.of(mailboxId))
+            .next()
+            .<Inconsistency>map(OrphanImapUidEntry::new)
+            .switchIfEmpty(Mono.just(NO_INCONSISTENCY));
     }
 
     private Flux<Task.Result> fixInconsistenciesInMessageId(Context context) {
@@ -434,12 +453,17 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Mono<Inconsistency> 
detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
+        return messageIdToImapUidDAO.retrieve((CassandraMessageId) 
message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) 
message.getComposedMessageId().getMailboxId()))
+            .map(uidRecord -> NO_INCONSISTENCY)
+            .next()
+            .switchIfEmpty(detectOrphanMessageIdEntry(message))
+            .onErrorResume(error -> Mono.just(new 
FailedToRetrieveRecord(message)));
+    }
+
+    private Mono<Inconsistency> 
detectOrphanMessageIdEntry(ComposedMessageIdWithMetaData message) {
         return messageIdDAO.retrieve((CassandraId) 
message.getComposedMessageId().getMailboxId(), 
message.getComposedMessageId().getUid())
             .handle(publishIfPresent())
-            .flatMap(upToDateMessage -> 
messageIdToImapUidDAO.retrieve((CassandraMessageId) 
message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) 
message.getComposedMessageId().getMailboxId()))
-                .map(uidRecord -> NO_INCONSISTENCY)
-                .switchIfEmpty(Mono.just(new OrphanMessageIdEntry(message)))
-                .next())
-            .onErrorResume(error -> Mono.just(new 
FailedToRetrieveRecord(message)));
+            .<Inconsistency>map(OrphanMessageIdEntry::new)
+            .switchIfEmpty(Mono.just(NO_INCONSISTENCY));
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to