This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new e1164458a7 JAMES-2937 Distributed Task manager enhancements (#2858)
e1164458a7 is described below

commit e1164458a7c1fa396bad39536c65695067127c3a
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Nov 18 08:34:32 2025 +0100

    JAMES-2937 Distributed Task manager enhancements (#2858)
    
    A single active consumer turns a distributed message broker
    into a single-threaded bottleneck, killing throughput, parallelism,
    and fault-tolerance by forcing all messages through one consumer.
---
 docs/modules/servers/partials/configure/jvm.adoc   | 19 ++++++++++
 .../distributed/RabbitMQWorkQueue.java             | 42 +++++++++++++++++-----
 2 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/docs/modules/servers/partials/configure/jvm.adoc 
b/docs/modules/servers/partials/configure/jvm.adoc
index d8200a7d80..dd22fa8ec8 100644
--- a/docs/modules/servers/partials/configure/jvm.adoc
+++ b/docs/modules/servers/partials/configure/jvm.adoc
@@ -212,6 +212,25 @@ james.deduplicating.blobstore.thread.switch.threshold=32768
 james.deduplicating.blobstore.file.threshold=10240
 ----
 
+== Configuring Distributed Task Manager
+
+Turning off the Distributed Task Manager single active consumer allows all
+James nodes to share the task execution.
+
+Single active consumer defaults to `true` (enabled).
+
+----
+james.task.rabbitmq.singleActive.enabled=false
+----
+
+Distributed Task Manager also supports setting the QOS (number of tasks 
handled concurrently):
+
+Defaults to `1`.
+
+----
+james.task.rabbitmq.qos=4
+----
+
 == Allow users to have rights for shares of different domain
 
 Typically, preventing users to obtain rights for shares of another domain is a 
useful security layer.
diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 79a93d3134..d994d21ec7 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -68,6 +68,13 @@ import reactor.util.retry.Retry;
 public class RabbitMQWorkQueue implements WorkQueue {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitMQWorkQueue.class);
 
+    static final boolean SINGLE_ACTIVE = 
Optional.ofNullable(System.getProperty("james.task.rabbitmq.singleActive.enabled"))
+        .map(Boolean::parseBoolean)
+        .orElse(true);
+    static final int QOS = 
Optional.ofNullable(System.getProperty("james.task.rabbitmq.qos"))
+        .map(Integer::parseInt)
+        .orElse(1);
+
     static final String EXCHANGE_NAME = "taskManagerWorkQueueExchange";
     static final String QUEUE_NAME = "taskManagerWorkQueue";
     static final String ROUTING_KEY = "taskManagerWorkQueueRoutingKey";
@@ -127,8 +134,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
         Mono<AMQP.Queue.DeclareOk> declareQueue = sender
             .declare(QueueSpecification.queue(QUEUE_NAME)
                 .durable(evaluateDurable(DURABLE, 
rabbitMQConfiguration.isQuorumQueuesUsed()))
-                .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder()
-                    .singleActiveConsumer()
+                .arguments(queueBaseArguments()
                     .consumerTimeout(consumerTimeout)
                     .build()))
             .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
@@ -142,6 +148,14 @@ public class RabbitMQWorkQueue implements WorkQueue {
             .block();
     }
 
+    private QueueArguments.Builder queueBaseArguments() {
+        if (SINGLE_ACTIVE) {
+            return rabbitMQConfiguration.workQueueArgumentsBuilder()
+                .singleActiveConsumer();
+        }
+        return rabbitMQConfiguration.workQueueArgumentsBuilder();
+    }
+
     @Override
     public void restart() {
         closeRabbitResources();
@@ -149,13 +163,23 @@ public class RabbitMQWorkQueue implements WorkQueue {
     }
 
     private void consumeWorkqueue() {
-        receiverHandle = Flux.using(
-                receiverProvider::createReceiver,
-                receiver -> receiver.consumeManualAck(QUEUE_NAME, new 
ConsumeOptions()),
-                Receiver::close)
-            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
-            .concatMap(this::executeTask)
-            .subscribe();
+        if (QOS > 1) {
+            receiverHandle =  Flux.using(
+                    receiverProvider::createReceiver,
+                    receiver -> receiver.consumeManualAck(QUEUE_NAME, new 
ConsumeOptions().qos(QOS)),
+                    Receiver::close)
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+                .flatMap(this::executeTask, QOS)
+                .subscribe();
+        } else {
+            receiverHandle =  Flux.using(
+                    receiverProvider::createReceiver,
+                    receiver -> receiver.consumeManualAck(QUEUE_NAME, new 
ConsumeOptions()),
+                    Receiver::close)
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+                .concatMap(this::executeTask)
+                .subscribe();
+        }
     }
 
     private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to