MAILBOX-297 Parallel staged flags updates

Also change MessageRange parameter of updateFlags from set to range in 
CassandraMessageMapper

Use a custom collector for chunker


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e82c8584
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e82c8584
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e82c8584

Branch: refs/heads/master
Commit: e82c858481db7af8c15d9e383294480d06a964cf
Parents: 6f2d4c7
Author: benwa <[email protected]>
Authored: Mon May 22 15:01:42 2017 +0700
Committer: benwa <[email protected]>
Committed: Mon May 29 17:02:46 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageDAO.java     |   4 +-
 .../cassandra/mail/CassandraMessageMapper.java  | 164 ++++++++-----------
 .../mail/utils/FlagsUpdateStageResult.java      |  92 +++++++++++
 .../mail/utils/FlagsUpdateStageResultTest.java  | 134 +++++++++++++++
 .../james/util/CompletableFutureUtil.java       |   6 +-
 .../apache/james/util/FluentFutureStream.java   |   4 +
 .../james/util/streams/JamesCollectors.java     |  59 +++++--
 .../james/util/CompletableFutureUtilTest.java   |  27 +++
 .../james/util/streams/JamesCollectorsTest.java |  13 +-
 9 files changed, 379 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index a6ba328..a094540 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -193,9 +193,7 @@ public class CassandraMessageDAO {
     public CompletableFuture<Stream<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>>> 
retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType 
fetchType, Optional<Integer> limit) {
         return CompletableFutureUtil.chainAll(
             getLimitedIdStream(messageIds.stream().distinct(), limit)
-                .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ))
-                .values()
-                .stream(),
+                .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)),
             ids -> FluentFutureStream.of(
                 ids.stream()
                     .map(id -> retrieveRow(id, fetchType)

http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 7b711bd..b959972 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.CassandraId;
 import org.apache.james.mailbox.cassandra.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.utils.FlagsUpdateStageResult;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
@@ -64,6 +66,7 @@ public class CassandraMessageMapper implements MessageMapper {
         .unseen(0L)
         .build();
     public static final int EXPUNGE_BATCH_SIZE = 100;
+    public static final int UPDATE_FLAGS_BATCH_SIZE = 20;
     public static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraMessageMapper.class);
 
     private final CassandraModSeqProvider modSeqProvider;
@@ -216,13 +219,12 @@ public class CassandraMessageMapper implements 
MessageMapper {
         return deletedMessageDAO.retrieveDeletedMessage(mailboxId, 
messageRange)
             .join()
             .collect(JamesCollectors.chunker(EXPUNGE_BATCH_SIZE))
-            .values().stream()
             .map(uidChunk -> expungeUidChunk(mailboxId, uidChunk))
             .flatMap(CompletableFuture::join)
             .collect(Guavate.toImmutableMap(MailboxMessage::getUid, 
SimpleMessageMetaData::new));
     }
 
-    private CompletableFuture<Stream<SimpleMailboxMessage>> 
expungeUidChunk(CassandraId mailboxId, List<MessageUid> uidChunk) {
+    private CompletableFuture<Stream<SimpleMailboxMessage>> 
expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) {
         return FluentFutureStream.of(uidChunk.stream()
             .map(uid -> messageIdDAO.retrieve(mailboxId, uid)))
             .flatMap(OptionalConverter::toStream)
@@ -273,53 +275,63 @@ public class CassandraMessageMapper implements 
MessageMapper {
     }
 
     @Override
-    public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws 
MailboxException {
+    public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, MessageRange range) throws 
MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        return runUpdate(mailboxId, set, flagUpdateCalculator).iterator();
-    }
-
-    private List<UpdatedFlags> runUpdate(CassandraId mailboxId, MessageRange 
set, FlagsUpdateCalculator flagsUpdateCalculator) throws MailboxException {
-        Stream<ComposedMessageIdWithMetaData> toBeUpdated = 
messageIdDAO.retrieveMessages(mailboxId, set).join();
+        Stream<ComposedMessageIdWithMetaData> toBeUpdated = 
messageIdDAO.retrieveMessages(mailboxId, range).join();
 
-        FlagsUpdateStageResult globalResult = runUpdateStage(mailboxId, 
toBeUpdated, flagsUpdateCalculator);
+        FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, 
toBeUpdated, flagUpdateCalculator);
+        FlagsUpdateStageResult finalResult = 
handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult);
+        if (!finalResult.getFailed().isEmpty()) {
+            LOGGER.error("Can not update following UIDs {} for mailbox {}", 
finalResult.getFailed(), mailboxId.asUuid());
+        }
+        return finalResult.getSucceeded().iterator();
+    }
 
