This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 102cc9cacc [ENHANCEMENT] AESBlobStoreDAO::readBytes: remove 1 file copy
102cc9cacc is described below

commit 102cc9caccc6f61c9c066fa028edc5953f99d001
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Apr 2 00:08:12 2024 +0200

    [ENHANCEMENT] AESBlobStoreDAO::readBytes: remove 1 file copy
---
 .../org/apache/james/blob/aes/AESBlobStoreDAO.java | 22 +++++-----------------
 1 file changed, 5 insertions(+), 17 deletions(-)

diff --git a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
index a73495e375..b8d7478f0e 100644
--- a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
+++ b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
@@ -23,8 +23,6 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 import java.security.GeneralSecurityException;
 import java.util.Collection;
 import java.util.Optional;
@@ -36,6 +34,7 @@ import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.Size;
 import org.reactivestreams.Publisher;
 import org.slf4j.LoggerFactory;
@@ -104,18 +103,12 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
             throw new RuntimeException(blobId.asString() + " exceeded maximum 
blob size");
         }
 
-        FileBackedOutputStream encryptedContent = new 
FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
-        WritableByteChannel channel = Channels.newChannel(encryptedContent);
-
-        return Flux.from(ciphertext.getContent())
-            .publishOn(Schedulers.boundedElastic())
-            .doOnNext(Throwing.consumer(channel::write))
-            .then(Mono.fromCallable(() -> {
+        return Mono.fromCallable(() -> {
                 try {
                     FileBackedOutputStream decryptedContent = new 
FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
                     try {
                         CountingOutputStream countingOutputStream = new 
CountingOutputStream(decryptedContent);
-                        try (InputStream ciphertextStream = 
encryptedContent.asByteSource().openStream()) {
+                        try (InputStream ciphertextStream = 
ReactorUtils.toInputStream(Flux.from(ciphertext.getContent()))) {
                             
decrypt(ciphertextStream).transferTo(countingOutputStream);
                         }
                         try (InputStream decryptedStream = 
decryptedContent.asByteSource().openStream()) {
@@ -130,12 +123,7 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
                         .error("OOM reading {}. Blob size read so far {} 
bytes.", blobId.asString(), ciphertext.getSize());
                     throw error;
                 }
-            }))
-            .doFinally(Throwing.consumer(any -> {
-                channel.close();
-                encryptedContent.reset();
-                encryptedContent.close();
-            }));
+            });
     }
 
     @Override
@@ -157,7 +145,7 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
     public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         return Mono.from(underlying.readAsByteSource(bucketName, blobId))
             .flatMap(reactiveByteSource -> 
decryptReactiveByteSource(reactiveByteSource, blobId))
-            .subscribeOn(Schedulers.boundedElastic());
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to