This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit beebeedf70d12343de673ac804ed3fb8499d7d99
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Fri Jan 10 15:11:47 2020 +0100

    JAMES-3028 Implement DataChunker for InputStream
---
 .../james/blob/cassandra/CassandraBlobStore.java   |  23 ++-
 .../james/blob/cassandra/utils/DataChunker.java    |  49 ++++--
 .../blob/cassandra/utils/DataChunkerTest.java      | 181 ++++++++++++++-------
 3 files changed, 173 insertions(+), 80 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 854bf44..5976354 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -25,7 +25,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -43,10 +42,10 @@ import org.apache.james.util.ReactorUtils;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple2;
 
 public class CassandraBlobStore implements BlobStore {
 
@@ -78,22 +77,22 @@ public class CassandraBlobStore implements BlobStore {
     @Override
     public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) {
         Preconditions.checkNotNull(data);
-
         return saveAsMono(bucketName, data);
     }
 
     private Mono<BlobId> saveAsMono(BucketName bucketName, byte[] data) {
         BlobId blobId = blobIdFactory.forPayload(data);
-        return saveBlobParts(bucketName, data, blobId)
-            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk)
-                .then(Mono.just(blobId)));
+        return Mono.fromCallable(() -> dataChunker.chunk(data, configuration.getBlobPartSize()))
+            .flatMap(chunks -> saveBlobParts(bucketName, blobId, chunks))
+            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk))
+            .thenReturn(blobId);
     }
 
-    private Mono<Integer> saveBlobParts(BucketName bucketName, byte[] data, BlobId blobId) {
-        Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize());
-        return Flux.fromStream(chunks)
+    private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) {
+        return chunksAsFlux
             .publishOn(Schedulers.elastic(), PREFETCH)
-            .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue())
+            .index()
+            .flatMap(pair -> writePart(bucketName, blobId, pair.getT1().intValue(), pair.getT2())
                 .then(Mono.just(getChunkNum(pair))))
             .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
             .<Integer>handle((t, sink) -> t.ifPresent(sink::next))
@@ -106,8 +105,8 @@ public class CassandraBlobStore implements BlobStore {
         return number + 1;
     }
 
-    private Integer getChunkNum(Pair<Integer, ByteBuffer> pair) {
-        return pair.getKey();
+    private Integer getChunkNum(Tuple2<Long, ByteBuffer> pair) {
+        return pair.getT1().intValue();
     }
 
     @Override
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java
index aad71dc..1c2c395 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java
@@ -19,34 +19,59 @@
 package org.apache.james.blob.cassandra.utils;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import org.apache.commons.lang3.tuple.Pair;
 
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class DataChunker {
 
-    public Stream<Pair<Integer, ByteBuffer>> chunk(byte[] data, int chunkSize) {
+    private static final String CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE = "ChunkSize must be strictly positive";
+
+    public Flux<ByteBuffer> chunk(byte[] data, int chunkSize) {
         Preconditions.checkNotNull(data);
-        Preconditions.checkArgument(chunkSize > 0, "ChunkSize can not be negative");
+        Preconditions.checkArgument(chunkSize > 0, CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE);
 
         int size = data.length;
         int fullChunkCount = size / chunkSize;
 
-        return Stream.concat(
-            IntStream.range(0, fullChunkCount)
-                .mapToObj(i -> Pair.of(i, ByteBuffer.wrap(data, i * chunkSize, chunkSize))),
+        return Flux.concat(
+            Flux.range(0, fullChunkCount)
+                .map(i -> ByteBuffer.wrap(data, i * chunkSize, chunkSize)),
             lastChunk(data, chunkSize * fullChunkCount, fullChunkCount));
     }
 
-    private Stream<Pair<Integer, ByteBuffer>> lastChunk(byte[] data, int offset, int index) {
+    private Mono<ByteBuffer> lastChunk(byte[] data, int offset, int index) {
         if (offset == data.length && index > 0) {
-            return Stream.empty();
+            return Mono.empty();
         }
-        return Stream.of(Pair.of(index, ByteBuffer.wrap(data, offset, data.length - offset)));
+        return Mono.just(ByteBuffer.wrap(data, offset, data.length - offset));
     }
 
+    public Flux<ByteBuffer> chunkStream(InputStream data, int chunkSize) {
+        Preconditions.checkNotNull(data);
+        Preconditions.checkArgument(chunkSize > 0, CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE);
+        BufferedInputStream bufferedInputStream = new BufferedInputStream(data);
+        return Flux
+            .<ByteBuffer>generate(sink -> {
+                try {
+                    byte[] buffer = new byte[chunkSize];
+
+                    int size = bufferedInputStream.read(buffer);
+                    if (size <= 0) {
+                        sink.complete();
+                    } else {
+                        sink.next(ByteBuffer.wrap(buffer, 0, size));
+                    }
+                } catch (IOException e) {
+                    sink.error(e);
+                }
+            })
+            .defaultIfEmpty(ByteBuffer.wrap(new byte[0]));
+    }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java
index 0f97a0d..a645b0b 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java
@@ -22,16 +22,20 @@ package org.apache.james.blob.cassandra.utils;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.ByteArrayInputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.Assumptions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
 
+import reactor.core.publisher.Flux;
 
 public class DataChunkerTest {
 
@@ -44,67 +48,132 @@ public class DataChunkerTest {
         testee = new DataChunker();
     }
 
-    @Test
-    public void chunkShouldThrowOnNullData() {
-        assertThatThrownBy(() -> testee.chunk(null, CHUNK_SIZE))
-            .isInstanceOf(NullPointerException.class);
-    }
-
-    @Test
-    public void chunkShouldThrowOnNegativeChunkSize() {
-        int chunkSize = -1;
-        assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
-            .isInstanceOf(IllegalArgumentException.class);
-    }
+    @Nested
+    public class ByteArray {
+
+        @Test
+        public void chunkShouldThrowOnNullData() {
+            assertThatThrownBy(() -> testee.chunk(null, CHUNK_SIZE))
+                .isInstanceOf(NullPointerException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnNegativeChunkSize() {
+            int chunkSize = -1;
+            assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnZeroChunkSize() {
+            int chunkSize = 0;
+            assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+            Flux<ByteBuffer> chunks = testee.chunk(new byte[0], CHUNK_SIZE);
+            ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
+            assertThat(chunks.toStream()).containsExactly(emptyBuffer);
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+            byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
+            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
+
+            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+            byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
+
+            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
+
+            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
+        }
+
+        @Test
+        public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+            byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
+            byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
+            byte[] data = Bytes.concat(part1, part2);
+
+            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
+
+            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(part1), ByteBuffer.wrap(part2));
+        }
 
-    @Test
-    public void chunkShouldThrowOnZeroChunkSize() {
-        int chunkSize = 0;
-        assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
-            .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test
-    public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(new byte[0], CHUNK_SIZE);
-        ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(Pair.of(0, emptyBuffer)));
-    }
-
-    @Test
-    public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
-        byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
-
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
-
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(Pair.of(0, ByteBuffer.wrap(data))));
+    @Nested
+    public class InputStream {
+
+        @Test
+        public void chunkShouldThrowOnNullData() {
+            assertThatThrownBy(() -> testee.chunkStream(null, CHUNK_SIZE))
+                .isInstanceOf(NullPointerException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnNegativeChunkSize() {
+            int chunkSize = -1;
+            assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnZeroChunkSize() {
+            int chunkSize = 0;
+            assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(new byte[0]), CHUNK_SIZE);
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(new byte[0]);
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+            byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
+
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+            byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
+
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
+
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
+        }
+
+        @Test
+        public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+            byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
+            byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
+            byte[] data = Bytes.concat(part1, part2);
+
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
+
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(part1, part2);
+        }
     }
 
-    @Test
-    public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
-        byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
-        assertThat(data.length).isEqualTo(CHUNK_SIZE);
-
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
-
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(Pair.of(0, ByteBuffer.wrap(data))));
-    }
-
-    @Test
-    public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
-        byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
-        byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
-        byte[] data = Bytes.concat(part1, part2);
-
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
-
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(
-                Pair.of(0, ByteBuffer.wrap(part1)),
-                Pair.of(1, ByteBuffer.wrap(part2))));
+    static byte[] read(ByteBuffer buffer) {
+        byte[] arr = new byte[buffer.remaining()];
+        buffer.get(arr);
+        return arr;
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org
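
As a closing note, here is a minimal sketch of how the new DataChunker.chunkStream API introduced by this commit might be consumed. Only the chunkStream(InputStream, int) signature and its behaviour (buffers of at most chunkSize bytes, at least one buffer even for empty input) come from the diff above; the class name, sample payload, and chunk size in the sketch are illustrative assumptions, not part of the commit.

    import java.io.ByteArrayInputStream;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.james.blob.cassandra.utils.DataChunker;

    import reactor.core.publisher.Flux;

    // Illustrative only: this class is not part of the commit.
    public class DataChunkerUsageSketch {

        public static void main(String[] args) {
            DataChunker chunker = new DataChunker();
            byte[] payload = "123456789012345".getBytes(StandardCharsets.UTF_8);

            // Chunk a 15-byte stream into parts of at most 10 bytes:
            // the Flux emits a 10-byte buffer followed by a 5-byte buffer.
            Flux<ByteBuffer> chunks = chunker.chunkStream(new ByteArrayInputStream(payload), 10);

            chunks.map(ByteBuffer::remaining)
                .doOnNext(size -> System.out.println("chunk of " + size + " bytes"))
                .blockLast();
        }
    }

An empty input stream still yields a single empty buffer, because chunkStream ends with defaultIfEmpty(ByteBuffer.wrap(new byte[0])); this matches the chunkShouldReturnOneEmptyArrayWhenInputEmpty test in the diff.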