JAMES-2541 Use stream when encoding/decoding blob
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0aef5b2a Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0aef5b2a Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0aef5b2a Branch: refs/heads/master Commit: 0aef5b2ac9c0c3291ffe69a6037eeef4a28ea712 Parents: 639aabc Author: Antoine Duprat <[email protected]> Authored: Thu Sep 6 15:03:58 2018 +0200 Committer: Benoit Tellier <[email protected]> Committed: Mon Sep 10 17:17:41 2018 +0700 ---------------------------------------------------------------------- .../java/org/apache/james/blob/api/Store.java | 17 ++++++++-------- .../james/blob/mail/MimeMessageStore.java | 21 +++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/0aef5b2a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java index 87588a8..122fdbe 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.util.FluentFutureStream; @@ -59,13 +60,13 @@ public interface Store<T> { } interface Encoder<T> { - Map<BlobType, InputStream> encode(T t); + Stream<Pair<BlobType, InputStream>> encode(T t); } interface Decoder<T> { void validateInput(Collection<BlobType> input); - T decode(Map<BlobType, byte[]> streams); + T decode(Stream<Pair<BlobType, byte[]>> streams); } CompletableFuture<Map<BlobType, BlobId>> save(T t); @@ -87,27 +88,25 @@ public interface Store<T> { public CompletableFuture<Map<BlobType, BlobId>> save(T t) { return FluentFutureStream.of( encoder.encode(t) - .entrySet() - .stream() .map(this::saveEntry)) .completableFuture() .thenApply(pairStream -> pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue))); } - private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Map.Entry<BlobType, InputStream> entry) { - return blobStore.save(entry.getValue()) - .thenApply(blobId -> Pair.of(entry.getKey(), blobId)); + private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) { + return blobStore.save(entry.getRight()) + .thenApply(blobId -> Pair.of(entry.getLeft(), blobId)); } @Override public CompletableFuture<T> read(Map<BlobType, BlobId> blobIds) { decoder.validateInput(blobIds.keySet()); - CompletableFuture<ImmutableMap<BlobType, byte[]>> binaries = FluentFutureStream.of(blobIds.entrySet() + CompletableFuture<Stream<Pair<BlobType, byte[]>>> binaries = FluentFutureStream.of(blobIds.entrySet() .stream() .map(entry -> blobStore.readBytes(entry.getValue()) .thenApply(bytes -> Pair.of(entry.getKey(), bytes)))) - .collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue)); + .completableFuture(); return binaries.thenApply(decoder::decode); } http://git-wip-us.apache.org/repos/asf/james-project/blob/0aef5b2a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java ---------------------------------------------------------------------- diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java index ef718fc..e7ef327 100644 --- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java +++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; import java.util.Properties; +import java.util.stream.Stream; import javax.inject.Inject; import javax.mail.MessagingException; @@ -37,6 +38,7 @@ import javax.mail.Session; import javax.mail.internet.MimeMessage; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.api.Store; import org.apache.james.util.BodyOffsetInputStream; @@ -50,13 +52,13 @@ public class MimeMessageStore extends Store.Impl<MimeMessage> { static class MailEncoder implements Encoder<MimeMessage> { @Override - public Map<BlobType, InputStream> encode(MimeMessage message) { + public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) { try { byte[] messageAsArray = messageToArray(message); int bodyStartOctet = computeBodyStartOctet(messageAsArray); - return ImmutableMap.of( - HEADER_BLOB_TYPE, new ByteArrayInputStream(getHeaderBytes(messageAsArray, bodyStartOctet)), - BODY_BLOB_TYPE, new ByteArrayInputStream(getBodyBytes(messageAsArray, bodyStartOctet))); + return Stream.of( + Pair.of(HEADER_BLOB_TYPE, new ByteArrayInputStream(getHeaderBytes(messageAsArray, bodyStartOctet))), + Pair.of(BODY_BLOB_TYPE, new ByteArrayInputStream(getBodyBytes(messageAsArray, bodyStartOctet)))); } catch (MessagingException | IOException e) { throw new RuntimeException(e); } @@ -114,15 +116,16 @@ public class MimeMessageStore extends Store.Impl<MimeMessage> { } @Override - public MimeMessage decode(Map<BlobType, byte[]> streams) { + public MimeMessage decode(Stream<Pair<BlobType, byte[]>> streams) { Preconditions.checkNotNull(streams); - Preconditions.checkArgument(streams.containsKey(HEADER_BLOB_TYPE)); - Preconditions.checkArgument(streams.containsKey(BODY_BLOB_TYPE)); + Map<BlobType,byte[]> pairs = streams.collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight)); + Preconditions.checkArgument(pairs.containsKey(HEADER_BLOB_TYPE)); + Preconditions.checkArgument(pairs.containsKey(BODY_BLOB_TYPE)); return toMimeMessage( new SequenceInputStream( - new ByteArrayInputStream(streams.get(HEADER_BLOB_TYPE)), - new ByteArrayInputStream(streams.get(BODY_BLOB_TYPE)))); + new ByteArrayInputStream(pairs.get(HEADER_BLOB_TYPE)), + new ByteArrayInputStream(pairs.get(BODY_BLOB_TYPE)))); } private MimeMessage toMimeMessage(InputStream inputStream) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
