This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ac3f32fef8a84b88728ab9b3e38b70f17dc6979d Author: Benoit Tellier <[email protected]> AuthorDate: Fri Jun 4 21:31:54 2021 +0700 [PERFORMANCE] Simplify reactive flow within CachedBlobStore --- .../blob/cassandra/cache/CachedBlobStore.java | 39 +++++++++++----------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java index bf4a1a0..9b6d0b9 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java @@ -138,17 +138,21 @@ public class CachedBlobStore implements BlobStore { if (storagePolicy == LOW_COST) { return backend.read(bucketName, blobId); } - return Mono.just(bucketName) - .filter(getDefaultBucketName()::equals) - .flatMap(defaultBucket -> readInDefaultBucket(bucketName, blobId)) - .switchIfEmpty(readFromBackend(bucketName, blobId)) + return readInputStream(bucketName, blobId) .blockOptional() .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId.asString()))); } + private Mono<InputStream> readInputStream(BucketName bucketName, BlobId blobId) { + if (bucketName.equals(getDefaultBucketName())) { + return readInDefaultBucket(bucketName, blobId); + } + return readFromBackend(bucketName, blobId); + } + private Mono<InputStream> readInDefaultBucket(BucketName bucketName, BlobId blobId) { return readFromCache(blobId) - .flatMap(this::toInputStream) + .<InputStream>map(ByteArrayInputStream::new) .switchIfEmpty(readFromBackend(bucketName, blobId) .flatMap(inputStream -> Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes)) @@ -242,8 +246,7 @@ public class CachedBlobStore implements BlobStore { public Mono<Boolean> delete(BucketName bucketName, BlobId blobId) { return Mono.from(backend.delete(bucketName, blobId)) .flatMap(deleted -> { - if (backend.getDefaultBucketName().equals(bucketName) - && deleted) { + if (backend.getDefaultBucketName().equals(bucketName) && deleted) { return Mono.from(cache.remove(blobId)).thenReturn(deleted); } return Mono.just(deleted); @@ -267,16 +270,18 @@ public class CachedBlobStore implements BlobStore { } private Mono<Void> putInCacheIfNeeded(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead, BlobId blobId) { - return Mono.justOrEmpty(readAhead.firstBytes) + return readAhead.firstBytes .filter(bytes -> isAbleToCache(bucketName, readAhead, storagePolicy)) - .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes))); + .map(bytes -> Mono.from(cache.cache(blobId, bytes))) + .orElse(Mono.empty()); } private Mono<Void> putInCacheIfNeeded(BucketName bucketName, ReadAheadInputStream readAhead, BlobId blobId) { - return Mono.justOrEmpty(readAhead.firstBytes) + return readAhead.firstBytes .filter(bytes -> isAbleToCache(readAhead, bucketName)) - .doOnNext(any -> metricRetrieveMissCount.increment()) - .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes))); + .map(bytes -> Mono.fromRunnable(metricRetrieveMissCount::increment) + .then(Mono.from(cache.cache(blobId, bytes)))) + .orElse(Mono.empty()); } private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) { @@ -303,10 +308,6 @@ public class CachedBlobStore implements BlobStore { return bytes.length <= sizeThresholdInBytes; } - private Mono<InputStream> toInputStream(byte[] bytes) { - return Mono.fromCallable(() -> new ByteArrayInputStream(bytes)); - } - private Mono<byte[]> readFromCache(BlobId blobId) { return Mono.from(metricFactory.decoratePublisherWithTimerMetric(BLOBSTORE_CACHED_LATENCY_METRIC_NAME, cache.read(blobId))) .doOnNext(any -> metricRetrieveHitCount.increment()); @@ -318,9 +319,7 @@ public class CachedBlobStore implements BlobStore { } private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) { - return Mono.fromCallable(() -> metricFactory.timer(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME)) - .flatMap(timer -> Mono.from(backend.readBytes(bucketName, blobId)) - .doOnSuccess(any -> timer.stopAndPublish()) - .doOnError(ObjectNotFoundException.class, any -> timer.stopAndPublish())); + return Mono.from(metricFactory.decoratePublisherWithTimerMetric(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME, + backend.readBytes(bucketName, blobId))); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
