JAMES-2541 Use stream when encoding/decoding blob

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0aef5b2a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0aef5b2a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0aef5b2a

Branch: refs/heads/master
Commit: 0aef5b2ac9c0c3291ffe69a6037eeef4a28ea712
Parents: 639aabc
Author: Antoine Duprat <[email protected]>
Authored: Thu Sep 6 15:03:58 2018 +0200
Committer: Benoit Tellier <[email protected]>
Committed: Mon Sep 10 17:17:41 2018 +0700

----------------------------------------------------------------------
 .../java/org/apache/james/blob/api/Store.java   | 17 ++++++++--------
 .../james/blob/mail/MimeMessageStore.java       | 21 +++++++++++---------
 2 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0aef5b2a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
----------------------------------------------------------------------
diff --git 
a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
index 87588a8..122fdbe 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.util.FluentFutureStream;
@@ -59,13 +60,13 @@ public interface Store<T> {
     }
 
     interface Encoder<T> {
-        Map<BlobType, InputStream> encode(T t);
+        Stream<Pair<BlobType, InputStream>> encode(T t);
     }
 
     interface Decoder<T> {
         void validateInput(Collection<BlobType> input);
 
-        T decode(Map<BlobType, byte[]> streams);
+        T decode(Stream<Pair<BlobType, byte[]>> streams);
     }
 
     CompletableFuture<Map<BlobType, BlobId>> save(T t);
@@ -87,27 +88,25 @@ public interface Store<T> {
         public CompletableFuture<Map<BlobType, BlobId>> save(T t) {
             return FluentFutureStream.of(
                 encoder.encode(t)
-                    .entrySet()
-                    .stream()
                     .map(this::saveEntry))
                 .completableFuture()
                 .thenApply(pairStream -> 
pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue)));
         }
 
-        private CompletableFuture<Pair<BlobType, BlobId>> 
saveEntry(Map.Entry<BlobType, InputStream> entry) {
-            return blobStore.save(entry.getValue())
-                .thenApply(blobId -> Pair.of(entry.getKey(), blobId));
+        private CompletableFuture<Pair<BlobType, BlobId>> 
saveEntry(Pair<BlobType, InputStream> entry) {
+            return blobStore.save(entry.getRight())
+                .thenApply(blobId -> Pair.of(entry.getLeft(), blobId));
         }
 
         @Override
         public CompletableFuture<T> read(Map<BlobType, BlobId> blobIds) {
             decoder.validateInput(blobIds.keySet());
 
-            CompletableFuture<ImmutableMap<BlobType, byte[]>> binaries = 
FluentFutureStream.of(blobIds.entrySet()
+            CompletableFuture<Stream<Pair<BlobType, byte[]>>> binaries = 
FluentFutureStream.of(blobIds.entrySet()
                 .stream()
                 .map(entry -> blobStore.readBytes(entry.getValue())
                     .thenApply(bytes -> Pair.of(entry.getKey(), bytes))))
-                .collect(ImmutableMap.toImmutableMap(Pair::getKey, 
Pair::getValue));
+                .completableFuture();
 
             return binaries.thenApply(decoder::decode);
         }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0aef5b2a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
----------------------------------------------------------------------
diff --git 
a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
 
b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
index ef718fc..e7ef327 100644
--- 
a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
+++ 
b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.mail.MessagingException;
@@ -37,6 +38,7 @@ import javax.mail.Session;
 import javax.mail.internet.MimeMessage;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.Store;
 import org.apache.james.util.BodyOffsetInputStream;
@@ -50,13 +52,13 @@ public class MimeMessageStore extends 
Store.Impl<MimeMessage> {
 
     static class MailEncoder implements Encoder<MimeMessage> {
         @Override
-        public Map<BlobType, InputStream> encode(MimeMessage message) {
+        public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) 
{
             try {
                 byte[] messageAsArray = messageToArray(message);
                 int bodyStartOctet = computeBodyStartOctet(messageAsArray);
-                return ImmutableMap.of(
-                    HEADER_BLOB_TYPE, new 
ByteArrayInputStream(getHeaderBytes(messageAsArray, bodyStartOctet)),
-                    BODY_BLOB_TYPE, new 
ByteArrayInputStream(getBodyBytes(messageAsArray, bodyStartOctet)));
+                return Stream.of(
+                    Pair.of(HEADER_BLOB_TYPE, new 
ByteArrayInputStream(getHeaderBytes(messageAsArray, bodyStartOctet))),
+                    Pair.of(BODY_BLOB_TYPE, new 
ByteArrayInputStream(getBodyBytes(messageAsArray, bodyStartOctet))));
             } catch (MessagingException | IOException e) {
                 throw new RuntimeException(e);
             }
@@ -114,15 +116,16 @@ public class MimeMessageStore extends 
Store.Impl<MimeMessage> {
         }
 
         @Override
-        public MimeMessage decode(Map<BlobType, byte[]> streams) {
+        public MimeMessage decode(Stream<Pair<BlobType, byte[]>> streams) {
             Preconditions.checkNotNull(streams);
-            Preconditions.checkArgument(streams.containsKey(HEADER_BLOB_TYPE));
-            Preconditions.checkArgument(streams.containsKey(BODY_BLOB_TYPE));
+            Map<BlobType,byte[]> pairs = 
streams.collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight));
+            Preconditions.checkArgument(pairs.containsKey(HEADER_BLOB_TYPE));
+            Preconditions.checkArgument(pairs.containsKey(BODY_BLOB_TYPE));
 
             return toMimeMessage(
                 new SequenceInputStream(
-                    new ByteArrayInputStream(streams.get(HEADER_BLOB_TYPE)),
-                    new ByteArrayInputStream(streams.get(BODY_BLOB_TYPE))));
+                    new ByteArrayInputStream(pairs.get(HEADER_BLOB_TYPE)),
+                    new ByteArrayInputStream(pairs.get(BODY_BLOB_TYPE))));
         }
 
         private MimeMessage toMimeMessage(InputStream inputStream) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to