+    private FlagsUpdateStageResult handleUpdatesStagedRetry(CassandraId 
mailboxId, FlagsUpdateCalculator flagUpdateCalculator, FlagsUpdateStageResult 
firstResult) {
+        FlagsUpdateStageResult globalResult = firstResult;
         int retryCount = 0;
-
         while (retryCount < maxRetries && !globalResult.getFailed().isEmpty()) 
{
             retryCount++;
-            FlagsUpdateStageResult stageResult = runUpdateStage(mailboxId,
-                FluentFutureStream.of(
-                    globalResult.getFailed().stream()
-                        .map(uid -> messageIdDAO.retrieve(mailboxId, uid)))
-                    .flatMap(OptionalConverter::toStream)
-                    .completableFuture().join(),
-                flagsUpdateCalculator);
-
-            globalResult = globalResult.keepSuccess().merge(stageResult);
+            FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, 
flagUpdateCalculator, globalResult.getFailed());
+            globalResult = globalResult.keepSucceded().merge(stageResult);
         }
+        return globalResult;
+    }
 
-        LOGGER.error("Can not update following UIDs {} for mailbox {}", 
globalResult.getFailed(), mailboxId.asUuid());
+    private FlagsUpdateStageResult retryUpdatesStage(CassandraId mailboxId, 
FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
+        Stream<ComposedMessageIdWithMetaData> idsFailed = 
FluentFutureStream.of(
+            failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid)))
+            .flatMap(OptionalConverter::toStream)
+            .join();
 
-        return globalResult.getSucceeded();
+        return runUpdateStage(mailboxId, idsFailed, flagsUpdateCalculator);
     }
 
     private FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, 
Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator 
flagsUpdateCalculator) {
-        Long newModSeq = 
modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new 
RuntimeException("ModSeq generation failed"));
+        Long newModSeq = 
modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new 
RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()));
 
-        FlagsUpdateStageResult result = toBeUpdated
-            .map(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator,
-                newModSeq,
-                oldMetadata))
-            .reduce(FlagsUpdateStageResult::merge)
-            .orElse(none());
+        return 
toBeUpdated.collect(JamesCollectors.chunker(UPDATE_FLAGS_BATCH_SIZE))
+            .map(uidChunk -> performUpdatesForChunk(mailboxId, 
flagsUpdateCalculator, newModSeq, uidChunk))
+            .map(CompletableFuture::join)
+            .reduce(FlagsUpdateStageResult.none(), 
FlagsUpdateStageResult::merge);
+    }
 
-        result.getSucceeded().stream()
-            .map((UpdatedFlags updatedFlags) -> 
indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)
-                .thenApply(voidValue -> updatedFlags))
-            .forEach(CompletableFuture::join);
+    private CompletableFuture<FlagsUpdateStageResult> 
performUpdatesForChunk(CassandraId mailboxId, FlagsUpdateCalculator 
flagsUpdateCalculator, Long newModSeq, 
Collection<ComposedMessageIdWithMetaData> uidChunk) {
+        Stream<CompletableFuture<FlagsUpdateStageResult>> updateMetaDataFuture 
=
+            uidChunk.stream().map(oldMetadata -> 
tryFlagsUpdate(flagsUpdateCalculator, newModSeq, oldMetadata));
 
-        return result;
+        return FluentFutureStream.of(updateMetaDataFuture)
+            .reduce(FlagsUpdateStageResult.none(), 
FlagsUpdateStageResult::merge)
+            .thenCompose(result -> updateIndexesForUpdatesResult(mailboxId, 
result));
+    }
+
+    private CompletableFuture<FlagsUpdateStageResult> 
updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult 
result) {
+        return FluentFutureStream.of(
+            result.getSucceeded().stream()
+                .map((UpdatedFlags updatedFlags) -> 
indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)))
+            .completableFuture()
+            .thenApply(any -> result);
     }
 
     @Override
