This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit beebeedf70d12343de673ac804ed3fb8499d7d99
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Fri Jan 10 15:11:47 2020 +0100

    JAMES-3028 Implement DataChunker for InputStream
---
 .../james/blob/cassandra/CassandraBlobStore.java   |  23 ++-
 .../james/blob/cassandra/utils/DataChunker.java    |  49 ++++--
 .../blob/cassandra/utils/DataChunkerTest.java      | 181 ++++++++++++++-------
 3 files changed, 173 insertions(+), 80 deletions(-)

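A note on the shape of the change: DataChunker now emits Flux<ByteBuffer> instead of Stream<Pair<Integer, ByteBuffer>>, and CassandraBlobStore recovers each chunk's position with Reactor's index() operator. A minimal, standalone sketch of that operator (illustrative only, not James code; the class and variable names below are made up):

    import java.nio.ByteBuffer;

    import reactor.core.publisher.Flux;

    public class IndexExample {
        public static void main(String[] args) {
            Flux<ByteBuffer> chunks = Flux.just(
                ByteBuffer.wrap(new byte[] {1, 2}),
                ByteBuffer.wrap(new byte[] {3}));

            // index() pairs each element with a 0-based Long position (Tuple2<Long, ByteBuffer>)
            chunks.index()
                .doOnNext(pair -> System.out.println(
                    "chunk " + pair.getT1() + ": " + pair.getT2().remaining() + " bytes"))
                .blockLast();
        }
    }
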
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 854bf44..5976354 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -25,7 +25,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -43,10 +42,10 @@ import org.apache.james.util.ReactorUtils;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple2;
 
 public class CassandraBlobStore implements BlobStore {
 
@@ -78,22 +77,22 @@ public class CassandraBlobStore implements BlobStore {
     @Override
     public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) {
         Preconditions.checkNotNull(data);
-
         return saveAsMono(bucketName, data);
     }
 
     private Mono<BlobId> saveAsMono(BucketName bucketName, byte[] data) {
         BlobId blobId = blobIdFactory.forPayload(data);
-        return saveBlobParts(bucketName, data, blobId)
-            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk)
-                .then(Mono.just(blobId)));
+        return Mono.fromCallable(() -> dataChunker.chunk(data, configuration.getBlobPartSize()))
+            .flatMap(chunks -> saveBlobParts(bucketName, blobId, chunks))
+            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk))
+            .thenReturn(blobId);
     }
 
-    private Mono<Integer> saveBlobParts(BucketName bucketName, byte[] data, BlobId blobId) {
-        Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize());
-        return Flux.fromStream(chunks)
+    private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) {
+        return chunksAsFlux
             .publishOn(Schedulers.elastic(), PREFETCH)
-            .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue())
+            .index()
+            .flatMap(pair -> writePart(bucketName, blobId, pair.getT1().intValue(), pair.getT2())
                 .then(Mono.just(getChunkNum(pair))))
             .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
             .<Integer>handle((t, sink) -> t.ifPresent(sink::next))
@@ -106,8 +105,8 @@ public class CassandraBlobStore implements BlobStore {
         return number + 1;
     }
 
-    private Integer getChunkNum(Pair<Integer, ByteBuffer> pair) {
-        return pair.getKey();
+    private Integer getChunkNum(Tuple2<Long, ByteBuffer> pair) {
+        return pair.getT1().intValue();
     }
 
     @Override
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java
index aad71dc..1c2c395 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java
@@ -19,34 +19,59 @@
 
 package org.apache.james.blob.cassandra.utils;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import org.apache.commons.lang3.tuple.Pair;
 
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class DataChunker {
 
-    public Stream<Pair<Integer, ByteBuffer>> chunk(byte[] data, int chunkSize) {
+    private static final String CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE = "ChunkSize must be strictly positive";
+
+    public Flux<ByteBuffer> chunk(byte[] data, int chunkSize) {
         Preconditions.checkNotNull(data);
-        Preconditions.checkArgument(chunkSize > 0, "ChunkSize can not be negative");
+        Preconditions.checkArgument(chunkSize > 0, CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE);
 
         int size = data.length;
         int fullChunkCount = size / chunkSize;
 
-        return Stream.concat(
-            IntStream.range(0, fullChunkCount)
-                .mapToObj(i -> Pair.of(i, ByteBuffer.wrap(data, i * chunkSize, chunkSize))),
+        return Flux.concat(
+            Flux.range(0, fullChunkCount)
+                .map(i -> ByteBuffer.wrap(data, i * chunkSize, chunkSize)),
             lastChunk(data, chunkSize * fullChunkCount, fullChunkCount));
     }
 
-    private Stream<Pair<Integer, ByteBuffer>> lastChunk(byte[] data, int offset, int index) {
+    private Mono<ByteBuffer> lastChunk(byte[] data, int offset, int index) {
         if (offset == data.length && index > 0) {
-            return Stream.empty();
+            return Mono.empty();
         }
-        return Stream.of(Pair.of(index, ByteBuffer.wrap(data, offset, data.length - offset)));
+        return Mono.just(ByteBuffer.wrap(data, offset, data.length - offset));
     }
 
+    public Flux<ByteBuffer> chunkStream(InputStream data, int chunkSize) {
+        Preconditions.checkNotNull(data);
+        Preconditions.checkArgument(chunkSize > 0, CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE);
+        BufferedInputStream bufferedInputStream = new BufferedInputStream(data);
+        return Flux
+            .<ByteBuffer>generate(sink -> {
+                try {
+                    byte[] buffer = new byte[chunkSize];
+
+                    int size = bufferedInputStream.read(buffer);
+                    if (size <= 0) {
+                        sink.complete();
+                    } else {
+                        sink.next(ByteBuffer.wrap(buffer, 0, size));
+                    }
+                } catch (IOException e) {
+                    sink.error(e);
+                }
+            })
+            .defaultIfEmpty(ByteBuffer.wrap(new byte[0]));
+    }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java
index 0f97a0d..a645b0b 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java
@@ -22,16 +22,20 @@ package org.apache.james.blob.cassandra.utils;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.ByteArrayInputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.Assumptions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
+import reactor.core.publisher.Flux;
 
 public class DataChunkerTest {
 
@@ -44,67 +48,132 @@ public class DataChunkerTest {
         testee = new DataChunker();
     }
 
-    @Test
-    public void chunkShouldThrowOnNullData() {
-        assertThatThrownBy(() -> testee.chunk(null, CHUNK_SIZE))
-            .isInstanceOf(NullPointerException.class);
-    }
-
-    @Test
-    public void chunkShouldThrowOnNegativeChunkSize() {
-        int chunkSize = -1;
-        assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
-            .isInstanceOf(IllegalArgumentException.class);
-    }
+    @Nested
+    public class ByteArray {
+
+        @Test
+        public void chunkShouldThrowOnNullData() {
+            assertThatThrownBy(() -> testee.chunk(null, CHUNK_SIZE))
+                .isInstanceOf(NullPointerException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnNegativeChunkSize() {
+            int chunkSize = -1;
+            assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnZeroChunkSize() {
+            int chunkSize = 0;
+            assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+            Flux<ByteBuffer> chunks = testee.chunk(new byte[0], CHUNK_SIZE);
+            ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
+            assertThat(chunks.toStream()).containsExactly(emptyBuffer);
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+            byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
+            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
+
+            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+            byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
+
+            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
+
+            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
+        }
+
+        @Test
+        public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+            byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
+            byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
+            byte[] data = Bytes.concat(part1, part2);
+
+            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
+
+            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(part1), ByteBuffer.wrap(part2));
+        }
 
-    @Test
-    public void chunkShouldThrowOnZeroChunkSize() {
-        int chunkSize = 0;
-        assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
-            .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test
-    public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(new byte[0], CHUNK_SIZE);
-        ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(Pair.of(0, emptyBuffer)));
-    }
-
-    @Test
-    public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
-        byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
-
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
-
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(Pair.of(0, ByteBuffer.wrap(data))));
+    @Nested
+    public class InputStream {
+
+        @Test
+        public void chunkShouldThrowOnNullData() {
+            assertThatThrownBy(() -> testee.chunkStream(null, CHUNK_SIZE))
+                .isInstanceOf(NullPointerException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnNegativeChunkSize() {
+            int chunkSize = -1;
+            assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnZeroChunkSize() {
+            int chunkSize = 0;
+            assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(new byte[0]), CHUNK_SIZE);
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(new byte[0]);
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+            byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
+
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+            byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
+
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
+
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
+        }
+
+        @Test
+        public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+            byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
+            byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
+            byte[] data = Bytes.concat(part1, part2);
+
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
+
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(part1, part2);
+        }
     }
 
-    @Test
-    public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
-        byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
-        assertThat(data.length).isEqualTo(CHUNK_SIZE);
-
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
-
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(Pair.of(0, ByteBuffer.wrap(data))));
-    }
-
-    @Test
-    public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
-        byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
-        byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
-        byte[] data = Bytes.concat(part1, part2);
-
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
-
-        assertThat(chunks)
-        .containsOnlyElementsOf(ImmutableList.of(
-                Pair.of(0, ByteBuffer.wrap(part1)),
-                Pair.of(1, ByteBuffer.wrap(part2))));
+    static byte[] read(ByteBuffer buffer) {
+        byte[] arr = new byte[buffer.remaining()];
+        buffer.get(arr);
+        return arr;
     }
 
 }
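
For context, chunkStream turns an InputStream into a Flux of ByteBuffers holding at most chunkSize bytes each, with an empty stream producing a single empty buffer. A rough usage sketch; apart from DataChunker.chunkStream, the names below are illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.james.blob.cassandra.utils.DataChunker;

    import reactor.core.publisher.Flux;

    public class ChunkStreamExample {
        public static void main(String[] args) {
            InputStream data = new ByteArrayInputStream(
                "some blob content".getBytes(StandardCharsets.UTF_8));

            // Chunks are read lazily from the stream, at most 5 bytes per ByteBuffer
            Flux<ByteBuffer> chunks = new DataChunker().chunkStream(data, 5);

            chunks.map(ByteBuffer::remaining)
                .doOnNext(size -> System.out.println("chunk of " + size + " bytes"))
                .blockLast();
        }
    }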

