JAMES-2589 Solve concurency problem in vacation mailet on top of RabbitMQ mail 
queue

Now sending a mail requires some cassandra joins.

A join was called from within the future chain every time a notification was 
sent, leading to
Cassandra driver error.

Making the VacationMailet code synchronous solved the issue.

(Another alternative would be to "futurify" the MailQueue API + the 
MailetContext API but for sure it is more expensive.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/67a55c04
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/67a55c04
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/67a55c04

Branch: refs/heads/master
Commit: 67a55c041baad3e80206c4c5a5d42c55fd4f5ebd
Parents: 2cc544c
Author: Benoit Tellier <[email protected]>
Authored: Thu Nov 15 19:54:35 2018 +0700
Committer: Benoit Tellier <[email protected]>
Committed: Wed Nov 21 16:48:26 2018 +0700

----------------------------------------------------------------------
 .../james/jmap/mailet/VacationMailet.java       | 27 ++++++++++----------
 1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/67a55c04/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java
----------------------------------------------------------------------
diff --git 
a/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java
 
b/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java
index 7a291b2..a692d08 100644
--- 
a/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java
+++ 
b/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java
@@ -70,16 +70,14 @@ public class VacationMailet extends GenericMailet {
             if (! automaticallySentMailDetector.isAutomaticallySent(mail)) {
                 ZonedDateTime processingDate = zonedDateTimeProvider.get();
                 mail.getRecipients()
-                    .stream()
-                    .map(mailAddress -> manageVacation(mailAddress, mail, 
processingDate))
-                    .forEach(CompletableFuture::join);
+                    .forEach(mailAddress -> manageVacation(mailAddress, mail, 
processingDate));
             }
         } catch (Throwable e) {
             LOGGER.warn("Can not process vacation for one or more recipients 
in {}", mail.getRecipients(), e);
         }
     }
 
-    private CompletableFuture<Void> manageVacation(MailAddress recipient, Mail 
processedMail, ZonedDateTime processingDate) {
+    private void manageVacation(MailAddress recipient, Mail processedMail, 
ZonedDateTime processingDate) {
         AccountId accountId = AccountId.fromString(recipient.toString());
 
         CompletableFuture<Vacation> vacationFuture = 
vacationRepository.retrieveVacation(accountId);
@@ -87,15 +85,16 @@ public class VacationMailet extends GenericMailet {
             AccountId.fromString(recipient.toString()),
             RecipientId.fromMailAddress(processedMail.getMaybeSender().get()));
 
-        return vacationFuture.thenCombine(alreadySentFuture, Pair::of)
-            .thenCompose(pair -> sendNotificationIfRequired(recipient, 
processedMail, processingDate, pair.getKey(), pair.getValue()));
+        Pair<Vacation, Boolean> pair = 
vacationFuture.thenCombine(alreadySentFuture, Pair::of)
+            .join();
+
+        sendNotificationIfRequired(recipient, processedMail, processingDate, 
pair.getKey(), pair.getValue());
     }
 
-    private CompletableFuture<Void> sendNotificationIfRequired(MailAddress 
recipient, Mail processedMail, ZonedDateTime processingDate, Vacation vacation, 
Boolean alreadySent) {
+    private void sendNotificationIfRequired(MailAddress recipient, Mail 
processedMail, ZonedDateTime processingDate, Vacation vacation, Boolean 
alreadySent) {
         if (shouldSendNotification(vacation, processingDate, alreadySent)) {
-            return sendNotification(recipient, processedMail, vacation);
+            sendNotification(recipient, processedMail, vacation);
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     private boolean shouldSendNotification(Vacation vacation, ZonedDateTime 
processingDate, boolean alreadySent) {
@@ -103,19 +102,21 @@ public class VacationMailet extends GenericMailet {
             && ! alreadySent;
     }
 
-    private CompletableFuture<Void> sendNotification(MailAddress recipient, 
Mail processedMail, Vacation vacation) {
+    private void sendNotification(MailAddress recipient, Mail processedMail, 
Vacation vacation) {
         try {
             VacationReply vacationReply = VacationReply.builder(processedMail)
                 .receivedMailRecipient(recipient)
                 .vacation(vacation)
                 .build(mimeMessageBodyGenerator);
+
             sendNotification(vacationReply);
-            return 
notificationRegistry.register(AccountId.fromString(recipient.toString()),
+
+            
notificationRegistry.register(AccountId.fromString(recipient.toString()),
                 
RecipientId.fromMailAddress(processedMail.getMaybeSender().get()),
-                vacation.getToDate());
+                vacation.getToDate())
+                .join();
         } catch (MessagingException e) {
             LOGGER.warn("Failed to send JMAP vacation notification from {} to 
{}", recipient, processedMail.getMaybeSender(), e);
-            return CompletableFuture.completedFuture(null);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to