This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new beb23d48e3 Reactor blocking call fixes (#2016)
beb23d48e3 is described below

commit beb23d48e36615669a86a0b9c8326995523eb51f
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Feb 19 07:24:03 2024 +0100

    Reactor blocking call fixes (#2016)
---
 .../src/main/java/org/apache/james/mailbox/store/MessageStorer.java     | 2 ++
 .../src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java        | 2 ++
 .../java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java    | 2 +-
 .../apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala | 1 +
 4 files changed, 6 insertions(+), 1 deletion(-)

diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageStorer.java 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageStorer.java
index 7b63a6b5bc..2d2a00e508 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageStorer.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageStorer.java
@@ -54,6 +54,7 @@ import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 
 public interface MessageStorer {
@@ -105,6 +106,7 @@ public interface MessageStorer {
             return mapperFactory.getMessageMapper(session)
                 .executeReactive(
                     storeAttachments(messageId, content, maybeMessage, session)
+                        .subscribeOn(Schedulers.boundedElastic())
                         
.zipWith(threadIdGuessingAlgorithm.guessThreadIdReactive(messageId, 
mimeMessageId, inReplyTo, references, subject, session))
                         
.flatMap(Throwing.function((Tuple2<List<MessageAttachmentMetadata>, ThreadId> 
pair) -> {
                             List<MessageAttachmentMetadata> attachments = 
pair.getT1();
diff --git 
a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
 
b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
index 2fa7aff50f..3d75499fd4 100644
--- 
a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
+++ 
b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
@@ -41,6 +41,7 @@ import com.google.common.io.FileBackedOutputStream;
 import com.google.crypto.tink.subtle.AesGcmHkdfStreaming;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class AESBlobStoreDAO implements BlobStoreDAO {
     // For now, aligned with with MimeMessageInputStreamSource file threshold, 
detailed benchmarking might be conducted to challenge this choice
@@ -115,6 +116,7 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
                 () -> encrypt(inputStream),
                 fileBackedOutputStream -> 
Mono.from(underlying.save(bucketName, blobId, 
fileBackedOutputStream.asByteSource())),
                 Throwing.consumer(FileBackedOutputStream::reset))
+            .subscribeOn(Schedulers.boundedElastic())
             .onErrorMap(e -> new ObjectStoreIOException("Exception occurred 
while saving bytearray", e));
     }
 
diff --git 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index b24460176f..326e9fae65 100644
--- 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -326,6 +326,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, 
Startable, Closeable {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
         return Mono.fromCallable(content::size)
+            .subscribeOn(Schedulers.boundedElastic())
             .flatMap(contentLength ->
                 
Mono.usingWhen(Mono.fromCallable(content::openStream).subscribeOn(Schedulers.boundedElastic()),
                     stream -> save(resolvedBucketName, blobId, stream, 
contentLength),
@@ -333,7 +334,6 @@ public class S3BlobStoreDAO implements BlobStoreDAO, 
Startable, Closeable {
             .retryWhen(createBucketOnRetry(resolvedBucketName))
             .onErrorMap(IOException.class, e -> new 
ObjectStoreIOException("Error saving blob", e))
             .onErrorMap(SdkClientException.class, e -> new 
ObjectStoreIOException("Error saving blob", e))
-            .publishOn(Schedulers.parallel())
             .then();
     }
 
diff --git 
a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
 
b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
index e7bd582714..ff88a59b2b 100644
--- 
a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
+++ 
b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
@@ -77,6 +77,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: 
BlobStoreDAO,
       sourceSupplier.asJava,
       ((fileBackedOutputStream: FileBackedOutputStream) => 
fileBackedOutputStream.reset()).asJava,
       DeDuplicationBlobStore.LAZY_RESOURCE_CLEANUP)
+      .subscribeOn(Schedulers.boundedElastic())
   }
 
   private def saveAndGenerateBlobId(bucketName: BucketName, 
hashingInputStream: HashingInputStream, fileBackedOutputStream: 
FileBackedOutputStream): SMono[BlobId] =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to