Repository: james-project Updated Branches: refs/heads/master d496bb23a -> 09c9d34c6
MAILBOX-297 Staged flags updates Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6f2d4c79 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6f2d4c79 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6f2d4c79 Branch: refs/heads/master Commit: 6f2d4c79f2aaa4e0acb2b8b59e644eef7e11d37a Parents: 0535d4b Author: benwa <[email protected]> Authored: Mon May 22 11:53:22 2017 +0700 Committer: benwa <[email protected]> Committed: Mon May 29 16:58:49 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageMapper.java | 176 +++++++++++-------- 1 file changed, 107 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/6f2d4c79/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 349cd08..7b711bd 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -25,25 +25,21 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; import org.apache.commons.lang3.tuple.Pair; -import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; import org.apache.james.mailbox.FlagsBuilder; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.CassandraId; import org.apache.james.mailbox.cassandra.CassandraMessageId; -import org.apache.james.mailbox.cassandra.mail.utils.MessageDeletedDuringFlagsUpdateException; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.MailboxCounters; -import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.MessageMetaData; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.model.UpdatedFlags; @@ -60,16 +56,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.steveash.guavate.Guavate; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; public class CassandraMessageMapper implements MessageMapper { - private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class); public static final MailboxCounters INITIAL_COUNTERS = MailboxCounters.builder() .count(0L) .unseen(0L) .build(); public static final int EXPUNGE_BATCH_SIZE = 100; + public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class); private final CassandraModSeqProvider modSeqProvider; private final MailboxSession mailboxSession; @@ -280,14 +275,51 @@ public class CassandraMessageMapper implements MessageMapper { @Override public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return messageIdDAO.retrieveMessages(mailboxId, set) - .join() - .flatMap(message -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, message)) - .map((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags) - .thenApply(voidValue -> updatedFlags)) - .map(CompletableFuture::join) - .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning result - .iterator(); + + return runUpdate(mailboxId, set, flagUpdateCalculator).iterator(); + } + + private List<UpdatedFlags> runUpdate(CassandraId mailboxId, MessageRange set, FlagsUpdateCalculator flagsUpdateCalculator) throws MailboxException { + Stream<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, set).join(); + + FlagsUpdateStageResult globalResult = runUpdateStage(mailboxId, toBeUpdated, flagsUpdateCalculator); + + int retryCount = 0; + + while (retryCount < maxRetries && !globalResult.getFailed().isEmpty()) { + retryCount++; + FlagsUpdateStageResult stageResult = runUpdateStage(mailboxId, + FluentFutureStream.of( + globalResult.getFailed().stream() + .map(uid -> messageIdDAO.retrieve(mailboxId, uid))) + .flatMap(OptionalConverter::toStream) + .completableFuture().join(), + flagsUpdateCalculator); + + globalResult = globalResult.keepSuccess().merge(stageResult); + } + + LOGGER.error("Can not update following UIDs {} for mailbox {}", globalResult.getFailed(), mailboxId.asUuid()); + + return globalResult.getSucceeded(); + } + + private FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) { + Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed")); + + FlagsUpdateStageResult result = toBeUpdated + .map(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, + newModSeq, + oldMetadata)) + .reduce(FlagsUpdateStageResult::merge) + .orElse(none()); + + result.getSucceeded().stream() + .map((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags) + .thenApply(voidValue -> updatedFlags)) + .forEach(CompletableFuture::join); + + return result; } @Override @@ -329,41 +361,30 @@ public class CassandraMessageMapper implements MessageMapper { imapUidDAO.insert(composedMessageIdWithMetaData)); } - private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { - return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, composedMessageIdWithMetaData) - .map(Stream::of) - .orElse(handleRetries(mailbox, flagUpdateCalculator, composedMessageIdWithMetaData)); - } - private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, Mailbox mailbox, ComposedMessageIdWithMetaData oldMetaData) { - try { - long oldModSeq = oldMetaData.getModSeq(); - Flags oldFlags = oldMetaData.getFlags(); - Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags); + private FlagsUpdateStageResult tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) { + Flags oldFlags = oldMetaData.getFlags(); + Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags); - boolean involveFlagsChanges = !identicalFlags(oldFlags, newFlags); - long newModSeq = generateNewModSeqIfNeeded(mailbox, oldModSeq, involveFlagsChanges); - - if (updateFlags(oldMetaData, newFlags, newModSeq)) { - return Optional.of(UpdatedFlags.builder() - .uid(oldMetaData.getComposedMessageId().getUid()) - .modSeq(newModSeq) - .oldFlags(oldFlags) - .newFlags(newFlags) - .build()); - } else { - return Optional.empty(); - } - } catch (MailboxException e) { - throw Throwables.propagate(e); + if (identicalFlags(oldFlags, newFlags)) { + return success(UpdatedFlags.builder() + .uid(oldMetaData.getComposedMessageId().getUid()) + .modSeq(oldMetaData.getModSeq()) + .oldFlags(oldFlags) + .newFlags(newFlags) + .build()); } - } - private long generateNewModSeqIfNeeded(Mailbox mailbox, long oldModSeq, boolean involveFlagsChanges) throws MailboxException { - if (involveFlagsChanges) { - return modSeqProvider.nextModSeq(mailboxSession, mailbox); + if (updateFlags(oldMetaData, newFlags, newModSeq)) { + return success(UpdatedFlags.builder() + .uid(oldMetaData.getComposedMessageId().getUid()) + .modSeq(newModSeq) + .oldFlags(oldFlags) + .newFlags(newFlags) + .build()); + } else { + return fail(oldMetaData.getComposedMessageId().getUid()); } - return oldModSeq; } private boolean identicalFlags(Flags oldFlags, Flags newFlags) { @@ -385,32 +406,49 @@ public class CassandraMessageMapper implements MessageMapper { .join(); } - private Stream<UpdatedFlags> handleRetries(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData oldMetaData) { - try { - return Stream.of( - new FunctionRunnerWithRetry(maxRetries) - .executeAndRetrieveObject(() -> retryMessageFlagsUpdate(mailbox, - oldMetaData.getComposedMessageId().getMessageId(), - flagUpdateCalculator))); - } catch (MessageDeletedDuringFlagsUpdateException e) { - mailboxSession.getLog().warn(e.getMessage()); - return Stream.of(); - } catch (MailboxDeleteDuringUpdateException e) { - LOGGER.info("Mailbox {} was deleted during flag update", mailbox.getMailboxId()); - return Stream.of(); - } catch (Exception e) { - throw Throwables.propagate(e); - } + private static FlagsUpdateStageResult success(UpdatedFlags updatedFlags) { + return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of(updatedFlags)); } - private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox mailbox, MessageId messageId, FlagsUpdateCalculator flagUpdateCalculator) { - CassandraId cassandraId = (CassandraId) mailbox.getMailboxId(); - ComposedMessageIdWithMetaData composedMessageIdWithMetaData = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId)) - .join() - .findFirst() - .orElseThrow(MailboxDeleteDuringUpdateException::new); - return tryMessageFlagsUpdate(flagUpdateCalculator, - mailbox, - composedMessageIdWithMetaData); + private static FlagsUpdateStageResult fail(MessageUid uid) { + return new FlagsUpdateStageResult(ImmutableList.of(uid), ImmutableList.of()); + } + + private static FlagsUpdateStageResult none() { + return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of()); + } + + private static class FlagsUpdateStageResult { + private final List<MessageUid> failed; + private final List<UpdatedFlags> succeeded; + + public FlagsUpdateStageResult(List<MessageUid> failed, List<UpdatedFlags> succeeded) { + this.failed = failed; + this.succeeded = succeeded; + } + + public List<MessageUid> getFailed() { + return failed; + } + + public List<UpdatedFlags> getSucceeded() { + return succeeded; + } + + public FlagsUpdateStageResult merge(FlagsUpdateStageResult other) { + return new FlagsUpdateStageResult( + ImmutableList.<MessageUid>builder() + .addAll(this.failed) + .addAll(other.failed) + .build(), + ImmutableList.<UpdatedFlags>builder() + .addAll(this.succeeded) + .addAll(other.succeeded) + .build()); + } + + public FlagsUpdateStageResult keepSuccess() { + return new FlagsUpdateStageResult(ImmutableList.of(), succeeded); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
