This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ba178a4e1a02ece1885670b5d3f93e368303b4f3
Author: Quan Tran <[email protected]>
AuthorDate: Tue Dec 23 17:15:16 2025 +0700

    [IMPROVEMENT] Increase concurrency for MailboxMergingTask
---
 .../mail/task/MailboxMergingTaskRunner.java        |  3 +-
 ...dminServerTaskSerializationIntegrationTest.java | 82 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 1 deletion(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
index 478fcebdbf..d184b4672e 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
@@ -76,7 +76,8 @@ public class MailboxMergingTaskRunner {
         return cassandraMessageIdDAO.retrieveMessages(oldMailboxId, 
MessageRange.all(), Limit.unlimited())
             .map(CassandraMessageMetadata::getComposedMessageId)
             .map(ComposedMessageIdWithMetaData::getComposedMessageId)
-            .concatMap(messageId -> Mono.fromCallable(() -> 
moveMessage(newMailboxId, messageId, session, 
context)).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER))
+            .flatMap(messageId -> Mono.fromCallable(() -> 
moveMessage(newMailboxId, messageId, session, context))
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER), 
ReactorUtils.DEFAULT_CONCURRENCY)
             .reduce(Task.Result.COMPLETED, Task::combine)
             .block();
     }
diff --git 
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
 
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
index 0a992d21c7..e47f6e1826 100644
--- 
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
+++ 
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
@@ -33,6 +33,8 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.collection.IsMapWithSize.anEmptyMap;
 
 import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +59,7 @@ import org.apache.james.junit.categories.BasicFeature;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.events.GenericGroup;
 import org.apache.james.mailbox.events.MailboxEvents.MailboxAdded;
+import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.inmemory.InMemoryId;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.MailboxConstants;
@@ -75,6 +78,7 @@ import 
org.apache.james.modules.blobstore.BlobStoreConfiguration;
 import org.apache.james.probe.DataProbe;
 import org.apache.james.server.core.MailImpl;
 import org.apache.james.task.TaskManager;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.utils.DataProbeImpl;
 import org.apache.james.utils.MailRepositoryProbeImpl;
 import org.apache.james.utils.WebAdminGuiceProbe;
@@ -87,6 +91,7 @@ import 
org.apache.james.webadmin.routes.MailRepositoriesRoutes;
 import org.apache.james.webadmin.routes.TasksRoutes;
 import org.apache.james.webadmin.service.ClearMailboxContentTask;
 import org.apache.james.webadmin.vault.routes.DeletedMessagesVaultRoutes;
+import org.awaitility.Awaitility;
 import org.eclipse.jetty.http.HttpStatus;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeEach;
@@ -96,9 +101,23 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 @Tag(BasicFeature.TAG)
 class RabbitMQWebAdminServerTaskSerializationIntegrationTest {
+    private static class MailboxIdPair { // immutable (origin, destination) mailbox id pair describing one merge
+        private final MailboxId origin; // sent as "mergeOrigin" in the merge request body
+        private final MailboxId destination; // sent as "mergeDestination" in the merge request body
+
+        private MailboxIdPair(MailboxId origin, MailboxId destination) {
+            this.origin = origin;
+            this.destination = destination;
+        }
+    }
+
+    private static final int TASK_COUNT = 60; // number of mailbox pairs provisioned, hence of merge tasks submitted
+    private static final int MESSAGES_PER_ORIGIN_MAILBOX = 20; // messages appended to each origin mailbox before merging
 
     @RegisterExtension
     static JamesServerExtension testExtension = new 
JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
@@ -192,6 +211,69 @@ class 
RabbitMQWebAdminServerTaskSerializationIntegrationTest {
             .body("additionalInformation.unsubscribedCount", is(0));
     }
 
+    @Test // submits TASK_COUNT mailbox merging tasks and expects all of them completed within 2 minutes
+    void 
multipleMailboxMergingTasksShouldCompleteQuicklyDespitePollingInterval(GuiceJamesServer
 server) throws Exception {
+        server.getProbe(DataProbeImpl.class).addUser(USERNAME, "secret"); // the user owning every mailbox below
+        mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, 
MailboxConstants.INBOX);
+
+        List<MailboxIdPair> mailboxes = Flux.range(0, TASK_COUNT) // one origin/destination pair per index
+            .flatMap(this::provisionMailbox, ReactorUtils.LOW_CONCURRENCY)
+            .collectList()
+            .block();
+
+        List<String> taskIds = mailboxes.stream() // POST one merge request per pair, keeping the returned taskId
+            .map(pair -> given()
+                .contentType(ContentType.JSON)
+                .body("{" +
+                    "\"mergeOrigin\":\"" + pair.origin.serialize() + "\"," +
+                    "\"mergeDestination\":\"" + pair.destination.serialize() + 
"\"" +
+                    "}")
+                .post(CassandraMailboxMergingRoutes.BASE)
+                .then()
+                .statusCode(CREATED_201) // each submission must be accepted before its id is read
+                .extract()
+                .jsonPath()
+                .getString("taskId"))
+            .toList();
+
+        // expect every submitted task to be reported as completed within 2 minutes overall
+        Awaitility.await()
+            .atMost(Duration.ofMinutes(2))
+            .untilAsserted(() -> taskIds.forEach(taskId -> given()
+                .basePath(TasksRoutes.BASE)
+                .get(taskId + "/await") // queries the task's "await" resource and checks its reported status
+                .then()
+                .statusCode(HttpStatus.OK_200)
+                .body("status", is("completed"))));
+    }
+
+    private Mono<MailboxIdPair> provisionMailbox(Integer number) { // creates "origin-<number>" and "destination-<number>", fills the origin
+        return Mono.fromCallable(() -> { // work is deferred until the returned Mono is subscribed
+            String originMailboxName = "origin-" + number;
+            MailboxId originMailboxId = 
mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, 
originMailboxName);
+            MailboxId destinationMailboxId = 
mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, 
"destination-" + number);
+
+            try {
+                appendMessages(USERNAME, originMailboxName, 
MESSAGES_PER_ORIGIN_MAILBOX);
+            } catch (MailboxException e) { // rethrown unchecked, keeping the original exception as cause
+                throw new RuntimeException(e);
+            }
+            return new MailboxIdPair(originMailboxId, destinationMailboxId);
+        }); // NOTE(review): no subscribeOn here - confirm the caller's flatMap concurrency really parallelises these blocking calls
+    }
+
+    private void appendMessages(String username, String mailboxName, int 
messageCount) throws MailboxException { // appends messageCount small test messages to the user's mailbox
+        MailboxPath mailboxPath = new 
MailboxPath(MailboxConstants.USER_NAMESPACE, Username.of(username), 
mailboxName);
+        Flux.range(0, messageCount) // i is only used to give each message a distinct subject
+            .flatMap(i -> Mono.fromCallable(() -> 
mailboxProbe.appendMessage(username, mailboxPath,
+                new ByteArrayInputStream(("Subject: test " + i + 
"\r\n\r\nbody").getBytes(StandardCharsets.UTF_8)),
+                new Date(),
+                false,
+                new Flags())), ReactorUtils.DEFAULT_CONCURRENCY)
+            .collectList()
+            .block(); // returns only once every append has finished; NOTE(review): failures presumably surface here as unchecked exceptions, so the declared MailboxException may never be thrown directly - confirm
+    }
+
     @Test
     void singleMailboxReindexingShouldComplete(GuiceJamesServer server) {
         MailboxId mailboxId = server.getProbe(MailboxProbeImpl.class)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to