JAMES-2541 Abstract the Id required by the Store API
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b853f6e1 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b853f6e1 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b853f6e1 Branch: refs/heads/master Commit: b853f6e1d4a049c6ccad06fbd895c9e216ba5715 Parents: 9d216b2 Author: Benoit Tellier <[email protected]> Authored: Fri Sep 7 11:17:27 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Mon Sep 10 17:17:41 2018 +0700 ---------------------------------------------------------------------- .../org/apache/james/blob/api/BlobPartsId.java | 30 ++++++ .../java/org/apache/james/blob/api/Store.java | 44 ++++---- .../james/blob/mail/MimeMessagePartsId.java | 102 +++++++++++++++++++ .../james/blob/mail/MimeMessageStore.java | 19 ++-- .../james/blob/mail/MimeMessageStoreTest.java | 39 ++----- .../cassandra/CassandraMailRepository.java | 20 ++-- 6 files changed, 175 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b853f6e1/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java new file mode 100644 index 0000000..a6a9412 --- /dev/null +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java @@ -0,0 +1,30 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.api; + +import java.util.Map; + +public interface BlobPartsId { + interface Factory<I extends BlobPartsId> { + I generate(Map<Store.BlobType, BlobId> map); + } + + Map<Store.BlobType, BlobId> asMap(); +} http://git-wip-us.apache.org/repos/asf/james-project/blob/b853f6e1/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java index 122fdbe..c270a9d 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java @@ -20,8 +20,6 @@ package org.apache.james.blob.api; import java.io.InputStream; -import java.util.Collection; -import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; @@ -31,7 +29,12 @@ import org.apache.james.util.FluentFutureStream; import com.google.common.collect.ImmutableMap; -public interface Store<T> { +public interface Store<T, I> { + + CompletableFuture<I> save(T t); + + CompletableFuture<T> read(I blobIds); + class BlobType { private final String name; @@ -59,38 +62,36 @@ public interface Store<T> { } } - interface Encoder<T> { - Stream<Pair<BlobType, InputStream>> encode(T t); - } - - interface Decoder<T> { - void validateInput(Collection<BlobType> input); + class Impl<T, I extends BlobPartsId> implements Store<T, I> { - T decode(Stream<Pair<BlobType, byte[]>> streams); - } - - CompletableFuture<Map<BlobType, BlobId>> save(T t); + public interface Encoder<T> { + Stream<Pair<BlobType, InputStream>> encode(T t); + } - CompletableFuture<T> read(Map<BlobType, BlobId> blobIds); + public interface Decoder<T> { + T decode(Stream<Pair<BlobType, byte[]>> streams); + } - class Impl<T> implements Store<T> { + private final BlobPartsId.Factory<I> idFactory; private final Encoder<T> encoder; private final Decoder<T> decoder; private final BlobStore blobStore; - public Impl(Encoder<T> encoder, Decoder<T> decoder, BlobStore blobStore) { + public Impl(BlobPartsId.Factory<I> idFactory, Encoder<T> encoder, Decoder<T> decoder, BlobStore blobStore) { + this.idFactory = idFactory; this.encoder = encoder; this.decoder = decoder; this.blobStore = blobStore; } @Override - public CompletableFuture<Map<BlobType, BlobId>> save(T t) { + public CompletableFuture<I> save(T t) { return FluentFutureStream.of( encoder.encode(t) .map(this::saveEntry)) .completableFuture() - .thenApply(pairStream -> pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue))); + .thenApply(pairStream -> pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue))) + .thenApply(idFactory::generate); } private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) { @@ -99,10 +100,9 @@ public interface Store<T> { } @Override - public CompletableFuture<T> read(Map<BlobType, BlobId> blobIds) { - decoder.validateInput(blobIds.keySet()); - - CompletableFuture<Stream<Pair<BlobType, byte[]>>> binaries = FluentFutureStream.of(blobIds.entrySet() + public CompletableFuture<T> read(I blobIds) { + CompletableFuture<Stream<Pair<BlobType, byte[]>>> binaries = FluentFutureStream.of(blobIds.asMap() + .entrySet() .stream() .map(entry -> blobStore.readBytes(entry.getValue()) .thenApply(bytes -> Pair.of(entry.getKey(), bytes)))) http://git-wip-us.apache.org/repos/asf/james-project/blob/b853f6e1/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java ---------------------------------------------------------------------- diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java new file mode 100644 index 0000000..5d1c48c --- /dev/null +++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java @@ -0,0 +1,102 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.mail; + +import java.util.Map; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobPartsId; +import org.apache.james.blob.api.Store; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +public class MimeMessagePartsId implements BlobPartsId { + @FunctionalInterface + public interface RequireHeaderBlobId { + RequireBodyBlobId headerBlobId(BlobId headerBlobId); + } + + @FunctionalInterface + public interface RequireBodyBlobId { + Builder bodyBlobId(BlobId bodyBlobId); + } + + public static class Builder { + private final BlobId headerBlobId; + private final BlobId bodyBlobId; + + private Builder(BlobId headerBlobId, BlobId bodyBlobId) { + Preconditions.checkNotNull(headerBlobId, "'headerBlobId' should not be null"); + Preconditions.checkNotNull(bodyBlobId, "'bodyBlobId' should not be null"); + + this.headerBlobId = headerBlobId; + this.bodyBlobId = bodyBlobId; + } + + public MimeMessagePartsId build() { + return new MimeMessagePartsId(headerBlobId, bodyBlobId); + } + } + + public static RequireHeaderBlobId builder() { + return headerBlobId -> bodyBlobId -> new Builder(headerBlobId, bodyBlobId); + } + + public static class Factory implements BlobPartsId.Factory<MimeMessagePartsId> { + @Override + public MimeMessagePartsId generate(Map<Store.BlobType, BlobId> map) { + Preconditions.checkArgument(map.keySet().contains(HEADER_BLOB_TYPE), "Expecting 'mailHeader' blobId to be specified"); + Preconditions.checkArgument(map.keySet().contains(BODY_BLOB_TYPE), "Expecting 'mailBody' blobId to be specified"); + Preconditions.checkArgument(map.size() == 2, "blobId other than 'mailHeader' or 'mailBody' are not supported"); + + return builder() + .headerBlobId(map.get(HEADER_BLOB_TYPE)) + .bodyBlobId(map.get(BODY_BLOB_TYPE)) + .build(); + } + } + + static final Store.BlobType HEADER_BLOB_TYPE = new Store.BlobType("mailHeader"); + static final Store.BlobType BODY_BLOB_TYPE = new Store.BlobType("mailBody"); + + private final BlobId headerBlobId; + private final BlobId bodyBlobId; + + private MimeMessagePartsId(BlobId headerBlobId, BlobId bodyBlobId) { + this.headerBlobId = headerBlobId; + this.bodyBlobId = bodyBlobId; + } + + @Override + public Map<Store.BlobType, BlobId> asMap() { + return ImmutableMap.of( + HEADER_BLOB_TYPE, headerBlobId, + BODY_BLOB_TYPE, bodyBlobId); + } + + public BlobId getHeaderBlobId() { + return headerBlobId; + } + + public BlobId getBodyBlobId() { + return bodyBlobId; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/b853f6e1/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java ---------------------------------------------------------------------- diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java index adfc6ed..c4650b1 100644 --- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java +++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java @@ -20,6 +20,8 @@ package org.apache.james.blob.mail; import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM; +import static org.apache.james.blob.mail.MimeMessagePartsId.BODY_BLOB_TYPE; +import static org.apache.james.blob.mail.MimeMessagePartsId.HEADER_BLOB_TYPE; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -27,7 +29,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.SequenceInputStream; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Map; import java.util.Properties; import java.util.stream.Stream; @@ -48,8 +49,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; public class MimeMessageStore { - public static final Store.BlobType HEADER_BLOB_TYPE = new Store.BlobType("mailHeader"); - public static final Store.BlobType BODY_BLOB_TYPE = new Store.BlobType("mailBody"); public static class Factory { private final BlobStore blobStore; @@ -59,15 +58,16 @@ public class MimeMessageStore { this.blobStore = blobStore; } - public Store<MimeMessage> mimeMessageStore() { + public Store<MimeMessage, MimeMessagePartsId> mimeMessageStore() { return new Store.Impl<>( + new MimeMessagePartsId.Factory(), new MailEncoder(), new MailDecoder(), blobStore); } } - static class MailEncoder implements Store.Encoder<MimeMessage> { + static class MailEncoder implements Store.Impl.Encoder<MimeMessage> { @Override public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) { try { @@ -124,14 +124,7 @@ public class MimeMessageStore { } } - static class MailDecoder implements Store.Decoder<MimeMessage> { - @Override - public void validateInput(Collection<BlobType> input) { - Preconditions.checkArgument(input.contains(HEADER_BLOB_TYPE), "Expecting 'mailHeader' blobId to be specified"); - Preconditions.checkArgument(input.contains(BODY_BLOB_TYPE), "Expecting 'mailBody' blobId to be specified"); - Preconditions.checkArgument(input.size() == 2, "blobId other than 'mailHeader' or 'mailBody' are not supported"); - } - + static class MailDecoder implements Store.Impl.Decoder<MimeMessage> { @Override public MimeMessage decode(Stream<Pair<BlobType, byte[]>> streams) { Preconditions.checkNotNull(streams); http://git-wip-us.apache.org/repos/asf/james-project/blob/b853f6e1/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java index 0817963..d8de475 100644 --- a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java +++ b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java @@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.nio.charset.StandardCharsets; -import java.util.Map; import javax.mail.internet.MimeMessage; @@ -37,12 +36,11 @@ import org.apache.james.util.MimeMessageUtil; import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; class MimeMessageStoreTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); - private Store<MimeMessage> testee; + private Store<MimeMessage, MimeMessagePartsId> testee; private BlobStore blobStore; @BeforeEach @@ -64,29 +62,6 @@ class MimeMessageStoreTest { } @Test - void readShouldThrowWhenMissingHeaderBlobs() { - assertThatThrownBy(() -> testee.read(ImmutableMap.of( - MimeMessageStore.HEADER_BLOB_TYPE, BLOB_ID_FACTORY.randomId()))) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - void readShouldThrowWhenMissingBodyBlobs() { - assertThatThrownBy(() -> testee.read(ImmutableMap.of( - MimeMessageStore.BODY_BLOB_TYPE, BLOB_ID_FACTORY.randomId()))) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - void readShouldThrowWhenExtraBodyBlobs() { - assertThatThrownBy(() -> testee.read(ImmutableMap.of( - MimeMessageStore.BODY_BLOB_TYPE, BLOB_ID_FACTORY.randomId(), - MimeMessageStore.HEADER_BLOB_TYPE, BLOB_ID_FACTORY.randomId(), - new Store.BlobType("Unknown"), BLOB_ID_FACTORY.randomId()))) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test void mailStoreShouldPreserveContent() throws Exception { MimeMessage message = MimeMessageBuilder.mimeMessageBuilder() .addFrom("[email protected]") @@ -95,7 +70,7 @@ class MimeMessageStoreTest { .setText("Important mail content") .build(); - Map<Store.BlobType, BlobId> parts = testee.save(message).join(); + MimeMessagePartsId parts = testee.save(message).join(); MimeMessage retrievedMessage = testee.read(parts).join(); @@ -111,7 +86,7 @@ class MimeMessageStoreTest { .setSubject("Important Mail") .build(); - Map<Store.BlobType, BlobId> parts = testee.save(message).join(); + MimeMessagePartsId parts = testee.save(message).join(); MimeMessage retrievedMessage = testee.read(parts).join(); @@ -130,14 +105,12 @@ class MimeMessageStoreTest { .setText("Important mail content") .build(); - Map<Store.BlobType, BlobId> parts = testee.save(message).join(); + MimeMessagePartsId parts = testee.save(message).join(); SoftAssertions.assertSoftly( softly -> { - softly.assertThat(parts).containsKeys(MimeMessageStore.HEADER_BLOB_TYPE, MimeMessageStore.BODY_BLOB_TYPE); - - BlobId headerBlobId = parts.get(MimeMessageStore.HEADER_BLOB_TYPE); - BlobId bodyBlobId = parts.get(MimeMessageStore.BODY_BLOB_TYPE); + BlobId headerBlobId = parts.getHeaderBlobId(); + BlobId bodyBlobId = parts.getBodyBlobId(); softly.assertThat(new String(blobStore.readBytes(headerBlobId).join(), StandardCharsets.UTF_8)) .isEqualTo("Date: Thu, 6 Sep 2018 13:29:13 +0700 (ICT)\r\n" + http://git-wip-us.apache.org/repos/asf/james-project/blob/b853f6e1/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index d32a2d6..c62feba 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -21,16 +21,14 @@ package org.apache.james.mailrepository.cassandra; import java.util.Collection; import java.util.Iterator; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; -import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.Store; -import org.apache.james.blob.mail.MimeMessageStore; +import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepository; import org.apache.james.mailrepository.api.MailRepositoryUrl; @@ -39,7 +37,6 @@ import org.apache.james.util.FluentFutureStream; import org.apache.mailet.Mail; import com.github.fge.lambdas.Throwing; -import com.google.common.collect.ImmutableMap; public class CassandraMailRepository implements MailRepository { @@ -47,11 +44,11 @@ public class CassandraMailRepository implements MailRepository { private final CassandraMailRepositoryKeysDAO keysDAO; private final CassandraMailRepositoryCountDAO countDAO; private final CassandraMailRepositoryMailDAO mailDAO; - private final Store<MimeMessage> mimeMessageStore; + private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; public CassandraMailRepository(MailRepositoryUrl url, CassandraMailRepositoryKeysDAO keysDAO, CassandraMailRepositoryCountDAO countDAO, CassandraMailRepositoryMailDAO mailDAO, - Store<MimeMessage> mimeMessageStore) { + Store<MimeMessage, MimeMessagePartsId> mimeMessageStore) { this.url = url; this.keysDAO = keysDAO; this.countDAO = countDAO; @@ -65,8 +62,8 @@ public class CassandraMailRepository implements MailRepository { mimeMessageStore.save(mail.getMessage()) .thenCompose(Throwing.function(parts -> mailDAO.store(url, mail, - parts.get(MimeMessageStore.HEADER_BLOB_TYPE), - parts.get(MimeMessageStore.BODY_BLOB_TYPE)))) + parts.getHeaderBlobId(), + parts.getBodyBlobId()))) .thenCompose(any -> keysDAO.store(url, mailKey)) .thenCompose(this::increaseSizeIfStored) .join(); @@ -98,9 +95,10 @@ public class CassandraMailRepository implements MailRepository { } private CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { - Map<Store.BlobType, BlobId> parts = ImmutableMap.of( - MimeMessageStore.HEADER_BLOB_TYPE, mailDTO.getHeaderBlobId(), - MimeMessageStore.BODY_BLOB_TYPE, mailDTO.getBodyBlobId()); + MimeMessagePartsId parts = MimeMessagePartsId.builder() + .headerBlobId(mailDTO.getHeaderBlobId()) + .bodyBlobId(mailDTO.getBodyBlobId()) + .build(); return mimeMessageStore.read(parts) .thenApply(mimeMessage -> mailDTO.getMailBuilder() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
