This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 1d62e6ab84 [FIX] MailReceptionCheck should not call sink concurrently
1d62e6ab84 is described below
commit 1d62e6ab842d6dc6ebdef7a621c40f1f1dadb3e8
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu Feb 1 12:57:03 2024 +0100
[FIX] MailReceptionCheck should not call sink concurrently
By pre-filtering events down to the ones that interest us, we avoid a data race.
---
.../james/healthcheck/MailReceptionCheck.java | 76 +++++++++++++---------
1 file changed, 47 insertions(+), 29 deletions(-)
diff --git
a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
index 8d865ee097..72efbd91c2 100644
---
a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
+++
b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
@@ -69,6 +69,9 @@ import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
public class MailReceptionCheck implements HealthCheck {
+
+ public static final ComponentName COMPONENT_NAME = new
ComponentName("MailReceptionCheck");
+
public static class Configuration {
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
public static final Configuration DEFAULT = new
Configuration(Optional.empty(), DEFAULT_TIMEOUT);
@@ -159,23 +162,47 @@ public class MailReceptionCheck implements HealthCheck {
}
public static class AwaitReceptionListener implements
EventListener.ReactiveEventListener {
- private final Sinks.Many<Added> sink;
-
- public AwaitReceptionListener() {
- sink = Sinks.many().multicast().onBackpressureBuffer();
+ private final Sinks.Many<Result> sink;
+ private final MessageManager mailbox;
+ private final Content content;
+ private final MailboxSession session;
+
+ public AwaitReceptionListener(MessageManager mailbox, Content content,
MailboxSession session) {
+ this.mailbox = mailbox;
+ this.content = content;
+ this.session = session;
+ this.sink = Sinks.many().multicast().onBackpressureBuffer();
}
@Override
public Publisher<Void> reactiveEvent(Event event) {
if (event instanceof Added) {
- return Mono.fromRunnable(() -> sink.emitNext((Added) event,
FAIL_FAST))
- .subscribeOn(Schedulers.boundedElastic())
+ Added added = (Added) event;
+ return checkReceived(added)
+ .flatMap(result -> Mono.fromRunnable(() ->
sink.emitNext(result, FAIL_FAST))
+ .subscribeOn(Schedulers.boundedElastic()))
.then();
}
return Mono.empty();
}
- public Flux<Added> addedEvents() {
+ private Mono<Result> checkReceived(Added added) {
+ return Flux.fromIterable(added.getUids())
+ .flatMap(uid ->
Flux.from(mailbox.getMessagesReactive(MessageRange.one(uid),
FetchGroup.FULL_CONTENT, session)))
+ .filter(Throwing.predicate(messageResult ->
IOUtils.toString(messageResult.getBody().getInputStream(),
StandardCharsets.US_ASCII)
+ .contains(content.asString())))
+ // Cleanup our testing mail
+ .concatMap(messageResult ->
Mono.from(mailbox.deleteReactive(ImmutableList.of(messageResult.getUid()),
session))
+ .onErrorResume(e -> {
+ LOGGER.warn("Failed to delete Health check testing
email", e);
+ return Mono.empty();
+ })
+ .thenReturn(messageResult))
+ .map(any -> Result.healthy(COMPONENT_NAME))
+ .next();
+ }
+
+ public Flux<Result> results() {
return sink.asFlux();
}
}
@@ -199,7 +226,7 @@ public class MailReceptionCheck implements HealthCheck {
@Override
public ComponentName componentName() {
- return new ComponentName("MailReceptionCheck");
+ return COMPONENT_NAME;
}
@Override
@@ -210,15 +237,18 @@ public class MailReceptionCheck implements HealthCheck {
}
private Mono<Result> check(Username username) {
+ Content content = Content.generate();
MailboxSession session = mailboxManager.createSystemSession(username);
- AwaitReceptionListener listener = new AwaitReceptionListener();
return retrieveInbox(username, session)
- .flatMap(mailbox -> Mono.usingWhen(
- Mono.from(eventBus.register(listener, new
MailboxIdRegistrationKey(mailbox.getId()))),
- registration -> sendMail(username)
- .flatMap(content -> checkReceived(session, listener,
mailbox, content)),
- Registration::unregister))
+ .flatMap(mailbox -> {
+ AwaitReceptionListener listener = new
AwaitReceptionListener(mailbox, content, session);
+ return Mono.usingWhen(
+ Mono.from(eventBus.register(listener, new
MailboxIdRegistrationKey(mailbox.getId()))),
+ registration -> sendMail(username, content)
+ .flatMap(any -> checkReceived(listener)),
+ Registration::unregister);
+ })
.timeout(configuration.getTimeout(), Mono.error(() -> new
RuntimeException("HealthCheck email was not received after " +
configuration.getTimeout().toMillis() + "ms")))
.onErrorResume(e -> {
LOGGER.error("Mail reception check failed", e);
@@ -234,25 +264,13 @@ public class MailReceptionCheck implements HealthCheck {
.then(Mono.from(mailboxManager.getMailboxReactive(mailboxPath,
session))));
}
- private Mono<Result> checkReceived(MailboxSession session,
AwaitReceptionListener listener, MessageManager mailbox, Content content) {
- return listener.addedEvents()
- .flatMapIterable(Added::getUids)
- .flatMap(uid ->
Flux.from(mailbox.getMessagesReactive(MessageRange.one(uid),
FetchGroup.FULL_CONTENT, session)))
- .filter(Throwing.predicate(messageResult ->
IOUtils.toString(messageResult.getBody().getInputStream(),
StandardCharsets.US_ASCII)
- .contains(content.asString())))
- // Cleanup our testing mail
- .concatMap(messageResult ->
Mono.from(mailbox.deleteReactive(ImmutableList.of(messageResult.getUid()),
session))
- .onErrorResume(e -> {
- LOGGER.warn("Failed to delete Health check testing email",
e);
- return Mono.empty();
- })
- .thenReturn(messageResult))
+ private Mono<Result> checkReceived(AwaitReceptionListener listener) {
+ return listener.results()
.map(any -> Result.healthy(componentName()))
.next();
}
- private Mono<Content> sendMail(Username username) {
- Content content = Content.generate();
+ private Mono<Content> sendMail(Username username, Content content) {
return Mono.fromCallable(() ->
usersRepository.getMailAddressFor(username))
.flatMap(address ->
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]