This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new beb23d48e3 Reactor blocking call fixes (#2016)
beb23d48e3 is described below
commit beb23d48e36615669a86a0b9c8326995523eb51f
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Feb 19 07:24:03 2024 +0100
Reactor blocking call fixes (#2016)
---
.../src/main/java/org/apache/james/mailbox/store/MessageStorer.java | 2 ++
.../src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java | 2 ++
.../java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java | 2 +-
.../apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala | 1 +
4 files changed, 6 insertions(+), 1 deletion(-)
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageStorer.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageStorer.java
index 7b63a6b5bc..2d2a00e508 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageStorer.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageStorer.java
@@ -54,6 +54,7 @@ import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
public interface MessageStorer {
@@ -105,6 +106,7 @@ public interface MessageStorer {
return mapperFactory.getMessageMapper(session)
.executeReactive(
storeAttachments(messageId, content, maybeMessage, session)
+ .subscribeOn(Schedulers.boundedElastic())
.zipWith(threadIdGuessingAlgorithm.guessThreadIdReactive(messageId,
mimeMessageId, inReplyTo, references, subject, session))
.flatMap(Throwing.function((Tuple2<List<MessageAttachmentMetadata>, ThreadId>
pair) -> {
List<MessageAttachmentMetadata> attachments =
pair.getT1();
diff --git
a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
index 2fa7aff50f..3d75499fd4 100644
---
a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
+++
b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
@@ -41,6 +41,7 @@ import com.google.common.io.FileBackedOutputStream;
import com.google.crypto.tink.subtle.AesGcmHkdfStreaming;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class AESBlobStoreDAO implements BlobStoreDAO {
// For now, aligned with MimeMessageInputStreamSource file threshold,
detailed benchmarking might be conducted to challenge this choice
@@ -115,6 +116,7 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
() -> encrypt(inputStream),
fileBackedOutputStream ->
Mono.from(underlying.save(bucketName, blobId,
fileBackedOutputStream.asByteSource())),
Throwing.consumer(FileBackedOutputStream::reset))
+ .subscribeOn(Schedulers.boundedElastic())
.onErrorMap(e -> new ObjectStoreIOException("Exception occurred
while saving bytearray", e));
}
diff --git
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index b24460176f..326e9fae65 100644
---
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -326,6 +326,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO,
Startable, Closeable {
BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
return Mono.fromCallable(content::size)
+ .subscribeOn(Schedulers.boundedElastic())
.flatMap(contentLength ->
Mono.usingWhen(Mono.fromCallable(content::openStream).subscribeOn(Schedulers.boundedElastic()),
stream -> save(resolvedBucketName, blobId, stream,
contentLength),
@@ -333,7 +334,6 @@ public class S3BlobStoreDAO implements BlobStoreDAO,
Startable, Closeable {
.retryWhen(createBucketOnRetry(resolvedBucketName))
.onErrorMap(IOException.class, e -> new
ObjectStoreIOException("Error saving blob", e))
.onErrorMap(SdkClientException.class, e -> new
ObjectStoreIOException("Error saving blob", e))
- .publishOn(Schedulers.parallel())
.then();
}
diff --git
a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
index e7bd582714..ff88a59b2b 100644
---
a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
+++
b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
@@ -77,6 +77,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO:
BlobStoreDAO,
sourceSupplier.asJava,
((fileBackedOutputStream: FileBackedOutputStream) =>
fileBackedOutputStream.reset()).asJava,
DeDuplicationBlobStore.LAZY_RESOURCE_CLEANUP)
+ .subscribeOn(Schedulers.boundedElastic())
}
private def saveAndGenerateBlobId(bucketName: BucketName,
hashingInputStream: HashingInputStream, fileBackedOutputStream:
FileBackedOutputStream): SMono[BlobId] =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]