JAMES-2589 Solve concurrency problem in vacation mailet on top of RabbitMQ mail queue
Now sending a mail requires some Cassandra joins. A join was called from within the future chain every time a notification was sent, leading to a Cassandra driver error. Making the VacationMailet code synchronous solved the issue. (Another alternative would be to "futurify" the MailQueue API + the MailetContext API, but that would certainly be more expensive.) Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/67a55c04 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/67a55c04 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/67a55c04 Branch: refs/heads/master Commit: 67a55c041baad3e80206c4c5a5d42c55fd4f5ebd Parents: 2cc544c Author: Benoit Tellier <[email protected]> Authored: Thu Nov 15 19:54:35 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Wed Nov 21 16:48:26 2018 +0700 ---------------------------------------------------------------------- .../james/jmap/mailet/VacationMailet.java | 27 ++++++++++---------- 1 file changed, 14 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/67a55c04/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java ---------------------------------------------------------------------- diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java b/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java index 7a291b2..a692d08 100644 --- a/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java +++ b/server/protocols/jmap/src/main/java/org/apache/james/jmap/mailet/VacationMailet.java @@ -70,16 +70,14 @@ public class VacationMailet extends GenericMailet { if (! 
automaticallySentMailDetector.isAutomaticallySent(mail)) { ZonedDateTime processingDate = zonedDateTimeProvider.get(); mail.getRecipients() - .stream() - .map(mailAddress -> manageVacation(mailAddress, mail, processingDate)) - .forEach(CompletableFuture::join); + .forEach(mailAddress -> manageVacation(mailAddress, mail, processingDate)); } } catch (Throwable e) { LOGGER.warn("Can not process vacation for one or more recipients in {}", mail.getRecipients(), e); } } - private CompletableFuture<Void> manageVacation(MailAddress recipient, Mail processedMail, ZonedDateTime processingDate) { + private void manageVacation(MailAddress recipient, Mail processedMail, ZonedDateTime processingDate) { AccountId accountId = AccountId.fromString(recipient.toString()); CompletableFuture<Vacation> vacationFuture = vacationRepository.retrieveVacation(accountId); @@ -87,15 +85,16 @@ public class VacationMailet extends GenericMailet { AccountId.fromString(recipient.toString()), RecipientId.fromMailAddress(processedMail.getMaybeSender().get())); - return vacationFuture.thenCombine(alreadySentFuture, Pair::of) - .thenCompose(pair -> sendNotificationIfRequired(recipient, processedMail, processingDate, pair.getKey(), pair.getValue())); + Pair<Vacation, Boolean> pair = vacationFuture.thenCombine(alreadySentFuture, Pair::of) + .join(); + + sendNotificationIfRequired(recipient, processedMail, processingDate, pair.getKey(), pair.getValue()); } - private CompletableFuture<Void> sendNotificationIfRequired(MailAddress recipient, Mail processedMail, ZonedDateTime processingDate, Vacation vacation, Boolean alreadySent) { + private void sendNotificationIfRequired(MailAddress recipient, Mail processedMail, ZonedDateTime processingDate, Vacation vacation, Boolean alreadySent) { if (shouldSendNotification(vacation, processingDate, alreadySent)) { - return sendNotification(recipient, processedMail, vacation); + sendNotification(recipient, processedMail, vacation); } - return 
CompletableFuture.completedFuture(null); } private boolean shouldSendNotification(Vacation vacation, ZonedDateTime processingDate, boolean alreadySent) { @@ -103,19 +102,21 @@ public class VacationMailet extends GenericMailet { && ! alreadySent; } - private CompletableFuture<Void> sendNotification(MailAddress recipient, Mail processedMail, Vacation vacation) { + private void sendNotification(MailAddress recipient, Mail processedMail, Vacation vacation) { try { VacationReply vacationReply = VacationReply.builder(processedMail) .receivedMailRecipient(recipient) .vacation(vacation) .build(mimeMessageBodyGenerator); + sendNotification(vacationReply); - return notificationRegistry.register(AccountId.fromString(recipient.toString()), + + notificationRegistry.register(AccountId.fromString(recipient.toString()), RecipientId.fromMailAddress(processedMail.getMaybeSender().get()), - vacation.getToDate()); + vacation.getToDate()) + .join(); } catch (MessagingException e) { LOGGER.warn("Failed to send JMAP vacation notification from {} to {}", recipient, processedMail.getMaybeSender(), e); - return CompletableFuture.completedFuture(null); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
