This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4cb2104ea88b1c5468d673be360be333f38e1a49 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Tue Aug 20 14:20:18 2019 +0200 JAMES-2851 CassandraBlobStore reads ByteBuffer instead of byte[] --- .../james/blob/cassandra/CassandraBlobStore.java | 20 ++++++++++++++++---- .../james/blob/cassandra/CassandraBucketDAO.java | 9 +++------ .../blob/cassandra/CassandraDefaultBucketDAO.java | 9 +++------ .../james/blob/cassandra/CassandraBucketDAOTest.java | 20 ++++++++++---------- .../cassandra/CassandraDefaultBucketDAOTest.java | 18 +++++++++--------- .../java/org/apache/james/util/ReactorUtils.java | 19 ++++++++++--------- .../java/org/apache/james/util/ReactorUtilsTest.java | 15 ++++++++++----- 7 files changed, 61 insertions(+), 49 deletions(-) diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java index 171f215..7f7efe3 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java @@ -22,6 +22,7 @@ package org.apache.james.blob.cassandra; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Comparator; +import java.util.List; import java.util.NoSuchElementException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -42,7 +43,6 @@ import org.apache.james.util.ReactorUtils; import com.datastax.driver.core.Session; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.primitives.Bytes; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -114,7 +114,7 @@ public class CassandraBlobStore implements BlobStore { public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { return readBlobParts(bucketName, blobId) .collectList() - .map(parts -> Bytes.concat(parts.toArray(new byte[0][]))); + .map(this::byteBuffersToBytesArray); } @Override @@ -127,7 +127,7 @@ public class CassandraBlobStore implements BlobStore { return BucketName.DEFAULT; } - private Flux<byte[]> readBlobParts(BucketName bucketName, BlobId blobId) { + private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) { Integer rowCount = selectRowCount(bucketName, blobId) .publishOn(Schedulers.elastic()) .single() @@ -173,7 +173,7 @@ public class CassandraBlobStore implements BlobStore { } } - private Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) { + private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) { if (isDefaultBucket(bucketName)) { return defaultBucketDAO.readPart(blobId, partIndex); } else { @@ -208,4 +208,16 @@ public class CassandraBlobStore implements BlobStore { private boolean isDefaultBucket(BucketName bucketName) { return bucketName.equals(getDefaultBucketName()); } + + private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) { + int targetSize = byteBuffers + .stream() + .mapToInt(ByteBuffer::remaining) + .sum(); + + return byteBuffers + .stream() + .reduce(ByteBuffer.allocate(targetSize), (accumulator, element) -> accumulator.put(element)) + .array(); + } } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java index 8d355bf..f6d124f 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java @@ -145,7 +145,7 @@ class CassandraBucketDAO { .map(row -> row.getInt(NUMBER_OF_CHUNK)); } - Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, int position) { + Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, int position) { return cassandraAsyncExecutor.executeSingleRow( selectPart.bind() .setString(BucketBlobParts.BUCKET, bucketName.asString()) @@ -173,10 +173,7 @@ class CassandraBucketDAO { .map(row -> Pair.of(BucketName.of(row.getString(BUCKET)), blobIdFactory.from(row.getString(ID)))); } - private byte[] rowToData(Row row) { - ByteBuffer byteBuffer = row.getBytes(BucketBlobParts.DATA); - byte[] data = new byte[byteBuffer.remaining()]; - byteBuffer.get(data); - return data; + private ByteBuffer rowToData(Row row) { + return row.getBytes(BucketBlobParts.DATA); } } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java index 3bcc99e..d564066 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java @@ -123,7 +123,7 @@ public class CassandraDefaultBucketDAO { .map(row -> row.getInt(NUMBER_OF_CHUNK)); } - Mono<byte[]> readPart(BlobId blobId, int position) { + Mono<ByteBuffer> readPart(BlobId blobId, int position) { return cassandraAsyncExecutor.executeSingleRow( selectPart.bind() .setString(DefaultBucketBlobParts.ID, blobId.asString()) @@ -143,10 +143,7 @@ public class CassandraDefaultBucketDAO { .setString(DefaultBucketBlobParts.ID, blobId.asString())); } - private byte[] rowToData(Row row) { - ByteBuffer byteBuffer = row.getBytes(DefaultBucketBlobParts.DATA); - byte[] data = new byte[byteBuffer.remaining()]; - byteBuffer.get(data); - return data; + private ByteBuffer rowToData(Row row) { + return row.getBytes(DefaultBucketBlobParts.DATA); } } diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java index 66a7abc..2142200 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java @@ -55,7 +55,7 @@ class CassandraBucketDAOTest { @Test void readPartShouldReturnEmptyWhenNone() { - Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); assertThat(maybeBytes).isEmpty(); } @@ -88,7 +88,7 @@ class CassandraBucketDAOTest { testee.deleteParts(BUCKET_NAME, BLOB_ID).block(); - Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); assertThat(maybeBytes).isEmpty(); } @@ -99,8 +99,8 @@ class CassandraBucketDAOTest { testee.deleteParts(BUCKET_NAME, BLOB_ID).block(); - Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); - Optional<byte[]> maybeBytes2 = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes2 = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional(); assertThat(maybeBytes).isEmpty(); assertThat(maybeBytes2).isEmpty(); } @@ -109,16 +109,16 @@ class CassandraBucketDAOTest { void readPartShouldReturnPreviouslySavedData() { testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block(); - Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); - assertThat(maybeBytes).contains(DATA); + assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA)); } @Test void readPartShouldNotReturnContentOfOtherParts() { testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block(); - Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional(); assertThat(maybeBytes).isEmpty(); } @@ -127,7 +127,7 @@ class CassandraBucketDAOTest { void readPartShouldNotReturnContentOfOtherBuckets() { testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block(); - Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME_2, BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME_2, BLOB_ID, POSITION).blockOptional(); assertThat(maybeBytes).isEmpty(); } @@ -137,9 +137,9 @@ class CassandraBucketDAOTest { testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block(); testee.writePart(ByteBuffer.wrap(DATA_2), BUCKET_NAME, BLOB_ID, POSITION).block(); - Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional(); - assertThat(maybeBytes).contains(DATA_2); + assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA_2)); } @Test diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java index 01ffaab..78d9359 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java @@ -52,7 +52,7 @@ class CassandraDefaultBucketDAOTest { @Test void readPartShouldReturnEmptyWhenNone() { - Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); assertThat(maybeBytes).isEmpty(); } @@ -61,16 +61,16 @@ class CassandraDefaultBucketDAOTest { void readPartShouldReturnPreviouslySavedData() { testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block(); - Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); - assertThat(maybeBytes).contains(DATA); + assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA)); } @Test void readPartShouldNotReturnContentOfOtherParts() { testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block(); - Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION_2).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION_2).blockOptional(); assertThat(maybeBytes).isEmpty(); } @@ -80,9 +80,9 @@ class CassandraDefaultBucketDAOTest { testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block(); testee.writePart(ByteBuffer.wrap(DATA_2), BLOB_ID, POSITION).block(); - Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); - assertThat(maybeBytes).contains(DATA_2); + assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA_2)); } @Test @@ -138,7 +138,7 @@ class CassandraDefaultBucketDAOTest { testee.deleteParts(BLOB_ID).block(); - Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); assertThat(maybeBytes).isEmpty(); } @@ -149,8 +149,8 @@ class CassandraDefaultBucketDAOTest { testee.deleteParts(BLOB_ID).block(); - Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); - Optional<byte[]> maybeBytes2 = testee.readPart(BLOB_ID, POSITION_2).blockOptional(); + Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional(); + Optional<ByteBuffer> maybeBytes2 = testee.readPart(BLOB_ID, POSITION_2).blockOptional(); assertThat(maybeBytes).isEmpty(); assertThat(maybeBytes2).isEmpty(); } diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index 1ed7963..df51e07 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -18,9 +18,9 @@ ****************************************************************/ package org.apache.james.util; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.Optional; import java.util.Spliterator; import java.util.stream.Stream; @@ -33,18 +33,18 @@ public class ReactorUtils { return Mono.fromRunnable(runnable).then(Mono.empty()); } - public static InputStream toInputStream(Flux<byte[]> byteArrays) { + public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) { return new StreamInputStream(byteArrays.toStream(1)); } private static class StreamInputStream extends InputStream { private static final int NO_MORE_DATA = -1; - private final Stream<byte[]> source; - private final Spliterator<byte[]> spliterator; - private Optional<ByteArrayInputStream> currentItemByteStream; + private final Stream<ByteBuffer> source; + private final Spliterator<ByteBuffer> spliterator; + private Optional<ByteBuffer> currentItemByteStream; - StreamInputStream(Stream<byte[]> source) { + StreamInputStream(Stream<ByteBuffer> source) { this.source = source; this.spliterator = source.spliterator(); this.currentItemByteStream = Optional.empty(); @@ -62,8 +62,9 @@ public class ReactorUtils { return NO_MORE_DATA; } - return currentItemByteStream.map(ByteArrayInputStream::read) - .filter(readResult -> readResult != NO_MORE_DATA) + return currentItemByteStream + .filter(ByteBuffer::hasRemaining) + .map(buffer -> buffer.get() & 0xFF) .orElseGet(this::readNextChunk); } catch (Throwable t) { source.close(); @@ -77,7 +78,7 @@ public class ReactorUtils { private void switchToNextChunk() { spliterator.tryAdvance(bytes -> - currentItemByteStream = Optional.of(new ByteArrayInputStream(bytes))); + currentItemByteStream = Optional.of(bytes)); } private Integer readNextChunk() { diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index 98bb9e1..7bdc678 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; @@ -83,11 +84,12 @@ class ReactorUtilsTest { @Test void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); - Flux<byte[]> source = Flux.range(0, 10) + Flux<ByteBuffer> source = Flux.range(0, 10) .subscribeOn(Schedulers.elastic()) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)) - .map(index -> new byte[] {(byte) (int) index}); + .map(index -> new byte[] {(byte) (int) index}) + .map(ByteBuffer::wrap); InputStream inputStream = ReactorUtils.toInputStream(source); byte[] readBytes = new byte[5]; @@ -101,8 +103,9 @@ class ReactorUtilsTest { @Test void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); - Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}) + Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}) .subscribeOn(Schedulers.elastic()) + .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); @@ -118,8 +121,9 @@ class ReactorUtilsTest { @Test void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); - Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11}) + Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11}) .subscribeOn(Schedulers.elastic()) + .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); @@ -135,8 +139,9 @@ class ReactorUtilsTest { @Test void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); - Flux<byte[]> source = Flux.<byte[]>empty() + Flux<ByteBuffer> source = Flux.<byte[]>empty() .subscribeOn(Schedulers.elastic()) + .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
