This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.9.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit df4fe56f9d23f172a5ec062de7a686d8616528b0
Author: Quan Tran <[email protected]>
AuthorDate: Tue Oct 28 16:07:03 2025 +0700

    JAMES-4148 RunRuleOnAllMailboxesRoute: avoid blocking call when submitting 
tasks
    
    `runRulesOnAllUsersMailboxShouldComplete` tests failed:
    15:45:02.194 [ERROR] s.h.m.GeneralError -
    com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out 
after PT2S
    
    {
        "statusCode": 500,
        "type": "ServerError",
        "message": "WebAdmin encountered an unexpected internal error",
        "details": null
    }
---
 ...itMQWebAdminServerTaskSerializationIntegrationTest.java |  2 +-
 .../webadmin/data/jmap/RunRuleOnAllMailboxesRoute.java     | 14 ++++++++------
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
 
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
index acca84a319..32a5a89011 100644
--- 
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
+++ 
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
@@ -841,7 +841,7 @@ class 
RabbitMQWebAdminServerTaskSerializationIntegrationTest {
             .getList(".");
 
         assertThat(list)
-            .hasSize(3)
+            .hasSize(2)
             .first()
             .satisfies(map -> assertThat(map).hasSize(2)
                 .containsKeys("taskId")
diff --git 
a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RunRuleOnAllMailboxesRoute.java
 
b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RunRuleOnAllMailboxesRoute.java
index 15c01c64e8..da8863fd78 100644
--- 
a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RunRuleOnAllMailboxesRoute.java
+++ 
b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RunRuleOnAllMailboxesRoute.java
@@ -33,9 +33,9 @@ import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.task.Task;
-import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.user.api.UsersRepository;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.webadmin.data.jmap.dto.UserTask;
 import org.apache.james.webadmin.routes.ConditionalRoute;
 import org.apache.james.webadmin.tasks.TaskRegistrationKey;
@@ -57,7 +57,7 @@ import spark.Request;
 import spark.Response;
 
 public class RunRuleOnAllMailboxesRoute implements ConditionalRoute {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(RunRulesOnMailboxRoutes.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RunRuleOnAllMailboxesRoute.class);
 
     private static final TaskRegistrationKey TRIAGE = 
TaskRegistrationKey.of("triage");
     private static final String ACTION_QUERY_PARAM = "action";
@@ -124,15 +124,17 @@ public class RunRuleOnAllMailboxesRoute implements 
ConditionalRoute {
     private List<UserTask> runRulesOnAllUsersMailbox(MailboxName mailboxName, 
Rules rules) {
         return Flux.from(usersRepository.listReactive())
             .filterWhen(username -> mailboxForUserExists(username, 
mailboxName))
-            .map(username -> runRulesOnUserMailbox(username, mailboxName, 
rules))
+            .flatMap(username -> runRulesOnUserMailbox(username, mailboxName, 
rules))
             .collectList()
             .block();
     }
 
-    private UserTask runRulesOnUserMailbox(Username username, MailboxName 
mailboxName, Rules rules) {
+    private Mono<UserTask> runRulesOnUserMailbox(Username username, 
MailboxName mailboxName, Rules rules) {
         Task task = new RunRulesOnMailboxTask(username, mailboxName, rules, 
runRulesOnMailboxService);
-        TaskId taskId = taskManager.submit(task);
-        return new UserTask(username, taskId);
+
+        return Mono.fromCallable(() -> taskManager.submit(task))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+            .map(taskId -> new UserTask(username, taskId));
     }
 
     private void actionPrecondition(Request request) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to