This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 412ee6850e [ENHANCEMENT] Allow custom prefetch in Flux => InputStream
convertions
412ee6850e is described below
commit 412ee6850e53590e044a6725f7e0cd6332a94079
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu May 16 15:39:32 2024 +0200
[ENHANCEMENT] Allow custom prefetch in Flux => InputStream convertions
---
.../distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc | 10 ++++++++++
.../apps/distributed-app/sample-configuration/jvm.properties | 4 ++++
.../util/src/main/java/org/apache/james/util/ReactorUtils.java | 5 ++++-
3 files changed, 18 insertions(+), 1 deletion(-)
diff --git
a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc
b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc
index 2885d70ff4..170869594b 100644
--- a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc
+++ b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jvm.adoc
@@ -93,3 +93,13 @@ james.s3.metrics.enabled=false
----
To disable the S3 metrics.
+== Reactor Stream Prefetch
+
+Prefetch to use in Reactor to stream convertions (S3 => InputStream). Default
to 1.
+Higher values will tend to block less often at the price of higher memory
consumptions.
+
+Ex in `jvm.properties`
+----
+# james.reactor.inputstream.prefetch=4
+----
+
diff --git a/server/apps/distributed-app/sample-configuration/jvm.properties
b/server/apps/distributed-app/sample-configuration/jvm.properties
index 7d7e5f084a..804561f566 100644
--- a/server/apps/distributed-app/sample-configuration/jvm.properties
+++ b/server/apps/distributed-app/sample-configuration/jvm.properties
@@ -78,3 +78,7 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false
# Maximum size of a blob. Larger blobs will be rejected.
# Unit supported: K, M, G, default to no unit
#james.blob.aes.blob.max.size=100M
+
+# Prefetch to use in Reactor to stream convertions (S3 => InputStream).
Default to 1.
+# Higher values will tend to block less often at the price of higher memory
consumptions.
+# james.reactor.inputstream.prefetch=4
\ No newline at end of file
diff --git
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 4e9766b26e..8c26be6382 100644
---
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -57,6 +57,9 @@ public class ReactorUtils {
public static final int DEFAULT_BOUNDED_ELASTIC_QUEUESIZE =
Optional.ofNullable(System.getProperty("james.schedulers.defaultBoundedElasticQueueSize"))
.map(Integer::parseInt)
.orElse(100000);
+ public static final int DEFAULT_INPUT_STREAM_PREFETCH =
Optional.ofNullable(System.getProperty("james.reactor.inputstream.prefetch"))
+ .map(Integer::parseInt)
+ .orElse(1);
private static final int TTL_SECONDS = 60;
private static final boolean DAEMON = true;
public static final Scheduler BLOCKING_CALL_WRAPPER =
Schedulers.newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE,
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
@@ -105,7 +108,7 @@ public class ReactorUtils {
}
public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
- return new StreamInputStream(byteArrays.toStream(1));
+ return new
StreamInputStream(byteArrays.toStream(DEFAULT_INPUT_STREAM_PREFETCH));
}
public static Flux<ByteBuffer> toChunks(InputStream inputStream, int
bufferSize) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]