This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d62e6ab84 [FIX] MailReceptionCheck should not call sink concurrently
1d62e6ab84 is described below

commit 1d62e6ab842d6dc6ebdef7a621c40f1f1dadb3e8
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu Feb 1 12:57:03 2024 +0100

    [FIX] MailReceptionCheck should not call sink concurrently
    
    By pre-filtering events to the ones that interest us, we avoid a data race.
---
 .../james/healthcheck/MailReceptionCheck.java      | 76 +++++++++++++---------
 1 file changed, 47 insertions(+), 29 deletions(-)

diff --git 
a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
 
b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
index 8d865ee097..72efbd91c2 100644
--- 
a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
+++ 
b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
@@ -69,6 +69,9 @@ import reactor.core.publisher.Sinks;
 import reactor.core.scheduler.Schedulers;
 
 public class MailReceptionCheck implements HealthCheck {
+
+    public static final ComponentName COMPONENT_NAME = new 
ComponentName("MailReceptionCheck");
+
     public static class Configuration {
         private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
         public static final Configuration DEFAULT = new 
Configuration(Optional.empty(), DEFAULT_TIMEOUT);
@@ -159,23 +162,47 @@ public class MailReceptionCheck implements HealthCheck {
     }
 
     public static class AwaitReceptionListener implements 
EventListener.ReactiveEventListener {
-        private final Sinks.Many<Added> sink;
-
-        public AwaitReceptionListener() {
-            sink = Sinks.many().multicast().onBackpressureBuffer();
+        private final Sinks.Many<Result> sink;
+        private final MessageManager mailbox;
+        private final Content content;
+        private final MailboxSession session;
+
+        public AwaitReceptionListener(MessageManager mailbox, Content content, 
MailboxSession session) {
+            this.mailbox = mailbox;
+            this.content = content;
+            this.session = session;
+            this.sink = Sinks.many().multicast().onBackpressureBuffer();
         }
 
         @Override
         public Publisher<Void> reactiveEvent(Event event) {
             if (event instanceof Added) {
-                return Mono.fromRunnable(() -> sink.emitNext((Added) event, 
FAIL_FAST))
-                    .subscribeOn(Schedulers.boundedElastic())
+                Added added = (Added) event;
+                return checkReceived(added)
+                    .flatMap(result -> Mono.fromRunnable(() -> 
sink.emitNext(result, FAIL_FAST))
+                        .subscribeOn(Schedulers.boundedElastic()))
                     .then();
             }
             return Mono.empty();
         }
 
-        public Flux<Added> addedEvents() {
+        private Mono<Result> checkReceived(Added added) {
+            return Flux.fromIterable(added.getUids())
+                .flatMap(uid -> 
Flux.from(mailbox.getMessagesReactive(MessageRange.one(uid), 
FetchGroup.FULL_CONTENT, session)))
+                .filter(Throwing.predicate(messageResult -> 
IOUtils.toString(messageResult.getBody().getInputStream(), 
StandardCharsets.US_ASCII)
+                    .contains(content.asString())))
+                // Cleanup our testing mail
+                .concatMap(messageResult -> 
Mono.from(mailbox.deleteReactive(ImmutableList.of(messageResult.getUid()), 
session))
+                    .onErrorResume(e -> {
+                        LOGGER.warn("Failed to delete Health check testing 
email", e);
+                        return Mono.empty();
+                    })
+                    .thenReturn(messageResult))
+                .map(any -> Result.healthy(COMPONENT_NAME))
+                .next();
+        }
+
+        public Flux<Result> results() {
             return sink.asFlux();
         }
     }
@@ -199,7 +226,7 @@ public class MailReceptionCheck implements HealthCheck {
 
     @Override
     public ComponentName componentName() {
-        return new ComponentName("MailReceptionCheck");
+        return COMPONENT_NAME;
     }
 
     @Override
@@ -210,15 +237,18 @@ public class MailReceptionCheck implements HealthCheck {
     }
 
     private Mono<Result> check(Username username) {
+        Content content = Content.generate();
         MailboxSession session = mailboxManager.createSystemSession(username);
-        AwaitReceptionListener listener = new AwaitReceptionListener();
 
         return retrieveInbox(username, session)
-            .flatMap(mailbox -> Mono.usingWhen(
-                Mono.from(eventBus.register(listener, new 
MailboxIdRegistrationKey(mailbox.getId()))),
-                registration -> sendMail(username)
-                    .flatMap(content -> checkReceived(session, listener, 
mailbox, content)),
-                Registration::unregister))
+            .flatMap(mailbox -> {
+                AwaitReceptionListener listener = new 
AwaitReceptionListener(mailbox, content, session);
+                return Mono.usingWhen(
+                    Mono.from(eventBus.register(listener, new 
MailboxIdRegistrationKey(mailbox.getId()))),
+                    registration -> sendMail(username, content)
+                        .flatMap(any -> checkReceived(listener)),
+                    Registration::unregister);
+            })
             .timeout(configuration.getTimeout(), Mono.error(() -> new 
RuntimeException("HealthCheck email was not received after " + 
configuration.getTimeout().toMillis() + "ms")))
             .onErrorResume(e -> {
                 LOGGER.error("Mail reception check failed", e);
@@ -234,25 +264,13 @@ public class MailReceptionCheck implements HealthCheck {
                 .then(Mono.from(mailboxManager.getMailboxReactive(mailboxPath, 
session))));
     }
 
-    private Mono<Result> checkReceived(MailboxSession session, 
AwaitReceptionListener listener, MessageManager mailbox, Content content) {
-        return listener.addedEvents()
-            .flatMapIterable(Added::getUids)
-            .flatMap(uid -> 
Flux.from(mailbox.getMessagesReactive(MessageRange.one(uid), 
FetchGroup.FULL_CONTENT, session)))
-            .filter(Throwing.predicate(messageResult -> 
IOUtils.toString(messageResult.getBody().getInputStream(), 
StandardCharsets.US_ASCII)
-                .contains(content.asString())))
-            // Cleanup our testing mail
-            .concatMap(messageResult -> 
Mono.from(mailbox.deleteReactive(ImmutableList.of(messageResult.getUid()), 
session))
-                .onErrorResume(e -> {
-                    LOGGER.warn("Failed to delete Health check testing email", 
e);
-                    return Mono.empty();
-                })
-                .thenReturn(messageResult))
+    private Mono<Result> checkReceived(AwaitReceptionListener listener) {
+        return listener.results()
             .map(any -> Result.healthy(componentName()))
             .next();
     }
 
-    private Mono<Content> sendMail(Username username) {
-        Content content = Content.generate();
+    private Mono<Content> sendMail(Username username, Content content) {
 
         return Mono.fromCallable(() -> 
usersRepository.getMailAddressFor(username))
             .flatMap(address ->


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to