@@ -362,93 +374,49 @@ public class CassandraMessageMapper implements 
MessageMapper {
     }
 
 
-    private FlagsUpdateStageResult tryFlagsUpdate(FlagsUpdateCalculator 
flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData 
oldMetaData) {
+    private CompletableFuture<FlagsUpdateStageResult> 
tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, 
ComposedMessageIdWithMetaData oldMetaData) {
         Flags oldFlags = oldMetaData.getFlags();
         Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
 
         if (identicalFlags(oldFlags, newFlags)) {
-            return success(UpdatedFlags.builder()
+            return 
CompletableFuture.completedFuture(FlagsUpdateStageResult.success(UpdatedFlags.builder()
                 .uid(oldMetaData.getComposedMessageId().getUid())
                 .modSeq(oldMetaData.getModSeq())
                 .oldFlags(oldFlags)
                 .newFlags(newFlags)
-                .build());
+                .build()));
         }
 
-        if (updateFlags(oldMetaData, newFlags, newModSeq)) {
-            return success(UpdatedFlags.builder()
-                .uid(oldMetaData.getComposedMessageId().getUid())
-                .modSeq(newModSeq)
-                .oldFlags(oldFlags)
-                .newFlags(newFlags)
-                .build());
-        } else {
-            return fail(oldMetaData.getComposedMessageId().getUid());
-        }
+        return updateFlags(oldMetaData, newFlags, newModSeq)
+            .thenApply(success -> {
+                if (success) {
+                    return 
FlagsUpdateStageResult.success(UpdatedFlags.builder()
+                        .uid(oldMetaData.getComposedMessageId().getUid())
+                        .modSeq(newModSeq)
+                        .oldFlags(oldFlags)
+                        .newFlags(newFlags)
+                        .build());
+                } else {
+                    return 
FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId().getUid());
+                }
+            });
     }
 
     private boolean identicalFlags(Flags oldFlags, Flags newFlags) {
         return oldFlags.equals(newFlags);
     }
 
