This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 412ee6850e [ENHANCEMENT] Allow custom prefetch in Flux => InputStream 
convertions
412ee6850e is described below

commit 412ee6850e53590e044a6725f7e0cd6332a94079
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu May 16 15:39:32 2024 +0200

    [ENHANCEMENT] Allow custom prefetch in Flux => InputStream convertions
---
 .../distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc | 10 ++++++++++
 .../apps/distributed-app/sample-configuration/jvm.properties   |  4 ++++
 .../util/src/main/java/org/apache/james/util/ReactorUtils.java |  5 ++++-
 3 files changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc 
b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc
index 2885d70ff4..170869594b 100644
--- a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc
+++ b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc
@@ -93,3 +93,13 @@ james.s3.metrics.enabled=false
 ----
 To disable the S3 metrics.
 
+== Reactor Stream Prefetch
+
+Prefetch to use in Reactor to stream conversions (S3 => InputStream). Defaults to 1.
+Higher values will tend to block less often at the price of higher memory consumption.
+
+Ex in `jvm.properties`
+----
+# james.reactor.inputstream.prefetch=4
+----
+
diff --git a/server/apps/distributed-app/sample-configuration/jvm.properties 
b/server/apps/distributed-app/sample-configuration/jvm.properties
index 7d7e5f084a..804561f566 100644
--- a/server/apps/distributed-app/sample-configuration/jvm.properties
+++ b/server/apps/distributed-app/sample-configuration/jvm.properties
@@ -78,3 +78,7 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false
 # Maximum size of a blob. Larger blobs will be rejected.
 # Unit supported: K, M, G, default to no unit
 #james.blob.aes.blob.max.size=100M
+
+# Prefetch to use in Reactor to stream conversions (S3 => InputStream). Defaults to 1.
+# Higher values will tend to block less often at the price of higher memory consumption.
+# james.reactor.inputstream.prefetch=4
\ No newline at end of file
diff --git 
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java 
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 4e9766b26e..8c26be6382 100644
--- 
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ 
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -57,6 +57,9 @@ public class ReactorUtils {
     public static final int DEFAULT_BOUNDED_ELASTIC_QUEUESIZE = 
Optional.ofNullable(System.getProperty("james.schedulers.defaultBoundedElasticQueueSize"))
         .map(Integer::parseInt)
         .orElse(100000);
+    public static final int DEFAULT_INPUT_STREAM_PREFETCH = 
Optional.ofNullable(System.getProperty("james.reactor.inputstream.prefetch"))
+        .map(Integer::parseInt)
+        .orElse(1);
     private static final int TTL_SECONDS = 60;
     private static final boolean DAEMON = true;
     public static final Scheduler BLOCKING_CALL_WRAPPER = 
Schedulers.newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, 
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
@@ -105,7 +108,7 @@ public class ReactorUtils {
     }
 
     public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
-        return new StreamInputStream(byteArrays.toStream(1));
+        return new 
StreamInputStream(byteArrays.toStream(DEFAULT_INPUT_STREAM_PREFETCH));
     }
 
     public static Flux<ByteBuffer> toChunks(InputStream inputStream, int 
bufferSize) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to