MAILBOX-297 Parallel staged flags updates. Also rename the MessageRange parameter of updateFlags from `set` to `range` in CassandraMessageMapper.
Use a custom collector for chunker Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e82c8584 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e82c8584 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e82c8584 Branch: refs/heads/master Commit: e82c858481db7af8c15d9e383294480d06a964cf Parents: 6f2d4c7 Author: benwa <[email protected]> Authored: Mon May 22 15:01:42 2017 +0700 Committer: benwa <[email protected]> Committed: Mon May 29 17:02:46 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageDAO.java | 4 +- .../cassandra/mail/CassandraMessageMapper.java | 164 ++++++++----------- .../mail/utils/FlagsUpdateStageResult.java | 92 +++++++++++ .../mail/utils/FlagsUpdateStageResultTest.java | 134 +++++++++++++++ .../james/util/CompletableFutureUtil.java | 6 +- .../apache/james/util/FluentFutureStream.java | 4 + .../james/util/streams/JamesCollectors.java | 59 +++++-- .../james/util/CompletableFutureUtilTest.java | 27 +++ .../james/util/streams/JamesCollectorsTest.java | 13 +- 9 files changed, 379 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index a6ba328..a094540 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -193,9 +193,7 @@ public 
class CassandraMessageDAO { public CompletableFuture<Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Optional<Integer> limit) { return CompletableFutureUtil.chainAll( getLimitedIdStream(messageIds.stream().distinct(), limit) - .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)) - .values() - .stream(), + .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)), ids -> FluentFutureStream.of( ids.stream() .map(id -> retrieveRow(id, fetchType) http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 7b711bd..b959972 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.mail; +import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.CassandraId; import org.apache.james.mailbox.cassandra.CassandraMessageId; +import org.apache.james.mailbox.cassandra.mail.utils.FlagsUpdateStageResult; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; @@ -64,6 +66,7 @@ public class CassandraMessageMapper implements 
MessageMapper { .unseen(0L) .build(); public static final int EXPUNGE_BATCH_SIZE = 100; + public static final int UPDATE_FLAGS_BATCH_SIZE = 20; public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class); private final CassandraModSeqProvider modSeqProvider; @@ -216,13 +219,12 @@ public class CassandraMessageMapper implements MessageMapper { return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange) .join() .collect(JamesCollectors.chunker(EXPUNGE_BATCH_SIZE)) - .values().stream() .map(uidChunk -> expungeUidChunk(mailboxId, uidChunk)) .flatMap(CompletableFuture::join) .collect(Guavate.toImmutableMap(MailboxMessage::getUid, SimpleMessageMetaData::new)); } - private CompletableFuture<Stream<SimpleMailboxMessage>> expungeUidChunk(CassandraId mailboxId, List<MessageUid> uidChunk) { + private CompletableFuture<Stream<SimpleMailboxMessage>> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) { return FluentFutureStream.of(uidChunk.stream() .map(uid -> messageIdDAO.retrieve(mailboxId, uid))) .flatMap(OptionalConverter::toStream) @@ -273,53 +275,63 @@ public class CassandraMessageMapper implements MessageMapper { } @Override - public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException { + public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange range) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return runUpdate(mailboxId, set, flagUpdateCalculator).iterator(); - } - - private List<UpdatedFlags> runUpdate(CassandraId mailboxId, MessageRange set, FlagsUpdateCalculator flagsUpdateCalculator) throws MailboxException { - Stream<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, set).join(); + Stream<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, 
range).join(); - FlagsUpdateStageResult globalResult = runUpdateStage(mailboxId, toBeUpdated, flagsUpdateCalculator); + FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator); + FlagsUpdateStageResult finalResult = handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult); + if (!finalResult.getFailed().isEmpty()) { + LOGGER.error("Can not update following UIDs {} for mailbox {}", finalResult.getFailed(), mailboxId.asUuid()); + } + return finalResult.getSucceeded().iterator(); + } + private FlagsUpdateStageResult handleUpdatesStagedRetry(CassandraId mailboxId, FlagsUpdateCalculator flagUpdateCalculator, FlagsUpdateStageResult firstResult) { + FlagsUpdateStageResult globalResult = firstResult; int retryCount = 0; - while (retryCount < maxRetries && !globalResult.getFailed().isEmpty()) { retryCount++; - FlagsUpdateStageResult stageResult = runUpdateStage(mailboxId, - FluentFutureStream.of( - globalResult.getFailed().stream() - .map(uid -> messageIdDAO.retrieve(mailboxId, uid))) - .flatMap(OptionalConverter::toStream) - .completableFuture().join(), - flagsUpdateCalculator); - - globalResult = globalResult.keepSuccess().merge(stageResult); + FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed()); + globalResult = globalResult.keepSucceded().merge(stageResult); } + return globalResult; + } - LOGGER.error("Can not update following UIDs {} for mailbox {}", globalResult.getFailed(), mailboxId.asUuid()); + private FlagsUpdateStageResult retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) { + Stream<ComposedMessageIdWithMetaData> idsFailed = FluentFutureStream.of( + failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid))) + .flatMap(OptionalConverter::toStream) + .join(); - return globalResult.getSucceeded(); + return runUpdateStage(mailboxId, idsFailed, flagsUpdateCalculator); } private 
FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) { - Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed")); + Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid())); - FlagsUpdateStageResult result = toBeUpdated - .map(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, - newModSeq, - oldMetadata)) - .reduce(FlagsUpdateStageResult::merge) - .orElse(none()); + return toBeUpdated.collect(JamesCollectors.chunker(UPDATE_FLAGS_BATCH_SIZE)) + .map(uidChunk -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, newModSeq, uidChunk)) + .map(CompletableFuture::join) + .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge); + } - result.getSucceeded().stream() - .map((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags) - .thenApply(voidValue -> updatedFlags)) - .forEach(CompletableFuture::join); + private CompletableFuture<FlagsUpdateStageResult> performUpdatesForChunk(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, Long newModSeq, Collection<ComposedMessageIdWithMetaData> uidChunk) { + Stream<CompletableFuture<FlagsUpdateStageResult>> updateMetaDataFuture = + uidChunk.stream().map(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, oldMetadata)); - return result; + return FluentFutureStream.of(updateMetaDataFuture) + .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge) + .thenCompose(result -> updateIndexesForUpdatesResult(mailboxId, result)); + } + + private CompletableFuture<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) { + return FluentFutureStream.of( + result.getSucceeded().stream() + .map((UpdatedFlags 
updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags))) + .completableFuture() + .thenApply(any -> result); } @Override @@ -362,93 +374,49 @@ public class CassandraMessageMapper implements MessageMapper { } - private FlagsUpdateStageResult tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) { + private CompletableFuture<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) { Flags oldFlags = oldMetaData.getFlags(); Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags); if (identicalFlags(oldFlags, newFlags)) { - return success(UpdatedFlags.builder() + return CompletableFuture.completedFuture(FlagsUpdateStageResult.success(UpdatedFlags.builder() .uid(oldMetaData.getComposedMessageId().getUid()) .modSeq(oldMetaData.getModSeq()) .oldFlags(oldFlags) .newFlags(newFlags) - .build()); + .build())); } - if (updateFlags(oldMetaData, newFlags, newModSeq)) { - return success(UpdatedFlags.builder() - .uid(oldMetaData.getComposedMessageId().getUid()) - .modSeq(newModSeq) - .oldFlags(oldFlags) - .newFlags(newFlags) - .build()); - } else { - return fail(oldMetaData.getComposedMessageId().getUid()); - } + return updateFlags(oldMetaData, newFlags, newModSeq) + .thenApply(success -> { + if (success) { + return FlagsUpdateStageResult.success(UpdatedFlags.builder() + .uid(oldMetaData.getComposedMessageId().getUid()) + .modSeq(newModSeq) + .oldFlags(oldFlags) + .newFlags(newFlags) + .build()); + } else { + return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId().getUid()); + } + }); } private boolean identicalFlags(Flags oldFlags, Flags newFlags) { return oldFlags.equals(newFlags); } - private boolean updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) { - ComposedMessageIdWithMetaData composedMessageIdWithMetaData = 
ComposedMessageIdWithMetaData.builder() + private CompletableFuture<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) { + ComposedMessageIdWithMetaData newMetadata = ComposedMessageIdWithMetaData.builder() .composedMessageId(oldMetadata.getComposedMessageId()) .modSeq(newModSeq) .flags(newFlags) .build(); - return imapUidDAO.updateMetadata(composedMessageIdWithMetaData, oldMetadata.getModSeq()) + return imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq()) .thenCompose(success -> Optional.of(success) .filter(b -> b) - .map((Boolean any) -> messageIdDAO.updateMetadata(composedMessageIdWithMetaData) + .map((Boolean any) -> messageIdDAO.updateMetadata(newMetadata) .thenApply(v -> success)) - .orElse(CompletableFuture.completedFuture(success))) - .join(); - } - - private static FlagsUpdateStageResult success(UpdatedFlags updatedFlags) { - return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of(updatedFlags)); - } - - private static FlagsUpdateStageResult fail(MessageUid uid) { - return new FlagsUpdateStageResult(ImmutableList.of(uid), ImmutableList.of()); - } - - private static FlagsUpdateStageResult none() { - return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of()); - } - - private static class FlagsUpdateStageResult { - private final List<MessageUid> failed; - private final List<UpdatedFlags> succeeded; - - public FlagsUpdateStageResult(List<MessageUid> failed, List<UpdatedFlags> succeeded) { - this.failed = failed; - this.succeeded = succeeded; - } - - public List<MessageUid> getFailed() { - return failed; - } - - public List<UpdatedFlags> getSucceeded() { - return succeeded; - } - - public FlagsUpdateStageResult merge(FlagsUpdateStageResult other) { - return new FlagsUpdateStageResult( - ImmutableList.<MessageUid>builder() - .addAll(this.failed) - .addAll(other.failed) - .build(), - ImmutableList.<UpdatedFlags>builder() - .addAll(this.succeeded) - .addAll(other.succeeded) - 
.build()); - } - - public FlagsUpdateStageResult keepSuccess() { - return new FlagsUpdateStageResult(ImmutableList.of(), succeeded); - } + .orElse(CompletableFuture.completedFuture(success))); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java new file mode 100644 index 0000000..0bdaa7e --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java @@ -0,0 +1,92 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.utils; + +import java.util.List; +import java.util.Objects; + +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.model.UpdatedFlags; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; + +public class FlagsUpdateStageResult { + public static FlagsUpdateStageResult success(UpdatedFlags updatedFlags) { + return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of(updatedFlags)); + } + + public static FlagsUpdateStageResult fail(MessageUid uid) { + return new FlagsUpdateStageResult(ImmutableList.of(uid), ImmutableList.of()); + } + + public static FlagsUpdateStageResult none() { + return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of()); + } + + private final ImmutableList<MessageUid> failed; + private final ImmutableList<UpdatedFlags> succeeded; + + @VisibleForTesting + FlagsUpdateStageResult(ImmutableList<MessageUid> failed, ImmutableList<UpdatedFlags> succeeded) { + this.failed = failed; + this.succeeded = succeeded; + } + + public List<MessageUid> getFailed() { + return failed; + } + + public List<UpdatedFlags> getSucceeded() { + return succeeded; + } + + public FlagsUpdateStageResult merge(FlagsUpdateStageResult other) { + return new FlagsUpdateStageResult( + ImmutableList.<MessageUid>builder() + .addAll(this.failed) + .addAll(other.failed) + .build(), + ImmutableList.<UpdatedFlags>builder() + .addAll(this.succeeded) + .addAll(other.succeeded) + .build()); + } + + public FlagsUpdateStageResult keepSucceded() { + return new FlagsUpdateStageResult(ImmutableList.of(), succeeded); + } + + @Override + public final boolean equals(Object o) { + if (o instanceof FlagsUpdateStageResult) { + FlagsUpdateStageResult that = (FlagsUpdateStageResult) o; + + return Objects.equals(this.succeeded, that.succeeded) + && Objects.equals(this.failed, 
that.failed); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(failed, succeeded); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java new file mode 100644 index 0000000..2c43111 --- /dev/null +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java @@ -0,0 +1,134 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.utils; + +import static org.assertj.core.api.Assertions.assertThat; + +import javax.mail.Flags; + +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.model.UpdatedFlags; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; + +import nl.jqno.equalsverifier.EqualsVerifier; + +public class FlagsUpdateStageResultTest { + + public static final MessageUid UID = MessageUid.of(1L); + public static final MessageUid OTHER_UID = MessageUid.of(2L); + public static final UpdatedFlags UPDATED_FLAGS = UpdatedFlags.builder() + .uid(UID) + .modSeq(18L) + .oldFlags(new Flags()) + .newFlags(new Flags(Flags.Flag.SEEN)) + .build(); + public static final UpdatedFlags OTHER_UPDATED_FLAGS = UpdatedFlags.builder() + .uid(OTHER_UID) + .modSeq(18L) + .oldFlags(new Flags()) + .newFlags(new Flags(Flags.Flag.SEEN)) + .build(); + + @Test + public void classShouldRespectBeanContract() { + EqualsVerifier.forClass(FlagsUpdateStageResult.class); + } + + @Test + public void noneShouldCreateResultWithoutSuccessOrFails() { + assertThat(FlagsUpdateStageResult.none()) + .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of())); + } + + @Test + public void failShouldCreateResultWithFailedUid() { + assertThat(FlagsUpdateStageResult.fail(UID)) + .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(UID), ImmutableList.of())); + } + + @Test + public void successShouldCreateResultWithSucceededUpdatedFlags() { + assertThat(FlagsUpdateStageResult.success(UPDATED_FLAGS)) + .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of(UPDATED_FLAGS))); + } + + @Test + public void noneShouldBeWellMergedWithNone() { + assertThat(FlagsUpdateStageResult.none().merge(FlagsUpdateStageResult.none())) + .isEqualTo(FlagsUpdateStageResult.none()); + } + + @Test + public void noneShouldBeWellMergedWithFail() { + 
assertThat(FlagsUpdateStageResult.none().merge(FlagsUpdateStageResult.fail(UID))) + .isEqualTo(FlagsUpdateStageResult.fail(UID)); + } + + @Test + public void noneShouldBeWellMergedWithSuccess() { + assertThat(FlagsUpdateStageResult.none().merge(FlagsUpdateStageResult.success(UPDATED_FLAGS))) + .isEqualTo(FlagsUpdateStageResult.success(UPDATED_FLAGS)); + } + + @Test + public void failShouldBeWellMergedWithFail() { + assertThat(FlagsUpdateStageResult.fail(UID).merge(FlagsUpdateStageResult.fail(OTHER_UID))) + .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(UID, OTHER_UID), ImmutableList.of())); + } + + @Test + public void successShouldBeWellMergedWithFail() { + assertThat(FlagsUpdateStageResult.success(UPDATED_FLAGS).merge(FlagsUpdateStageResult.fail(UID))) + .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(UID), ImmutableList.of(UPDATED_FLAGS))); + } + + @Test + public void successShouldBeWellMergedWithSuccess() { + assertThat(FlagsUpdateStageResult.success(UPDATED_FLAGS).merge(FlagsUpdateStageResult.success(OTHER_UPDATED_FLAGS))) + .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of(UPDATED_FLAGS, OTHER_UPDATED_FLAGS))); + } + + @Test + public void getFailedShouldReturnFailedUid() { + FlagsUpdateStageResult flagsUpdateStageResult = new FlagsUpdateStageResult(ImmutableList.of(UID), ImmutableList.of(UPDATED_FLAGS)); + + assertThat(flagsUpdateStageResult.getFailed()) + .containsExactly(UID); + } + + @Test + public void getSucceededShouldReturnSucceedUpdatedFlags() { + FlagsUpdateStageResult flagsUpdateStageResult = new FlagsUpdateStageResult(ImmutableList.of(UID), ImmutableList.of(UPDATED_FLAGS)); + + assertThat(flagsUpdateStageResult.getSucceeded()) + .containsExactly(UPDATED_FLAGS); + } + + @Test + public void keepSuccessShouldDiscardFailedUids() { + FlagsUpdateStageResult flagsUpdateStageResult = new FlagsUpdateStageResult(ImmutableList.of(UID), ImmutableList.of(UPDATED_FLAGS)); + + 
assertThat(flagsUpdateStageResult.keepSucceded()) + .isEqualTo(FlagsUpdateStageResult.success(UPDATED_FLAGS)); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java index a3c7f51..6b5c312 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java @@ -89,10 +89,14 @@ public class CompletableFutureUtil { stream.map(action)); } - public static <T, U> CompletableFuture<Optional<T>> reduce(BinaryOperator<T> binaryOperator, CompletableFuture<Stream<T>> futureStream) { + public static <T> CompletableFuture<Optional<T>> reduce(BinaryOperator<T> binaryOperator, CompletableFuture<Stream<T>> futureStream) { return futureStream.thenApply(stream -> stream.reduce(binaryOperator)); } + public static <T> CompletableFuture<T> reduce(BinaryOperator<T> binaryOperator, CompletableFuture<Stream<T>> futureStream, T emptyAccumulator) { + return futureStream.thenApply(stream -> stream.reduce(binaryOperator).orElse(emptyAccumulator)); + } + public static <T> CompletableFuture<T> keepValue(Supplier<CompletableFuture<Void>> supplier, T value) { return supplier.get().thenApply(any -> value); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java 
b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java index 0e2bf10..810f264 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java @@ -60,6 +60,10 @@ public class FluentFutureStream<T> { return CompletableFutureUtil.reduce(combiner, completableFuture); } + public CompletableFuture<T> reduce(T emptyAccumulator, BinaryOperator<T> combiner) { + return CompletableFutureUtil.reduce(combiner, completableFuture, emptyAccumulator); + } + public <U> FluentFutureStream<U> thenComposeOnAll(Function<T, CompletableFuture<U>> function) { return FluentFutureStream.of( CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)); http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java index 1fd653a..adfac75 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java @@ -19,26 +19,63 @@ package org.apache.james.util.streams; -import java.util.List; -import java.util.Map; +import java.util.Collection; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collector; -import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.base.Preconditions; +import 
com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; public class JamesCollectors { - public static <D> Collector<D, ?, Map<Integer, List<D>>> chunker(int chunkSize) { - Preconditions.checkArgument(chunkSize > 0, "ChunkSize should be strictly positive"); - AtomicInteger counter = new AtomicInteger(-1); - return Collectors.groupingBy(x -> counter.incrementAndGet() / chunkSize); + public static <D> Collector<D, ?, Stream<Collection<D>>> chunker(int chunkSize) { + return new ChunkCollector<>(chunkSize); } - public static <D> Function<Stream<D>, Stream<List<D>>> chunk(int chunkSize) { - return stream -> stream.collect(chunker(chunkSize)) - .values() - .stream(); + public static class ChunkCollector<D> implements Collector<D, Multimap<Integer, D>, Stream<Collection<D>>> { + private final int chunkSize; + private final AtomicInteger counter; + + private ChunkCollector(int chunkSize) { + Preconditions.checkArgument(chunkSize > 0, "ChunkSize should be strictly positive"); + this.chunkSize = chunkSize; + this.counter = new AtomicInteger(-1); + } + + @Override + public Supplier<Multimap<Integer, D>> supplier() { + return () -> Multimaps.synchronizedListMultimap(ArrayListMultimap.<Integer, D>create()); + } + + @Override + public BiConsumer<Multimap<Integer, D>, D> accumulator() { + return (accumulator, value) -> accumulator.put(counter.incrementAndGet() / chunkSize, value); + } + + @Override + public BinaryOperator<Multimap<Integer, D>> combiner() { + return (accumulator1, accumulator2) -> { + accumulator1.putAll(accumulator2); + return accumulator1; + }; + } + + @Override + public Function<Multimap<Integer, D>, Stream<Collection<D>>> finisher() { + return accumulator -> accumulator.asMap().values().stream(); + } + + @Override + public Set<Characteristics> characteristics() { + return ImmutableSet.of(Characteristics.CONCURRENT); + } } } 
http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java index 8b637ea..3a7e571 100644 --- a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java +++ b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java @@ -361,4 +361,31 @@ public class CompletableFutureUtilTest { .join()) .contains(6L); } + + @Test + public void reduceShouldReturnIdentityAccumulatorWhenNoValue() { + long identityAccumulator = 0L; + assertThat( + CompletableFutureUtil.reduce( + (i, j) -> i + j, + CompletableFutureUtil.<Long>allOfArray(), + identityAccumulator) + .join()) + .isEqualTo(identityAccumulator); + } + + @Test + public void reduceShouldWorkWithIdentityAccumulator() { + assertThat( + CompletableFutureUtil.reduce( + (i, j) -> i + j, + CompletableFutureUtil.allOfArray( + CompletableFuture.completedFuture(1L), + CompletableFuture.completedFuture(2L), + CompletableFuture.completedFuture(3L) + ), + 0L) + .join()) + .isEqualTo(6L); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java b/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java index 4981735..52f1c3a 100644 --- a/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java +++ 
b/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java @@ -40,7 +40,8 @@ public class JamesCollectorsTest { public void chunkerShouldAcceptEmptyStrem() { Stream<Integer> emptyStream = Stream.of(); - assertThat(emptyStream.collect(JamesCollectors.chunker(10))) + assertThat(emptyStream.collect(JamesCollectors.chunker(10)) + .collect(Guavate.toImmutableList())) .isEmpty(); } @@ -63,8 +64,6 @@ public class JamesCollectorsTest { Stream<Integer> monoValueStream = Stream.of(1); List<List<Integer>> values = monoValueStream.collect(JamesCollectors.chunker(10)) - .values() - .stream() .map(ImmutableList::copyOf) .collect(Guavate.toImmutableList()); assertThat(values) @@ -76,8 +75,6 @@ public class JamesCollectorsTest { Stream<Integer> stream = Stream.of(1, 2); List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3)) - .values() - .stream() .map(ImmutableList::copyOf) .collect(Guavate.toImmutableList()); assertThat(values) @@ -89,8 +86,6 @@ public class JamesCollectorsTest { Stream<Integer> stream = Stream.of(1, 2, 3); List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3)) - .values() - .stream() .map(ImmutableList::copyOf) .collect(Guavate.toImmutableList()); assertThat(values) @@ -102,8 +97,6 @@ public class JamesCollectorsTest { Stream<Integer> stream = Stream.of(1, 2, 3, 4); List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3)) - .values() - .stream() .map(ImmutableList::copyOf) .collect(Guavate.toImmutableList()); assertThat(values) @@ -117,8 +110,6 @@ public class JamesCollectorsTest { Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7); List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3)) - .values() - .stream() .map(ImmutableList::copyOf) .collect(Guavate.toImmutableList()); assertThat(values) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
