This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new e1164458a7 JAMES-2937 Distributd Task manager enhancements (#2858)
e1164458a7 is described below
commit e1164458a7c1fa396bad39536c65695067127c3a
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Nov 18 08:34:32 2025 +0100
JAMES-2937 Distributd Task manager enhancements (#2858)
Because a single active consumer turns a distributed message broker
into a single-threaded bottleneck, killing throughput, parallelism,
and fault-tolerance by forcing all messages through one consumer.
---
docs/modules/servers/partials/configure/jvm.adoc | 19 ++++++++++
.../distributed/RabbitMQWorkQueue.java | 42 +++++++++++++++++-----
2 files changed, 52 insertions(+), 9 deletions(-)
diff --git a/docs/modules/servers/partials/configure/jvm.adoc
b/docs/modules/servers/partials/configure/jvm.adoc
index d8200a7d80..dd22fa8ec8 100644
--- a/docs/modules/servers/partials/configure/jvm.adoc
+++ b/docs/modules/servers/partials/configure/jvm.adoc
@@ -212,6 +212,25 @@ james.deduplicating.blobstore.thread.switch.threshold=32768
james.deduplicating.blobstore.file.threshold=10240
----
+== Configuring Distributed Task Manager
+
+Turning Distributed Task Manager single active consumer off allows for all
+James nodes to share the task execution.
+
+Defaults to `true`.
+
+----
+james.task.rabbitmq.singleActive.enabled=false
+----
+
+Distributed task manager also supports setting the QOS (concurrent number of
tasks handled):
+
+Defaults to `1`.
+
+----
+james.task.rabbitmq.qos=4
+----
+
== Allow users to have rights for shares of different domain
Typically, preventing users to obtain rights for shares of another domain is a
useful security layer.
diff --git
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 79a93d3134..d994d21ec7 100644
---
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -68,6 +68,13 @@ import reactor.util.retry.Retry;
public class RabbitMQWorkQueue implements WorkQueue {
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitMQWorkQueue.class);
+ static final boolean SINGLE_ACTIVE =
Optional.ofNullable(System.getProperty("james.task.rabbitmq.singleActive.enabled"))
+ .map(Boolean::parseBoolean)
+ .orElse(true);
+ static final int QOS =
Optional.ofNullable(System.getProperty("james.task.rabbitmq.qos"))
+ .map(Integer::parseInt)
+ .orElse(1);
+
static final String EXCHANGE_NAME = "taskManagerWorkQueueExchange";
static final String QUEUE_NAME = "taskManagerWorkQueue";
static final String ROUTING_KEY = "taskManagerWorkQueueRoutingKey";
@@ -127,8 +134,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
Mono<AMQP.Queue.DeclareOk> declareQueue = sender
.declare(QueueSpecification.queue(QUEUE_NAME)
.durable(evaluateDurable(DURABLE,
rabbitMQConfiguration.isQuorumQueuesUsed()))
- .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder()
- .singleActiveConsumer()
+ .arguments(queueBaseArguments()
.consumerTimeout(consumerTimeout)
.build()))
.retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
@@ -142,6 +148,14 @@ public class RabbitMQWorkQueue implements WorkQueue {
.block();
}
+ private QueueArguments.Builder queueBaseArguments() {
+ if (SINGLE_ACTIVE) {
+ return rabbitMQConfiguration.workQueueArgumentsBuilder()
+ .singleActiveConsumer();
+ }
+ return rabbitMQConfiguration.workQueueArgumentsBuilder();
+ }
+
@Override
public void restart() {
closeRabbitResources();
@@ -149,13 +163,23 @@ public class RabbitMQWorkQueue implements WorkQueue {
}
private void consumeWorkqueue() {
- receiverHandle = Flux.using(
- receiverProvider::createReceiver,
- receiver -> receiver.consumeManualAck(QUEUE_NAME, new
ConsumeOptions()),
- Receiver::close)
- .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
- .concatMap(this::executeTask)
- .subscribe();
+ if (QOS > 1) {
+ receiverHandle = Flux.using(
+ receiverProvider::createReceiver,
+ receiver -> receiver.consumeManualAck(QUEUE_NAME, new
ConsumeOptions().qos(QOS)),
+ Receiver::close)
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+ .flatMap(this::executeTask, QOS)
+ .subscribe();
+ } else {
+ receiverHandle = Flux.using(
+ receiverProvider::createReceiver,
+ receiver -> receiver.consumeManualAck(QUEUE_NAME, new
ConsumeOptions()),
+ Receiver::close)
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+ .concatMap(this::executeTask)
+ .subscribe();
+ }
}
private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]