This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 22d8d93ee74e4052f6b191c12330a43cbc76445f
Author: ducnv <duc91....@gmail.com>
AuthorDate: Tue Apr 28 17:17:37 2020 +0700

    JAMES-3140: Hiding technical details into class for CachedBlobStore
---
 .../blob/cassandra/cache/CachedBlobStore.java | 141 ++++++++++++---------
 1 file changed, 84 insertions(+), 57 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 86aead0..04857e4 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -42,6 +42,62 @@ import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Mono;
 
 public class CachedBlobStore implements BlobStore {
+
+    private static class ReadAheadInputStream {
+
+        @FunctionalInterface
+        interface RequireStream {
+            RequireLength of(InputStream in);
+        }
+
+        interface RequireLength {
+            ReadAheadInputStream length(int length) throws IOException;
+        }
+
+
+        static RequireStream eager() {
+            return in -> length -> {
+                //+1 is to evaluate hasMore
+                var stream = new PushbackInputStream(in, length + 1);
+                var bytes = new byte[length];
+                int readByteCount = IOUtils.read(stream, bytes);
+                Optional<byte[]> firstBytes;
+                boolean hasMore;
+                if (readByteCount < 0) {
+                    firstBytes = Optional.empty();
+                    hasMore = false;
+                } else {
+                    byte[] readBytes = Arrays.copyOf(bytes, readByteCount);
+                    hasMore = hasMore(stream);
+                    stream.unread(readBytes);
+                    firstBytes = Optional.of(readBytes);
+                }
+                return new ReadAheadInputStream(stream, firstBytes, hasMore);
+            };
+        }
+
+        private static boolean hasMore(PushbackInputStream stream) throws IOException {
+            int nextByte = stream.read();
+            if (nextByte >= 0) {
+                stream.unread(nextByte);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        final PushbackInputStream in;
+        final Optional<byte[]> firstBytes;
+        final boolean hasMore;
+
+        private ReadAheadInputStream(PushbackInputStream in, Optional<byte[]> firstBytes, boolean hasMore) {
+            this.in = in;
+            this.firstBytes = firstBytes;
+            this.hasMore = hasMore;
+        }
+
+    }
+
     private final BlobStoreCache cache;
     private final BlobStore backend;
     private final Integer sizeThresholdInBytes;
@@ -61,8 +117,10 @@ public class CachedBlobStore implements BlobStore {
             .flatMap(ignored -> readFromCache(blobId)
                 .flatMap(this::toInputStream))
             .switchIfEmpty(readFromBackend(bucketName, blobId)
-                .map(this::toPushbackStream)
-                .flatMap(pushbackInputStream -> saveInCache(pushbackInputStream, blobId, bucketName)))
+                .flatMap(inputStream ->
+                    Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes))
+                        .flatMap(readAheadInputStream -> putInCacheIfNeeded(bucketName, readAheadInputStream, blobId)
+                            .thenReturn(readAheadInputStream.in))))
             .blockOptional()
             .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
     }
@@ -94,8 +152,7 @@ public class CachedBlobStore implements BlobStore {
         Preconditions.checkNotNull(inputStream, "InputStream must not be null");
 
         if (isAbleToCache(bucketName, storagePolicy)) {
-            return Mono.fromCallable(() -> toPushbackStream(inputStream))
-                .flatMap(pushbackInputStream -> saveInCache(bucketName, pushbackInputStream, storagePolicy));
+            return saveInCache(bucketName, inputStream, storagePolicy);
         }
 
         return backend.save(bucketName, inputStream, storagePolicy);
@@ -120,64 +177,47 @@ public class CachedBlobStore implements BlobStore {
         return Mono.from(backend.deleteBucket(bucketName));
     }
 
-    private Optional<byte[]> fullyReadSmallStream(PushbackInputStream pushbackInputStream) throws IOException {
-        byte[] bytes = new byte[sizeThresholdInBytes];
-        int readByteCount = IOUtils.read(pushbackInputStream, bytes);
-        int extraByte = pushbackInputStream.read();
-        try {
-            if (extraByte >= 0) {
-                return Optional.empty();
-            }
-            if (readByteCount < 0) {
-                return Optional.of(new byte[] {});
-            }
-            return Optional.of(Arrays.copyOf(bytes, readByteCount));
-        } finally {
-            if (extraByte >= 0) {
-                pushbackInputStream.unread(extraByte);
-            }
-            if (readByteCount > 0) {
-                pushbackInputStream.unread(bytes, 0, readByteCount);
-            }
-        }
+    private Mono<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
+        return Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes))
+            .flatMap(readAhead -> saveToBackend(bucketName, storagePolicy, readAhead)
+                .flatMap(blobId -> putInCacheIfNeeded(bucketName, storagePolicy, readAhead, blobId)
+                    .thenReturn(blobId)));
     }
 
