This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 340b31f0da JAMES-4081 Implement MailToAllUsers mailet (#2469)
340b31f0da is described below

commit 340b31f0da3a94ed4a6cbf200a69e67277b2a578
Author: Trần Hồng Quân <55171818+quantranhong1...@users.noreply.github.com>
AuthorDate: Mon Oct 28 10:47:24 2024 +0700

    JAMES-4081 Implement MailToAllUsers mailet (#2469)
---
 docs/modules/servers/partials/MailToAllUsers.adoc  |  25 +++++
 .../james/transport/mailets/MailToAllUsers.java    | 124 +++++++++++++++++++++
 .../transport/mailets/MailToAllUsersTest.java      | 106 ++++++++++++++++++
 3 files changed, 255 insertions(+)

diff --git a/docs/modules/servers/partials/MailToAllUsers.adoc b/docs/modules/servers/partials/MailToAllUsers.adoc
new file mode 100644
index 0000000000..caf5516d3f
--- /dev/null
+++ b/docs/modules/servers/partials/MailToAllUsers.adoc
@@ -0,0 +1,25 @@
+=== MailToAllUsers
+
+A mailet that sends an email to all users in the system. The emails are sent in batches to manage
+the load. The first batch is sent directly, while the remaining batches are sent asynchronously.
+
+==== Configuration parameters
+
+*batchSize*::
+The number of recipients to include in each batch. Optional, defaults to 100.
+
+==== Sample configuration
+
+[source,xml]
+----
+<matcher name="notify-matcher" match="org.apache.james.mailetcontainer.impl.matchers.And">
+    <matcher match="SenderIs=admin@gov.org"/>
+    <matcher match="RecipientIs=all@gov.org"/>
+</matcher>
+<mailet match="notify-matcher" class="MailToAllUsers">
+    <batchSize>100</batchSize>
+</mailet>
+----
+
+
+
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
new file mode 100644
index 0000000000..46a28d42c0
--- /dev/null
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
@@ -0,0 +1,124 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import jakarta.inject.Inject;
+import jakarta.mail.MessagingException;
+
+import org.apache.james.core.MailAddress;
+import org.apache.james.core.Username;
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+import org.reactivestreams.Publisher;
+
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+
+/**
+ * A mailet that addresses the processed mail to every user listed by the {@link UsersRepository}.
+ *
+ * <p>Users are listed reactively, converted to mail addresses and grouped into batches of
+ * <b>batchSize</b> recipients (optional, defaults to 100):</p>
+ * <ul>
+ *   <li>the first batch replaces the recipients of the mail being processed;</li>
+ *   <li>each following batch is set on a duplicate of the mail, which is handed to the mailet
+ *   context through {@code sendMail} and then disposed.</li>
+ * </ul>
+ *
+ * <h3>Configuration</h3>
+ * <pre><code>
+ * {@code
+ * <matcher name="notify-matcher" match="org.apache.james.mailetcontainer.impl.matchers.And">
+ *     <matcher match="SenderIs=admin@gov.org"/>
+ *     <matcher match="RecipientIs=all@gov.org"/>
+ * </matcher>
+ * <mailet match="notify-matcher" class="MailToAllUsers">
+ *     <batchSize>100</batchSize>
+ * </mailet>
+ * }
+ * </code></pre>
+ *
+ */
+public class MailToAllUsers extends GenericMailet {
+    private static final int DEFAULT_BATCH_SIZE = 100;
+    private final UsersRepository usersRepository;
+    private int batchSize;
+
+    @Inject
+    public MailToAllUsers(UsersRepository usersRepository) {
+        this.usersRepository = usersRepository;
+    }
+
+    /**
+     * Reads the optional <b>batchSize</b> init parameter, falling back to 100 when it is absent.
+     *
+     * @throws MessagingException if the configured value is not an integer or is not strictly positive
+     */
+    @Override
+    public void init() throws MessagingException {
+        batchSize = parseBatchSize(getInitParameter("batchSize"));
+    }
+
+    /**
+     * Converts the raw parameter value into a usable batch size.
+     *
+     * @param configuredValue the raw parameter value, or {@code null} when not configured
+     * @return the batch size to use
+     * @throws MessagingException if the value is not an integer or is not strictly positive
+     */
+    private static int parseBatchSize(String configuredValue) throws MessagingException {
+        String rawValue = Optional.ofNullable(configuredValue)
+            .orElse(String.valueOf(DEFAULT_BATCH_SIZE));
+
+        int parsedValue;
+        try {
+            parsedValue = Integer.parseInt(rawValue);
+        } catch (NumberFormatException e) {
+            throw new MessagingException("'batchSize' must be an integer but was '" + rawValue + "'", e);
+        }
+
+        // Flux#window(int) rejects non-positive sizes; fail at start-up rather than on the first serviced mail.
+        if (parsedValue <= 0) {
+            throw new MessagingException("'batchSize' must be strictly positive but was " + parsedValue);
+        }
+        return parsedValue;
+    }
+
+    /**
+     * Lists all users, splits them into windows of {@code batchSize} addresses and dispatches each
+     * window, blocking until every batch has been handled.
+     *
+     * @param mail the mail whose recipients are replaced by the first batch and which is
+     *             duplicated for the following batches
+     */
+    @Override
+    public void service(Mail mail) throws MessagingException {
+        Flux.from(usersRepository.listReactive())
+            .map(Throwing.function(Username::asMailAddress))
+            .window(batchSize)
+            // index() pairs each window with its position so the first batch can be told apart.
+            .index()
+            .flatMap(sendMail(mail))
+            .then()
+            .block();
+    }
+
+    /**
+     * Builds the dispatcher applied to each indexed batch: index 0 goes to the processed mail
+     * itself, any other index goes to a duplicate.
+     */
+    private Function<Tuple2<Long, Flux<MailAddress>>, Publisher<Void>> sendMail(Mail mail) {
+        return tuple -> {
+            boolean firstBatch = tuple.getT1() == 0;
+            if (firstBatch) {
+                return sendMailToFirstRecipientsBatchDirectly(mail, tuple.getT2());
+            }
+            return sendMailToRemainingRecipientsBatchAsynchronously(mail, tuple.getT2());
+        };
+    }
+
+    /**
+     * Replaces the recipients of the processed mail with the addresses of the first batch.
+     */
+    private Mono<Void> sendMailToFirstRecipientsBatchDirectly(Mail mail, Flux<MailAddress> firstRecipientsBatch) {
+        return firstRecipientsBatch
+            .collectList()
+            .flatMap(recipients -> Mono.fromRunnable(() -> mail.setRecipients(recipients)))
+            .then();
+    }
+
+    /**
+     * Sends a duplicate of the mail, addressed to the given batch, through the mailet context.
+     * The duplicate is always disposed, whether or not sending succeeds.
+     */
+    private Mono<Void> sendMailToRemainingRecipientsBatchAsynchronously(Mail mail, Flux<MailAddress> remainingRecipientsBatch) {
+        return remainingRecipientsBatch
+            .collectList()
+            .flatMap(recipients -> Mono.fromRunnable(Throwing.runnable(() -> {
+                Mail duplicateMail = mail.duplicate();
+                try {
+                    duplicateMail.setRecipients(recipients);
+                    getMailetContext().sendMail(duplicateMail);
+                } finally {
+                    LifecycleUtil.dispose(duplicateMail);
+                }
+            })))
+            .then();
+    }
+
+    @Override
+    public String getMailetName() {
+        return "MailToAllUsers";
+    }
+}
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
new file mode 100644
index 0000000000..be7dd91657
--- /dev/null
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
@@ -0,0 +1,106 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import jakarta.mail.MessagingException;
+
+import org.apache.james.core.Username;
+import org.apache.james.core.builder.MimeMessageBuilder;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.mailet.Mail;
+import org.apache.mailet.MailetContext;
+import org.apache.mailet.base.test.FakeMail;
+import org.apache.mailet.base.test.FakeMailetConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * Checks how {@link MailToAllUsers} splits the listed users into recipient batches.
+ */
+class MailToAllUsersTest {
+    // Four distinct users so that a batch size of 2 yields exactly two batches.
+    private static final Username USER_1 = Username.of("user1@gov.org");
+    private static final Username USER_2 = Username.of("user2@gov.org");
+    private static final Username USER_3 = Username.of("user3@gov.org");
+    private static final Username USER_4 = Username.of("user4@gov.org");
+
+    private UsersRepository usersRepository;
+    private MailetContext mailetContext;
+    private MailToAllUsers testee;
+
+    @BeforeEach
+    void setUp() {
+        usersRepository = mock(UsersRepository.class);
+        mailetContext = Mockito.mock(MailetContext.class);
+        testee = new MailToAllUsers(usersRepository);
+    }
+
+    /** The first batch must end up as the recipients of the mail passed to {@code service}. */
+    @Test
+    void firstUsersBatchShouldBeSentDirectly() throws Exception {
+        when(usersRepository.listReactive())
+            .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
+
+        testee.init(FakeMailetConfig.builder()
+            .mailetContext(mailetContext)
+            .setProperty("batchSize", "2")
+            .build());
+
+        Mail originalMail = createMail();
+        testee.service(originalMail);
+
+        assertThat(originalMail.getRecipients())
+            .containsExactlyInAnyOrder(USER_1.asMailAddress(), USER_2.asMailAddress());
+    }
+
+    /** The second batch must be sent once through the mailet context, addressed to the remaining users. */
+    @Test
+    void remainingUsersBatchesShouldBeSentAsync() throws Exception {
+        when(usersRepository.listReactive())
+            .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
+
+        testee.init(FakeMailetConfig.builder()
+            .mailetContext(mailetContext)
+            .setProperty("batchSize", "2")
+            .build());
+
+        Mail originalMail = createMail();
+        testee.service(originalMail);
+
+        ArgumentCaptor<Mail> mailCaptor = ArgumentCaptor.forClass(Mail.class);
+        verify(mailetContext).sendMail(mailCaptor.capture());
+        assertThat(mailCaptor.getValue().getRecipients())
+            .containsExactly(USER_3.asMailAddress(), USER_4.asMailAddress());
+    }
+
+    /** Builds the announcement mail that triggers the mailet. */
+    private Mail createMail() throws MessagingException {
+        return FakeMail.builder()
+            .name("name")
+            .mimeMessage(MimeMessageBuilder.mimeMessageBuilder()
+                .setSender("admin@gov.org")
+                .setSubject("Hi all")
+                .addToRecipient("all@gov.org"))
+            .recipient("all@gov.org")
+            .build();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org

Reply via email to