-    private boolean updateFlags(ComposedMessageIdWithMetaData oldMetadata, 
Flags newFlags, long newModSeq) {
-        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = 
ComposedMessageIdWithMetaData.builder()
+    private CompletableFuture<Boolean> 
updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long 
newModSeq) {
+        ComposedMessageIdWithMetaData newMetadata = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(oldMetadata.getComposedMessageId())
                 .modSeq(newModSeq)
                 .flags(newFlags)
                 .build();
-        return imapUidDAO.updateMetadata(composedMessageIdWithMetaData, 
oldMetadata.getModSeq())
+        return imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq())
             .thenCompose(success -> Optional.of(success)
                 .filter(b -> b)
-                .map((Boolean any) -> 
messageIdDAO.updateMetadata(composedMessageIdWithMetaData)
+                .map((Boolean any) -> messageIdDAO.updateMetadata(newMetadata)
                     .thenApply(v -> success))
-                .orElse(CompletableFuture.completedFuture(success)))
-            .join();
-    }
-
-    private static FlagsUpdateStageResult success(UpdatedFlags updatedFlags) {
-        return new FlagsUpdateStageResult(ImmutableList.of(), 
ImmutableList.of(updatedFlags));
-    }
-
-    private static FlagsUpdateStageResult fail(MessageUid uid) {
-        return new FlagsUpdateStageResult(ImmutableList.of(uid), 
ImmutableList.of());
-    }
-
-    private static FlagsUpdateStageResult none() {
-        return new FlagsUpdateStageResult(ImmutableList.of(), 
ImmutableList.of());
-    }
-
-    private static class FlagsUpdateStageResult {
-        private final List<MessageUid> failed;
-        private final List<UpdatedFlags> succeeded;
-
-        public FlagsUpdateStageResult(List<MessageUid> failed, 
List<UpdatedFlags> succeeded) {
-            this.failed = failed;
-            this.succeeded = succeeded;
-        }
-
-        public List<MessageUid> getFailed() {
-            return failed;
-        }
-
-        public List<UpdatedFlags> getSucceeded() {
-            return succeeded;
-        }
-
-        public FlagsUpdateStageResult merge(FlagsUpdateStageResult other) {
-            return new FlagsUpdateStageResult(
-                ImmutableList.<MessageUid>builder()
-                    .addAll(this.failed)
-                    .addAll(other.failed)
-                    .build(),
-                ImmutableList.<UpdatedFlags>builder()
-                    .addAll(this.succeeded)
-                    .addAll(other.succeeded)
-                    .build());
-        }
-
-        public FlagsUpdateStageResult keepSuccess() {
-            return new FlagsUpdateStageResult(ImmutableList.of(), succeeded);
-        }
+                .orElse(CompletableFuture.completedFuture(success)));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java
new file mode 100644
index 0000000..0bdaa7e
--- /dev/null
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java
@@ -0,0 +1,92 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.utils;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.model.UpdatedFlags;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+public class FlagsUpdateStageResult {
+    public static FlagsUpdateStageResult success(UpdatedFlags updatedFlags) {
+        return new FlagsUpdateStageResult(ImmutableList.of(), 
ImmutableList.of(updatedFlags));
+    }
+
+    public static FlagsUpdateStageResult fail(MessageUid uid) {
+        return new FlagsUpdateStageResult(ImmutableList.of(uid), 
ImmutableList.of());
+    }
+
+    public static FlagsUpdateStageResult none() {
+        return new FlagsUpdateStageResult(ImmutableList.of(), 
ImmutableList.of());
+    }
+
+    private final ImmutableList<MessageUid> failed;
+    private final ImmutableList<UpdatedFlags> succeeded;
+
+    @VisibleForTesting
+    FlagsUpdateStageResult(ImmutableList<MessageUid> failed, 
ImmutableList<UpdatedFlags> succeeded) {
+        this.failed = failed;
+        this.succeeded = succeeded;
+    }
+
+    public List<MessageUid> getFailed() {
+        return failed;
+    }
+
+    public List<UpdatedFlags> getSucceeded() {
+        return succeeded;
+    }
+
+    public FlagsUpdateStageResult merge(FlagsUpdateStageResult other) {
+        return new FlagsUpdateStageResult(
+            ImmutableList.<MessageUid>builder()
+                .addAll(this.failed)
+                .addAll(other.failed)
+                .build(),
+            ImmutableList.<UpdatedFlags>builder()
+                .addAll(this.succeeded)
+                .addAll(other.succeeded)
+                .build());
+    }
+
+    public FlagsUpdateStageResult keepSucceded() {
+        return new FlagsUpdateStageResult(ImmutableList.of(), succeeded);
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (o instanceof FlagsUpdateStageResult) {
+            FlagsUpdateStageResult that = (FlagsUpdateStageResult) o;
+
+            return Objects.equals(this.succeeded, that.succeeded)
+                && Objects.equals(this.failed, that.failed);
+        }
+        return false;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(failed, succeeded);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java
new file mode 100644
index 0000000..2c43111
--- /dev/null
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java
@@ -0,0 +1,134 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import javax.mail.Flags;
+
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.model.UpdatedFlags;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+public class FlagsUpdateStageResultTest {
+
+    public static final MessageUid UID = MessageUid.of(1L);
+    public static final MessageUid OTHER_UID = MessageUid.of(2L);
+    public static final UpdatedFlags UPDATED_FLAGS = UpdatedFlags.builder()
+        .uid(UID)
+        .modSeq(18L)
+        .oldFlags(new Flags())
+        .newFlags(new Flags(Flags.Flag.SEEN))
+        .build();
+    public static final UpdatedFlags OTHER_UPDATED_FLAGS = 
UpdatedFlags.builder()
+        .uid(OTHER_UID)
+        .modSeq(18L)
+        .oldFlags(new Flags())
+        .newFlags(new Flags(Flags.Flag.SEEN))
+        .build();
+
+    @Test
+    public void classShouldRespectBeanContract() {
+        EqualsVerifier.forClass(FlagsUpdateStageResult.class);
+    }
+
+    @Test
+    public void noneShouldCreateResultWithoutSuccessOrFails() {
+        assertThat(FlagsUpdateStageResult.none())
+            .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(), 
ImmutableList.of()));
+    }
+
+    @Test
+    public void failShouldCreateResultWithFailedUid() {
+        assertThat(FlagsUpdateStageResult.fail(UID))
+            .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(UID), 
ImmutableList.of()));
+    }
+
+    @Test
+    public void successShouldCreateResultWithSucceededUpdatedFlags() {
+        assertThat(FlagsUpdateStageResult.success(UPDATED_FLAGS))
+            .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(), 
ImmutableList.of(UPDATED_FLAGS)));
+    }
+
+    @Test
+    public void noneShouldBeWellMergedWithNone() {
+        
assertThat(FlagsUpdateStageResult.none().merge(FlagsUpdateStageResult.none()))
+            .isEqualTo(FlagsUpdateStageResult.none());
+    }
+
+    @Test
+    public void noneShouldBeWellMergedWithFail() {
+        
assertThat(FlagsUpdateStageResult.none().merge(FlagsUpdateStageResult.fail(UID)))
+            .isEqualTo(FlagsUpdateStageResult.fail(UID));
+    }
+
+    @Test
+    public void noneShouldBeWellMergedWithSuccess() {
+        
assertThat(FlagsUpdateStageResult.none().merge(FlagsUpdateStageResult.success(UPDATED_FLAGS)))
+            .isEqualTo(FlagsUpdateStageResult.success(UPDATED_FLAGS));
+    }
+
+    @Test
+    public void failShouldBeWellMergedWithFail() {
+        
assertThat(FlagsUpdateStageResult.fail(UID).merge(FlagsUpdateStageResult.fail(OTHER_UID)))
+            .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(UID, 
OTHER_UID), ImmutableList.of()));
+    }
+
+    @Test
+    public void successShouldBeWellMergedWithFail() {
+        
assertThat(FlagsUpdateStageResult.success(UPDATED_FLAGS).merge(FlagsUpdateStageResult.fail(UID)))
+            .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(UID), 
ImmutableList.of(UPDATED_FLAGS)));
+    }
+
+    @Test
+    public void successShouldBeWellMergedWithSuccess() {
+        
assertThat(FlagsUpdateStageResult.success(UPDATED_FLAGS).merge(FlagsUpdateStageResult.success(OTHER_UPDATED_FLAGS)))
+            .isEqualTo(new FlagsUpdateStageResult(ImmutableList.of(), 
ImmutableList.of(UPDATED_FLAGS, OTHER_UPDATED_FLAGS)));
+    }
+
+    @Test
+    public void getFailedShouldReturnFailedUid() {
+        FlagsUpdateStageResult flagsUpdateStageResult = new 
FlagsUpdateStageResult(ImmutableList.of(UID), ImmutableList.of(UPDATED_FLAGS));
+
+        assertThat(flagsUpdateStageResult.getFailed())
+            .containsExactly(UID);
+    }
+
+    @Test
+    public void getSucceededShouldReturnSucceedUpdatedFlags() {
+        FlagsUpdateStageResult flagsUpdateStageResult = new 
FlagsUpdateStageResult(ImmutableList.of(UID), ImmutableList.of(UPDATED_FLAGS));
+
+        assertThat(flagsUpdateStageResult.getSucceeded())
+            .containsExactly(UPDATED_FLAGS);
+    }
+
+    @Test
+    public void keepSuccessShouldDiscardFailedUids() {
+        FlagsUpdateStageResult flagsUpdateStageResult = new 
FlagsUpdateStageResult(ImmutableList.of(UID), ImmutableList.of(UPDATED_FLAGS));
+
+        assertThat(flagsUpdateStageResult.keepSucceded())
+            .isEqualTo(FlagsUpdateStageResult.success(UPDATED_FLAGS));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
 
b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
index a3c7f51..6b5c312 100644
--- 
a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
+++ 
b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
@@ -89,10 +89,14 @@ public class CompletableFutureUtil {
                 stream.map(action));
     }
 
