This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b028cd499b20bce86b174dcb4962797bef460452 Author: Antoine Duprat <[email protected]> AuthorDate: Mon Mar 4 14:21:58 2019 +0100 JAMES-2671 Add content length in save BlobStore API --- .../java/org/apache/james/blob/api/BlobStore.java | 2 +- .../apache/james/blob/api/MetricableBlobStore.java | 4 +- .../main/java/org/apache/james/blob/api/Store.java | 28 ++++++++-- .../apache/james/blob/api/BlobStoreContract.java | 6 +-- .../james/blob/api/FixedLengthInputStreamTest.java | 63 ++++++++++++++++++++++ .../blob/api/MetricableBlobStoreContract.java | 6 +-- .../james/blob/cassandra/CassandraBlobsDAO.java | 2 +- .../blob/cassandra/CassandraBlobsDAOTest.java | 2 +- .../apache/james/blob/memory/MemoryBlobStore.java | 2 +- .../blob/objectstorage/ObjectStorageBlobsDAO.java | 16 +++--- .../objectstorage/ObjectStorageBlobsDAOTest.java | 20 ++++--- .../apache/james/blob/union/UnionBlobStore.java | 8 +-- .../james/blob/union/UnionBlobStoreTest.java | 14 ++--- .../apache/james/blob/mail/MimeMessageStore.java | 10 ++-- 14 files changed, 141 insertions(+), 42 deletions(-) diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java index 8b68d93..3d28ab5 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java @@ -26,7 +26,7 @@ public interface BlobStore { Mono<BlobId> save(byte[] data); - Mono<BlobId> save(InputStream data); + Mono<BlobId> save(InputStream data, long contentLength); Mono<byte[]> readBytes(BlobId blobId); diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java index 4ed7b17..b51e37b 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java @@ -54,9 +54,9 @@ public class MetricableBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data, long contentLength) { return metricFactory - .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data)); + .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data, contentLength)); } @Override diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java index 07a5611..3cd4afa 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java @@ -26,6 +26,7 @@ import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; +import com.google.common.base.Preconditions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -66,7 +67,7 @@ public interface Store<T, I> { class Impl<T, I extends BlobPartsId> implements Store<T, I> { public interface Encoder<T> { - Stream<Pair<BlobType, InputStream>> encode(T t); + Stream<Pair<BlobType, FixedLengthInputStream>> encode(T t); } public interface Decoder<T> { @@ -93,9 +94,9 @@ public interface Store<T, I> { .map(idFactory::generate); } - private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) { + private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, FixedLengthInputStream> entry) { return Mono.just(entry.getLeft()) - .zipWith(blobStore.save(entry.getRight())); + .zipWith(blobStore.save(entry.getRight().getInputStream(), entry.getRight().getContentLength())); } @Override @@ -110,4 +111,25 @@ public interface Store<T, I> { .map(decoder::decode); } } + + class FixedLengthInputStream { + + private final InputStream inputStream; + private final long contentLength; + + public FixedLengthInputStream(InputStream inputStream, long contentLength) { + Preconditions.checkNotNull(inputStream, "'inputStream' is mandatory"); + Preconditions.checkArgument(contentLength >= 0, "'contentLength' should be greater than or equal to 0"); + this.inputStream = inputStream; + this.contentLength = contentLength; + } + + public InputStream getInputStream() { + return inputStream; + } + + public long getContentLength() { + return contentLength; + } + } } diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java index 2164c74..6983485 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java @@ -49,7 +49,7 @@ public interface BlobStoreContract { @Test default void saveShouldThrowWhenNullInputStream() { - assertThatThrownBy(() -> testee().save((InputStream) null).block()) + assertThatThrownBy(() -> testee().save((InputStream) null, 0).block()) .isInstanceOf(NullPointerException.class); } @@ -64,7 +64,7 @@ public interface BlobStoreContract { @Test default void saveShouldSaveEmptyInputStream() { - BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY)).block(); + BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY), EMPTY_BYTEARRAY.length).block(); byte[] bytes = testee().readBytes(blobId).block(); @@ -81,7 +81,7 @@ public interface BlobStoreContract { @Test default void saveShouldReturnBlobIdOfInputStream() { BlobId blobId = - testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY)).block(); + testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY), SHORT_BYTEARRAY.length).block(); assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66")); } diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java new file mode 100644 index 0000000..c442b0c --- /dev/null +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.james.blob.api; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; + +import org.junit.jupiter.api.Test; + +class FixedLengthInputStreamTest { + + @Test + void fixedLengthInputStreamShouldThrowWhenInputStreamIsNull() { + assertThatThrownBy(() -> new Store.FixedLengthInputStream(null, 0)) + .isInstanceOf(NullPointerException.class) + .hasMessage("'inputStream' is mandatory"); + } + + @Test + void fixedLengthInputStreamShouldThrowWhenContentLengthIsNegative() { + assertThatThrownBy(() -> new Store.FixedLengthInputStream(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)), -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'contentLength' should be greater than or equal to 0"); + } + + @Test + void lengthShouldBeStored() { + int contentLength = 1; + + Store.FixedLengthInputStream testee = new Store.FixedLengthInputStream(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)), contentLength); + + assertThat(testee.getContentLength()).isEqualTo(contentLength); + } + + @Test + void inputStreamShouldBeStored() { + ByteArrayInputStream inputStream = new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)); + + Store.FixedLengthInputStream testee = new Store.FixedLengthInputStream(inputStream, 1); + + assertThat(testee.getInputStream()).hasSameContentAs(inputStream); + } +} \ No newline at end of file diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java index c568764..ec495ed 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java @@ -86,9 +86,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract { @Test default void saveInputStreamShouldPublishSaveInputStreamTimerMetrics() { - testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block(); - testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block(); - testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block(); + testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block(); + testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block(); + testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block(); verify(metricsTestExtension.saveInputStreamTimeMetric, times(3)).stopAndPublish(); } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java index cd77a88..3a9d88d 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java @@ -211,7 +211,7 @@ public class CassandraBlobsDAO implements BlobStore { } @Override - public Mono<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data, long contentLength) { Preconditions.checkNotNull(data); return Mono.fromCallable(() -> IOUtils.toByteArray(data)) .flatMap(this::saveAsMono); diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java index 51a9933..3e570ee 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java @@ -81,7 +81,7 @@ public class CassandraBlobsDAOTest implements MetricableBlobStoreContract { @Test void blobStoreShouldSupport100MBBlob() { - BlobId blobId = testee.save(new ZeroedInputStream(100_000_000)).block(); + BlobId blobId = testee.save(new ZeroedInputStream(100_000_000), 100_000_000).block(); InputStream bytes = testee.read(blobId); assertThat(bytes).hasSameContentAs(new ZeroedInputStream(100_000_000)); } diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java index 9f72fc4..2e203ef 100644 --- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java +++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java @@ -53,7 +53,7 @@ public class MemoryBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data, long contentLength) { Preconditions.checkNotNull(data); try { byte[] bytes = IOUtils.toByteArray(data); diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java index 7b02932..d441e68 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java @@ -22,6 +22,7 @@ package org.apache.james.blob.objectstorage; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.james.blob.api.BlobId; @@ -82,15 +83,15 @@ public class ObjectStorageBlobsDAO implements BlobStore { @Override public Mono<BlobId> save(byte[] data) { - return save(new ByteArrayInputStream(data)); + return save(new ByteArrayInputStream(data), data.length); } @Override - public Mono<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data, long contentLength) { Preconditions.checkNotNull(data); BlobId tmpId = blobIdFactory.randomId(); - return save(data, tmpId) + return save(data, contentLength, tmpId) .flatMap(id -> updateBlobId(tmpId, id)); } @@ -102,11 +103,14 @@ public class ObjectStorageBlobsDAO implements BlobStore { .thenReturn(to); } - private Mono<BlobId> save(InputStream data, BlobId id) { + private Mono<BlobId> save(InputStream data, long contentLength, BlobId id) { String containerName = this.containerName.value(); HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data); Payload payload = payloadCodec.write(hashingInputStream); - Blob blob = blobStore.blobBuilder(id.asString()).payload(payload).build(); + Blob blob = blobStore.blobBuilder(id.asString()) + .payload(payload.getPayload()) + .contentLength(payload.getLength().orElse(contentLength)) + .build(); return Mono.fromCallable(() -> blobStore.putBlob(containerName, blob)) .then(Mono.fromCallable(() -> blobIdFactory.from(hashingInputStream.hash().toString()))); @@ -123,7 +127,7 @@ public class ObjectStorageBlobsDAO implements BlobStore { try { if (blob != null) { - return payloadCodec.read(blob.getPayload()); + return payloadCodec.read(new Payload(blob.getPayload(), Optional.empty())); } else { throw new ObjectStoreException("fail to load blob with id " + blobId); } diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java index 95c1fec..63ebb7d 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.UUID; @@ -46,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import com.google.common.base.Charsets; import com.google.common.base.Strings; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -123,18 +125,21 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { } @Test - void supportsEncryptionWithCustomPayloadCodec() { + void supportsEncryptionWithCustomPayloadCodec() throws IOException { ObjectStorageBlobsDAO encryptedDao = ObjectStorageBlobsDAO .builder(testConfig) .container(containerName) .blobIdFactory(blobIdFactory()) .payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG)) .build(); - byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8); + String content = "James is the best!"; + byte[] bytes = content.getBytes(StandardCharsets.UTF_8); BlobId blobId = encryptedDao.save(bytes).block(); InputStream read = encryptedDao.read(blobId); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes)); + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + String expectedContent = IOUtils.toString(inputStream, Charsets.UTF_8); + assertThat(content).isEqualTo(expectedContent); } @Test @@ -145,7 +150,8 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { .blobIdFactory(blobIdFactory()) .payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG)) .build(); - byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8); + String content = "James is the best!"; + byte[] bytes = content.getBytes(StandardCharsets.UTF_8); BlobId blobId = encryptedDao.save(bytes).block(); InputStream encryptedIs = testee.read(blobId); @@ -154,7 +160,9 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { assertThat(encryptedBytes).isNotEqualTo(bytes); InputStream clearTextIs = encryptedDao.read(blobId); - assertThat(clearTextIs).hasSameContentAs(new ByteArrayInputStream(bytes)); + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + String expectedContent = IOUtils.toString(inputStream, Charsets.UTF_8); + assertThat(content).isEqualTo(expectedContent); } @Test @@ -176,7 +184,7 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { @Test void saveInputStreamShouldNotCompleteWhenDoesNotAwait() { Mono<BlobId> blobIdFuture = testee - .save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8))) + .save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)), BIG_STRING.length()) .subscribeOn(Schedulers.elastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } diff --git a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java index 950fcf7..ee02c8d 100644 --- a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java +++ b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java @@ -92,14 +92,14 @@ public class UnionBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data, long contentLength) { try { return saveToCurrentFallbackIfFails( - Mono.defer(() -> currentBlobStore.save(data)), - () -> Mono.defer(() -> legacyBlobStore.save(data))); + Mono.defer(() -> currentBlobStore.save(data, contentLength)), + () -> Mono.defer(() -> legacyBlobStore.save(data, contentLength))); } catch (Exception e) { LOGGER.error("exception directly happens while saving InputStream data, fall back to legacy blob store", e); - return legacyBlobStore.save(data); + return legacyBlobStore.save(data, contentLength); } } diff --git a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java index 2bed094..563f3cd 100644 --- a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java +++ b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java @@ -60,7 +60,7 @@ class UnionBlobStoreTest implements BlobStoreContract { } @Override - public Mono<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data, long contentLength) { return Mono.error(new RuntimeException("broken everywhere")); } @@ -89,7 +89,7 @@ class UnionBlobStoreTest implements BlobStoreContract { } @Override - public Mono<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data, long contentLength) { throw new RuntimeException("broken everywhere"); } @@ -164,7 +164,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new ThrowingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block(); + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(blobId)) @@ -202,7 +202,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new FailingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block(); + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(blobId)) @@ -284,7 +284,7 @@ class UnionBlobStoreTest implements BlobStoreContract { Stream<Function<UnionBlobStore, Mono<?>>> blobStoreOperationsReturnFutures() { return Stream.of( blobStore -> blobStore.save(BLOB_CONTENT), - blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT)), + blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length), blobStore -> blobStore.readBytes(BLOB_ID_FACTORY.randomId())); } @@ -394,7 +394,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void saveInputStreamShouldWriteToCurrent() { - BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block(); + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block(); assertThat(currentBlobStore.readBytes(blobId).block()) .isEqualTo(BLOB_CONTENT); @@ -402,7 +402,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void saveInputStreamShouldNotWriteToLegacy() { - BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block(); + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block(); assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).block()) .isInstanceOf(ObjectStoreException.class); diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java index 71905de..506b895 100644 --- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java +++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java @@ -69,18 +69,20 @@ public class MimeMessageStore { static class MimeMessageEncoder implements Store.Impl.Encoder<MimeMessage> { @Override - public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) { + public Stream<Pair<BlobType, Store.FixedLengthInputStream>> encode(MimeMessage message) { try { byte[] messageAsArray = messageToArray(message); int bodyStartOctet = computeBodyStartOctet(messageAsArray); + byte[] headerBytes = getHeaderBytes(messageAsArray, bodyStartOctet); + byte[] bodyBytes = getBodyBytes(messageAsArray, bodyStartOctet); return Stream.of( - Pair.of(HEADER_BLOB_TYPE, new ByteArrayInputStream(getHeaderBytes(messageAsArray, bodyStartOctet))), - Pair.of(BODY_BLOB_TYPE, new ByteArrayInputStream(getBodyBytes(messageAsArray, bodyStartOctet)))); + Pair.of(HEADER_BLOB_TYPE, new Store.FixedLengthInputStream(new ByteArrayInputStream(headerBytes), headerBytes.length)), + Pair.of(BODY_BLOB_TYPE, new Store.FixedLengthInputStream(new ByteArrayInputStream(bodyBytes), bodyBytes.length))); } catch (MessagingException | IOException e) { throw new RuntimeException(e); } } - + private static byte[] messageToArray(MimeMessage message) throws IOException, MessagingException { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); message.writeTo(byteArrayOutputStream); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
