This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ba178a4e1a02ece1885670b5d3f93e368303b4f3 Author: Quan Tran <[email protected]> AuthorDate: Tue Dec 23 17:15:16 2025 +0700 [IMPROVEMENT] Increase concurrency for MailboxMergingTask --- .../mail/task/MailboxMergingTaskRunner.java | 3 +- ...dminServerTaskSerializationIntegrationTest.java | 82 ++++++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java index 478fcebdbf..d184b4672e 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java @@ -76,7 +76,8 @@ public class MailboxMergingTaskRunner { return cassandraMessageIdDAO.retrieveMessages(oldMailboxId, MessageRange.all(), Limit.unlimited()) .map(CassandraMessageMetadata::getComposedMessageId) .map(ComposedMessageIdWithMetaData::getComposedMessageId) - .concatMap(messageId -> Mono.fromCallable(() -> moveMessage(newMailboxId, messageId, session, context)).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)) + .flatMap(messageId -> Mono.fromCallable(() -> moveMessage(newMailboxId, messageId, session, context)) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER), ReactorUtils.DEFAULT_CONCURRENCY) .reduce(Task.Result.COMPLETED, Task::combine) .block(); } diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java index 0a992d21c7..e47f6e1826 100644 --- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java +++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java @@ -33,6 +33,8 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.collection.IsMapWithSize.anEmptyMap; import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Date; import java.util.List; import java.util.Map; @@ -57,6 +59,7 @@ import org.apache.james.junit.categories.BasicFeature; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.events.GenericGroup; import org.apache.james.mailbox.events.MailboxEvents.MailboxAdded; +import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.inmemory.InMemoryId; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.MailboxConstants; @@ -75,6 +78,7 @@ import org.apache.james.modules.blobstore.BlobStoreConfiguration; import org.apache.james.probe.DataProbe; import org.apache.james.server.core.MailImpl; import org.apache.james.task.TaskManager; +import org.apache.james.util.ReactorUtils; import org.apache.james.utils.DataProbeImpl; import org.apache.james.utils.MailRepositoryProbeImpl; import org.apache.james.utils.WebAdminGuiceProbe; @@ -87,6 +91,7 @@ import org.apache.james.webadmin.routes.MailRepositoriesRoutes; import org.apache.james.webadmin.routes.TasksRoutes; import org.apache.james.webadmin.service.ClearMailboxContentTask; import org.apache.james.webadmin.vault.routes.DeletedMessagesVaultRoutes; +import org.awaitility.Awaitility; import org.eclipse.jetty.http.HttpStatus; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; @@ -96,9 +101,23 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.restassured.RestAssured; import io.restassured.http.ContentType; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @Tag(BasicFeature.TAG) class RabbitMQWebAdminServerTaskSerializationIntegrationTest { + private static class MailboxIdPair { + private final MailboxId origin; + private final MailboxId destination; + + private MailboxIdPair(MailboxId origin, MailboxId destination) { + this.origin = origin; + this.destination = destination; + } + } + + private static final int TASK_COUNT = 60; + private static final int MESSAGES_PER_ORIGIN_MAILBOX = 20; @RegisterExtension static JamesServerExtension testExtension = new JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir -> @@ -192,6 +211,69 @@ class RabbitMQWebAdminServerTaskSerializationIntegrationTest { .body("additionalInformation.unsubscribedCount", is(0)); } + @Test + void multipleMailboxMergingTasksShouldCompleteQuicklyDespitePollingInterval(GuiceJamesServer server) throws Exception { + server.getProbe(DataProbeImpl.class).addUser(USERNAME, "secret"); + mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, MailboxConstants.INBOX); + + List<MailboxIdPair> mailboxes = Flux.range(0, TASK_COUNT) + .flatMap(this::provisionMailbox, ReactorUtils.LOW_CONCURRENCY) + .collectList() + .block(); + + List<String> taskIds = mailboxes.stream() + .map(pair -> given() + .contentType(ContentType.JSON) + .body("{" + + "\"mergeOrigin\":\"" + pair.origin.serialize() + "\"," + + "\"mergeDestination\":\"" + pair.destination.serialize() + "\"" + + "}") + .post(CassandraMailboxMergingRoutes.BASE) + .then() + .statusCode(CREATED_201) + .extract() + .jsonPath() + .getString("taskId")) + .toList(); + + // expect all tasks to complete within 2 minutes + Awaitility.await() + .atMost(Duration.ofMinutes(2)) + .untilAsserted(() -> taskIds.forEach(taskId -> given() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await") + .then() + .statusCode(HttpStatus.OK_200) + .body("status", is("completed")))); + } + + private Mono<MailboxIdPair> provisionMailbox(Integer number) { + return Mono.fromCallable(() -> { + String originMailboxName = "origin-" + number; + MailboxId originMailboxId = mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, originMailboxName); + MailboxId destinationMailboxId = mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, "destination-" + number); + + try { + appendMessages(USERNAME, originMailboxName, MESSAGES_PER_ORIGIN_MAILBOX); + } catch (MailboxException e) { + throw new RuntimeException(e); + } + return new MailboxIdPair(originMailboxId, destinationMailboxId); + }); + } + + private void appendMessages(String username, String mailboxName, int messageCount) throws MailboxException { + MailboxPath mailboxPath = new MailboxPath(MailboxConstants.USER_NAMESPACE, Username.of(username), mailboxName); + Flux.range(0, messageCount) + .flatMap(i -> Mono.fromCallable(() -> mailboxProbe.appendMessage(username, mailboxPath, + new ByteArrayInputStream(("Subject: test " + i + "\r\n\r\nbody").getBytes(StandardCharsets.UTF_8)), + new Date(), + false, + new Flags())), ReactorUtils.DEFAULT_CONCURRENCY) + .collectList() + .block(); + } + @Test void singleMailboxReindexingShouldComplete(GuiceJamesServer server) { MailboxId mailboxId = server.getProbe(MailboxProbeImpl.class) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