-    public static <T, U> CompletableFuture<Optional<T>> 
reduce(BinaryOperator<T> binaryOperator, CompletableFuture<Stream<T>> 
futureStream) {
+    public static <T> CompletableFuture<Optional<T>> reduce(BinaryOperator<T> 
binaryOperator, CompletableFuture<Stream<T>> futureStream) {
         return futureStream.thenApply(stream -> stream.reduce(binaryOperator));
     }
 
+    public static <T> CompletableFuture<T> reduce(BinaryOperator<T> 
binaryOperator, CompletableFuture<Stream<T>> futureStream, T emptyAccumulator) {
+        return futureStream.thenApply(stream -> 
stream.reduce(binaryOperator).orElse(emptyAccumulator));
+    }
+
     public static <T> CompletableFuture<T> 
keepValue(Supplier<CompletableFuture<Void>> supplier, T value) {
         return supplier.get().thenApply(any -> value);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
 
b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
index 0e2bf10..810f264 100644
--- 
a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
+++ 
b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
@@ -60,6 +60,10 @@ public class FluentFutureStream<T> {
         return CompletableFutureUtil.reduce(combiner, completableFuture);
     }
 
+    public CompletableFuture<T> reduce(T emptyAccumulator, BinaryOperator<T> 
combiner) {
+        return CompletableFutureUtil.reduce(combiner, completableFuture, 
emptyAccumulator);
+    }
+
     public <U> FluentFutureStream<U> thenComposeOnAll(Function<T, 
CompletableFuture<U>> function) {
         return FluentFutureStream.of(
             CompletableFutureUtil.thenComposeOnAll(completableFuture(), 
function));

http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
 
b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
index 1fd653a..adfac75 100644
--- 
a/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
+++ 
b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
@@ -19,26 +19,63 @@
 
 package org.apache.james.util.streams;
 
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collector;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 
 public class JamesCollectors {
-    public static <D> Collector<D, ?, Map<Integer, List<D>>> chunker(int 
chunkSize) {
-        Preconditions.checkArgument(chunkSize > 0, "ChunkSize should be 
strictly positive");
-        AtomicInteger counter = new AtomicInteger(-1);
-        return Collectors.groupingBy(x -> counter.incrementAndGet() / 
chunkSize);
+    public static <D> Collector<D, ?, Stream<Collection<D>>> chunker(int 
chunkSize) {
+        return new ChunkCollector<>(chunkSize);
     }
 
-    public static <D> Function<Stream<D>, Stream<List<D>>> chunk(int 
chunkSize) {
-        return stream -> stream.collect(chunker(chunkSize))
-            .values()
-            .stream();
+    public static class ChunkCollector<D> implements Collector<D, 
Multimap<Integer, D>, Stream<Collection<D>>> {
+        private final int chunkSize;
+        private final AtomicInteger counter;
+
+        private ChunkCollector(int chunkSize) {
+            Preconditions.checkArgument(chunkSize > 0, "ChunkSize should be 
strictly positive");
+            this.chunkSize = chunkSize;
+            this.counter = new AtomicInteger(-1);
+        }
+
+        @Override
+        public Supplier<Multimap<Integer, D>> supplier() {
+            return () -> 
Multimaps.synchronizedListMultimap(ArrayListMultimap.<Integer, D>create());
+        }
+
+        @Override
+        public BiConsumer<Multimap<Integer, D>, D> accumulator() {
+            return (accumulator, value) -> 
accumulator.put(counter.incrementAndGet() / chunkSize, value);
+        }
+
+        @Override
+        public BinaryOperator<Multimap<Integer, D>> combiner() {
+            return (accumulator1, accumulator2) -> {
+                accumulator1.putAll(accumulator2);
+                return accumulator1;
+            };
+        }
+
+        @Override
+        public Function<Multimap<Integer, D>, Stream<Collection<D>>> 
finisher() {
+            return accumulator -> accumulator.asMap().values().stream();
+        }
+
+        @Override
+        public Set<Characteristics> characteristics() {
+            return ImmutableSet.of(Characteristics.CONCURRENT);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
 
b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
index 8b637ea..3a7e571 100644
--- 
a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
+++ 
b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
@@ -361,4 +361,31 @@ public class CompletableFutureUtilTest {
                 .join())
             .contains(6L);
     }
+
+    @Test
+    public void reduceShouldReturnIdentityAccumulatorWhenNoValue() {
+        long identityAccumulator = 0L;
+        assertThat(
+            CompletableFutureUtil.reduce(
+                (i, j) -> i + j,
+                CompletableFutureUtil.<Long>allOfArray(),
+                identityAccumulator)
+                .join())
+            .isEqualTo(identityAccumulator);
+    }
+
+    @Test
+    public void reduceShouldWorkWithIdentityAccumulator() {
+        assertThat(
+            CompletableFutureUtil.reduce(
+                (i, j) -> i + j,
+                CompletableFutureUtil.allOfArray(
+                    CompletableFuture.completedFuture(1L),
+                    CompletableFuture.completedFuture(2L),
+                    CompletableFuture.completedFuture(3L)
+                ),
+                0L)
+                .join())
+            .isEqualTo(6L);
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e82c8584/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
 
b/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
index 4981735..52f1c3a 100644
--- 
a/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
+++ 
b/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
@@ -40,7 +40,8 @@ public class JamesCollectorsTest {
     public void chunkerShouldAcceptEmptyStrem() {
         Stream<Integer> emptyStream = Stream.of();
 
-        assertThat(emptyStream.collect(JamesCollectors.chunker(10)))
+        assertThat(emptyStream.collect(JamesCollectors.chunker(10))
+            .collect(Guavate.toImmutableList()))
             .isEmpty();
     }
 
@@ -63,8 +64,6 @@ public class JamesCollectorsTest {
         Stream<Integer> monoValueStream = Stream.of(1);
 
         List<List<Integer>> values = 
monoValueStream.collect(JamesCollectors.chunker(10))
-            .values()
-            .stream()
             .map(ImmutableList::copyOf)
             .collect(Guavate.toImmutableList());
         assertThat(values)
@@ -76,8 +75,6 @@ public class JamesCollectorsTest {
         Stream<Integer> stream = Stream.of(1, 2);
 
         List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
-            .values()
-            .stream()
             .map(ImmutableList::copyOf)
             .collect(Guavate.toImmutableList());
         assertThat(values)
@@ -89,8 +86,6 @@ public class JamesCollectorsTest {
         Stream<Integer> stream = Stream.of(1, 2, 3);
 
         List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
-            .values()
-            .stream()
             .map(ImmutableList::copyOf)
             .collect(Guavate.toImmutableList());
         assertThat(values)
@@ -102,8 +97,6 @@ public class JamesCollectorsTest {
         Stream<Integer> stream = Stream.of(1, 2, 3, 4);
 
         List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
-            .values()
-            .stream()
             .map(ImmutableList::copyOf)
             .collect(Guavate.toImmutableList());
         assertThat(values)
@@ -117,8 +110,6 @@ public class JamesCollectorsTest {
         Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7);
 
         List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
-            .values()
-            .stream()
             .map(ImmutableList::copyOf)
             .collect(Guavate.toImmutableList());
         assertThat(values)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to