This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 340b31f0da JAMES-4081 Implement MailToAllUsers mailet (#2469) 340b31f0da is described below commit 340b31f0da3a94ed4a6cbf200a69e67277b2a578 Author: Trần Hồng Quân <55171818+quantranhong1...@users.noreply.github.com> AuthorDate: Mon Oct 28 10:47:24 2024 +0700 JAMES-4081 Implement MailToAllUsers mailet (#2469) --- docs/modules/servers/partials/MailToAllUsers.adoc | 25 +++++ .../james/transport/mailets/MailToAllUsers.java | 124 +++++++++++++++++++++ .../transport/mailets/MailToAllUsersTest.java | 106 ++++++++++++++++++ 3 files changed, 255 insertions(+) diff --git a/docs/modules/servers/partials/MailToAllUsers.adoc b/docs/modules/servers/partials/MailToAllUsers.adoc new file mode 100644 index 0000000000..caf5516d3f --- /dev/null +++ b/docs/modules/servers/partials/MailToAllUsers.adoc @@ -0,0 +1,25 @@ +=== MailToAllUsers + +A mailet that sends an email to all users in the system. The emails are sent in batches to manage +the load. The first batch is sent directly, while the remaining batches are sent asynchronously. + +==== Configuration parameters + +*batchSize*:: +The number of recipients to include in each batch. Optional, defaults to 100. 
+ +==== Sample configuration + +[source,xml] +---- +<matcher name="notify-matcher" match="org.apache.james.mailetcontainer.impl.matchers.And"> + <matcher match="SenderIs=ad...@gov.org"/> + <matcher match="RecipientIs=a...@gov.org"/> +</matcher> +<mailet match="notify-matcher" class="MailToAllUsers"> + <batchSize>100</batchSize> +</mailet> +---- + + + diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java new file mode 100644 index 0000000000..46a28d42c0 --- /dev/null +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java @@ -0,0 +1,124 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.transport.mailets; + +import java.util.Optional; +import java.util.function.Function; + +import jakarta.inject.Inject; +import jakarta.mail.MessagingException; + +import org.apache.james.core.MailAddress; +import org.apache.james.core.Username; +import org.apache.james.lifecycle.api.LifecycleUtil; +import org.apache.james.user.api.UsersRepository; +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; +import org.reactivestreams.Publisher; + +import com.github.fge.lambdas.Throwing; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; + +/** + * A mailet that helps to email all users in the system. The emails are sent in batches to manage + * the load. The first batch is sent directly, while the remaining batches are sent asynchronously. + * The batch size can be configured via the <b>batchSize</b> parameter (optional, defaults to 100). 
+ * + * <h3>Configuration</h3> + * <pre><code> + * {@code + * <matcher name="notify-matcher" match="org.apache.james.mailetcontainer.impl.matchers.And"> + * <matcher match="SenderIs=ad...@gov.org"/> + * <matcher match="RecipientIs=a...@gov.org"/> + * </matcher> + * <mailet match="notify-matcher" class="MailToAllUsers"> + * <batchSize>100</batchSize> + * </mailet> + * } + * </code></pre> + * + */ +public class MailToAllUsers extends GenericMailet { + private static final int DEFAULT_BATCH_SIZE = 100; + private final UsersRepository usersRepository; + private int batchSize; + + @Inject + public MailToAllUsers(UsersRepository usersRepository) { + this.usersRepository = usersRepository; + } + + @Override + public void init() throws MessagingException { + batchSize = Integer.parseInt(Optional.ofNullable(getInitParameter("batchSize")) + .orElse(String.valueOf(DEFAULT_BATCH_SIZE))); + } + + @Override + public void service(Mail mail) throws MessagingException { + Flux.from(usersRepository.listReactive()) + .map(Throwing.function(Username::asMailAddress)) + .window(batchSize) + .index() + .flatMap(sendMail(mail)) + .then() + .block(); + } + + private Function<Tuple2<Long, Flux<MailAddress>>, Publisher<Void>> sendMail(Mail mail) { + return tuple -> { + boolean firstBatch = tuple.getT1() == 0; + if (firstBatch) { + return sendMailToFirstRecipientsBatchDirectly(mail, tuple.getT2()); + } + return sendMailToRemainingRecipientsBatchAsynchronously(mail, tuple.getT2()); + }; + } + + private Mono<Void> sendMailToFirstRecipientsBatchDirectly(Mail mail, Flux<MailAddress> firstRecipientsBatch) { + return firstRecipientsBatch + .collectList() + .flatMap(recipients -> Mono.fromRunnable(() -> mail.setRecipients(recipients))) + .then(); + } + + private Mono<Void> sendMailToRemainingRecipientsBatchAsynchronously(Mail mail, Flux<MailAddress> remainingRecipientsBatch) { + return remainingRecipientsBatch + .collectList() + .flatMap(recipients -> Mono.fromRunnable(Throwing.runnable(() -> { + 
Mail duplicateMail = mail.duplicate(); + try { + duplicateMail.setRecipients(recipients); + getMailetContext().sendMail(duplicateMail); + } finally { + LifecycleUtil.dispose(duplicateMail); + } + }))) + .then(); + } + + @Override + public String getMailetName() { + return "MailToAllUsers"; + } +} diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java new file mode 100644 index 0000000000..be7dd91657 --- /dev/null +++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java @@ -0,0 +1,106 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.transport.mailets; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import jakarta.mail.MessagingException; + +import org.apache.james.core.Username; +import org.apache.james.core.builder.MimeMessageBuilder; +import org.apache.james.user.api.UsersRepository; +import org.apache.mailet.Mail; +import org.apache.mailet.MailetContext; +import org.apache.mailet.base.test.FakeMail; +import org.apache.mailet.base.test.FakeMailetConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import reactor.core.publisher.Flux; + +class MailToAllUsersTest { + private static final Username USER_1 = Username.of("us...@gov.org"); + private static final Username USER_2 = Username.of("us...@gov.org"); + private static final Username USER_3 = Username.of("us...@gov.org"); + private static final Username USER_4 = Username.of("us...@gov.org"); + + private UsersRepository usersRepository; + private MailetContext mailetContext; + private MailToAllUsers testee; + + @BeforeEach + void setUp() { + usersRepository = mock(UsersRepository.class); + mailetContext = Mockito.mock(MailetContext.class); + testee = new MailToAllUsers(usersRepository); + } + + @Test + void firstUsersBatchShouldBeSentDirectly() throws Exception { + when(usersRepository.listReactive()) + .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4)); + + testee.init(FakeMailetConfig.builder() + .mailetContext(mailetContext) + .setProperty("batchSize", "2") + .build()); + + Mail originalMail = createMail(); + testee.service(originalMail); + + assertThat(originalMail.getRecipients()) + .containsExactlyInAnyOrder(USER_1.asMailAddress(), USER_2.asMailAddress()); + } + + @Test + void 
remainingUsersBatchesShouldBeSentAsync() throws Exception { + when(usersRepository.listReactive()) + .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4)); + + testee.init(FakeMailetConfig.builder() + .mailetContext(mailetContext) + .setProperty("batchSize", "2") + .build()); + + Mail originalMail = createMail(); + testee.service(originalMail); + + ArgumentCaptor<Mail> mailCaptor = ArgumentCaptor.forClass(Mail.class); + verify(mailetContext).sendMail(mailCaptor.capture()); + assertThat(mailCaptor.getValue().getRecipients()) + .containsExactly(USER_3.asMailAddress(), USER_4.asMailAddress()); + } + + private Mail createMail() throws MessagingException { + return FakeMail.builder() + .name("name") + .mimeMessage(MimeMessageBuilder.mimeMessageBuilder() + .setSender("ad...@gov.org") + .setSubject("Hi all") + .addToRecipient("a...@gov.org")) + .recipient("a...@gov.org") + .build(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org