-    private Mono<BlobId> saveInCache(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
-        return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
-            .flatMap(Mono::justOrEmpty)
-            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
-            .flatMap(bytes -> saveInBackend(bucketName, pushbackInputStream, storagePolicy)
-                .flatMap(blobId -> saveInCache(blobId, bytes).thenReturn(blobId)))
-            .switchIfEmpty(saveInBackend(bucketName, pushbackInputStream, storagePolicy));
+    private Mono<BlobId> saveToBackend(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead) {
+        return Mono.from(backend.save(bucketName, readAhead.in, storagePolicy));
    }
 
-    private Mono<BlobId> saveInBackend(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
-        return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy));
+    private Mono<Void> putInCacheIfNeeded(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead, BlobId blobId) {
+        return Mono.justOrEmpty(readAhead.firstBytes)
+            .filter(bytes -> isAbleToCache(bucketName, readAhead, storagePolicy))
+            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
     }
 
-    private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
-        return Mono.from(cache.cache(blobId, bytes));
+    private Mono<Void> putInCacheIfNeeded(BucketName bucketName, ReadAheadInputStream readAhead, BlobId blobId) {
+        return Mono.justOrEmpty(readAhead.firstBytes)
+            .filter(bytes -> isAbleToCache(readAhead, bucketName))
+            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
     }
 
-    private Mono<InputStream> saveInCache(PushbackInputStream pushbackInputStream, BlobId blobId, BucketName bucketName) {
-        return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
-            .flatMap(Mono::justOrEmpty)
-            .filter(bytes -> isAbleToCache(bytes, bucketName))
-            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes))
-                .map(ignore -> pushbackBytesArrayRead(pushbackInputStream, bytes)))
-            .then(Mono.just(pushbackInputStream));
+    private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
+        return Mono.from(cache.cache(blobId, bytes));
     }
 
     private boolean isAbleToCache(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
         return isAbleToCache(bucketName, storagePolicy) && isAbleToCache(bytes);
     }
 
+    private boolean isAbleToCache(BucketName bucketName, ReadAheadInputStream readAhead, StoragePolicy storagePolicy) {
+        return isAbleToCache(bucketName, storagePolicy) && !readAhead.hasMore;
+    }
+
     private boolean isAbleToCache(BucketName bucketName, StoragePolicy storagePolicy) {
         return backend.getDefaultBucketName().equals(bucketName) && !storagePolicy.equals(LOW_COST);
     }
 
-    private boolean isAbleToCache(byte[] bytes, BucketName bucketName) {
-        return isAbleToCache(bytes) && backend.getDefaultBucketName().equals(bucketName);
+    private boolean isAbleToCache(ReadAheadInputStream readAhead, BucketName bucketName) {
+        return !readAhead.hasMore && backend.getDefaultBucketName().equals(bucketName);
     }
 
     private boolean isAbleToCache(byte[] bytes) {
@@ -192,10 +232,6 @@ public class CachedBlobStore implements BlobStore {
         return Mono.fromCallable(() -> backend.read(bucketName, blobId));
     }
 
-    private PushbackInputStream toPushbackStream(InputStream inputStream) {
-        return new PushbackInputStream(inputStream, sizeThresholdInBytes);
-    }
-
     private Mono<byte[]> readFromCache(BlobId blobId) {
         return Mono.from(cache.read(blobId));
     }
@@ -203,13 +239,4 @@
     private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
         return Mono.from(backend.readBytes(bucketName, blobId));
     }
-
-    private Mono<Void> pushbackBytesArrayRead(PushbackInputStream pushbackInputStream, byte[] bytes) {
-        try {
-            pushbackInputStream.unread(bytes);
-        } catch (IOException e) {
-            // Ignore
-        }
-        return Mono.empty();
-    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org