This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 978540c07b Refactor some event bus handlers so that they do not tightly bind to RabbitMQEventBus (#2656) 978540c07b is described below commit 978540c07b3fa7afbf2ff0d28633f896548f0a2e Author: hungphan227 <45198168+hungphan...@users.noreply.github.com> AuthorDate: Mon Mar 3 08:02:40 2025 +0700 Refactor some event bus handlers so that they do not tightly bind to RabbitMQEventBus (#2656) Co-authored-by: hung phan <hp...@linagora.com> --- .../java/org/apache/james/events/EventBus.java | 9 +++++++++ .../james/events/EventBusReconnectionHandler.java | 7 ++----- .../james/events/GroupRegistrationHandler.java | 4 ---- ...ler.java => GroupRegistrationHandlerGroup.java} | 22 +--------------------- .../james/events/KeyReconnectionHandler.java | 3 --- .../events/RabbitEventBusConsumerHealthCheck.java | 6 +++--- .../org/apache/james/events/RabbitMQEventBus.java | 2 ++ .../james/CassandraRabbitMQJamesServerMain.java | 4 ++-- .../james/DistributedPOP3JamesServerMain.java | 4 ++-- .../org/apache/james/PostgresJamesServerMain.java | 4 ++-- ...ntBusModule.java => MailboxEventBusModule.java} | 16 +++++++++++----- 11 files changed, 34 insertions(+), 47 deletions(-) diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java index 52b7ef76b4..f46ba06e05 100644 --- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java +++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java @@ -100,4 +100,13 @@ public interface EventBus { default Collection<Group> listRegisteredGroups() { return ImmutableList.of(); } + + default void start() { + } + + default void restart() { + } + + default void stop() { + } } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java index 98c0b03670..1510e635ce 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java @@ -19,8 +19,6 @@ package org.apache.james.events; -import jakarta.inject.Inject; - import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.reactivestreams.Publisher; @@ -29,10 +27,9 @@ import com.rabbitmq.client.Connection; import reactor.core.publisher.Mono; public class EventBusReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler { - private final RabbitMQEventBus rabbitMQEventBus; + private final EventBus rabbitMQEventBus; - @Inject - public EventBusReconnectionHandler(RabbitMQEventBus rabbitMQEventBus) { + public EventBusReconnectionHandler(EventBus rabbitMQEventBus) { this.rabbitMQEventBus = rabbitMQEventBus; } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index 5480af0a8f..415bc8355f 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -63,10 +63,6 @@ import reactor.util.retry.Retry; class GroupRegistrationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistrationHandler.class); - public static class GroupRegistrationHandlerGroup extends Group { - - } - static final Group GROUP = new GroupRegistrationHandlerGroup(); private final NamingStrategy namingStrategy; diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java similarity index 65% copy from event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java copy to event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java index 98c0b03670..77626bd3f2 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java @@ -19,25 +19,5 @@ package org.apache.james.events; -import jakarta.inject.Inject; - -import org.apache.james.backends.rabbitmq.SimpleConnectionPool; -import org.reactivestreams.Publisher; - -import com.rabbitmq.client.Connection; - -import reactor.core.publisher.Mono; - -public class EventBusReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler { - private final RabbitMQEventBus rabbitMQEventBus; - - @Inject - public EventBusReconnectionHandler(RabbitMQEventBus rabbitMQEventBus) { - this.rabbitMQEventBus = rabbitMQEventBus; - } - - @Override - public Publisher<Void> handleReconnection(Connection connection) { - return Mono.fromRunnable(rabbitMQEventBus::restart); - } +public class GroupRegistrationHandlerGroup extends Group { } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java index 34cf26046e..aefa2ce7e9 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java @@ -26,8 +26,6 @@ import static org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete; import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable; import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive; -import jakarta.inject.Inject; - import org.apache.james.backends.rabbitmq.QueueArguments; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.SimpleConnectionPool; @@ -47,7 +45,6 @@ public class KeyReconnectionHandler implements SimpleConnectionPool.Reconnection private final EventBusId eventBusId; private final RabbitMQConfiguration configuration; - @Inject public KeyReconnectionHandler(NamingStrategy namingStrategy, EventBusId eventBusId, RabbitMQConfiguration configuration) { this.namingStrategy = namingStrategy; this.eventBusId = eventBusId; diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java index 8478561121..ed0e462cc3 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java @@ -36,11 +36,11 @@ import reactor.core.publisher.Mono; public class RabbitEventBusConsumerHealthCheck implements HealthCheck { public static final String COMPONENT = "EventbusConsumers"; - private final RabbitMQEventBus eventBus; + private final EventBus eventBus; private final NamingStrategy namingStrategy; private final SimpleConnectionPool connectionPool; - public RabbitEventBusConsumerHealthCheck(RabbitMQEventBus eventBus, NamingStrategy namingStrategy, + public RabbitEventBusConsumerHealthCheck(EventBus eventBus, NamingStrategy namingStrategy, SimpleConnectionPool connectionPool) { this.eventBus = eventBus; this.namingStrategy = namingStrategy; @@ -65,7 +65,7 @@ public class RabbitEventBusConsumerHealthCheck implements HealthCheck { private Result check(Channel channel) { Stream<Group> groups = Stream.concat( eventBus.listRegisteredGroups().stream(), - Stream.of(new GroupRegistrationHandler.GroupRegistrationHandlerGroup())); + Stream.of(new GroupRegistrationHandlerGroup())); Optional<String> queueWithoutConsumers = groups .map(namingStrategy::workQueue) diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java index ccd8df7bac..71e3ea757d 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java @@ -83,6 +83,7 @@ public class RabbitMQEventBus implements EventBus, Startable { this.isStopping = false; } + @Override public void start() { if (!isRunning && !isStopping) { @@ -97,6 +98,7 @@ public class RabbitMQEventBus implements EventBus, Startable { } } + @Override public void restart() { keyRegistrationHandler.restart(); groupRegistrationHandler.restart(); diff --git a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java index 2abeb519fe..6d0ae07e4e 100644 --- a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java +++ b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java @@ -51,7 +51,7 @@ import org.apache.james.modules.data.CassandraSieveRepositoryModule; import org.apache.james.modules.data.CassandraUsersRepositoryModule; import org.apache.james.modules.data.CassandraVacationModule; import org.apache.james.modules.event.JMAPEventBusModule; -import org.apache.james.modules.event.RabbitMQEventBusModule; +import org.apache.james.modules.event.MailboxEventBusModule; import org.apache.james.modules.eventstore.CassandraEventStoreModule; import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule; import org.apache.james.modules.mailbox.CassandraMailboxModule; @@ -181,7 +181,7 @@ public class CassandraRabbitMQJamesServerMain implements JamesServerMain { protected static final Module MODULES = Modules.override(REQUIRE_TASK_MANAGER_MODULE, new DistributedTaskManagerModule()) .with(new RabbitMQModule(), - new RabbitMQEventBusModule(), + new MailboxEventBusModule(), new DistributedTaskSerializationModule()); public static void main(String[] args) throws Exception { diff --git a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java index 9ecac2a5fb..c1247e8474 100644 --- a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java +++ b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java @@ -57,7 +57,7 @@ import org.apache.james.modules.data.CassandraSieveRepositoryModule; import org.apache.james.modules.data.CassandraUsersRepositoryModule; import org.apache.james.modules.data.CassandraVacationModule; import org.apache.james.modules.event.JMAPEventBusModule; -import org.apache.james.modules.event.RabbitMQEventBusModule; +import org.apache.james.modules.event.MailboxEventBusModule; import org.apache.james.modules.eventstore.CassandraEventStoreModule; import org.apache.james.modules.mailbox.CassandraBlobStoreDependenciesModule; import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule; @@ -175,7 +175,7 @@ public class DistributedPOP3JamesServerMain implements JamesServerMain { .with(new RabbitMQModule(), new RabbitMQMailQueueModule(), new RabbitMailQueueRoutesModule(), - new RabbitMQEventBusModule(), + new MailboxEventBusModule(), new DistributedTaskSerializationModule()); public static void main(String[] args) throws Exception { diff --git a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java index 772d321f50..3d7df65509 100644 --- a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java +++ b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java @@ -49,7 +49,7 @@ import org.apache.james.modules.data.PostgresUsersRepositoryModule; import org.apache.james.modules.data.PostgresVacationModule; import org.apache.james.modules.data.SievePostgresRepositoryModules; import org.apache.james.modules.event.JMAPEventBusModule; -import org.apache.james.modules.event.RabbitMQEventBusModule; +import org.apache.james.modules.event.MailboxEventBusModule; import org.apache.james.modules.events.PostgresDeadLetterModule; import org.apache.james.modules.mailbox.DefaultEventModule; import org.apache.james.modules.mailbox.PostgresDeletedMessageVaultModule; @@ -231,7 +231,7 @@ public class PostgresJamesServerMain implements JamesServerMain { new ActiveMQQueueModule()); case RABBITMQ: return List.of( - Modules.override(new DefaultEventModule()).with(new RabbitMQEventBusModule()), + Modules.override(new DefaultEventModule()).with(new MailboxEventBusModule()), new RabbitMQModule(), new RabbitMQMailQueueModule(), new FakeMailQueueViewModule(), diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java similarity index 90% rename from server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java rename to server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java index 37522182a1..2e8d3edb0b 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java @@ -52,7 +52,7 @@ import com.google.inject.multibindings.ProvidesIntoSet; import reactor.rabbitmq.Sender; -public class RabbitMQEventBusModule extends AbstractModule { +public class MailboxEventBusModule extends AbstractModule { @Override protected void configure() { @@ -64,10 +64,6 @@ public class RabbitMQEventBusModule extends AbstractModule { bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT); bind(EventBusId.class).toInstance(EventBusId.random()); - Multibinder<SimpleConnectionPool.ReconnectionHandler> reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class); - reconnectionHandlerMultibinder.addBinding().to(KeyReconnectionHandler.class); - reconnectionHandlerMultibinder.addBinding().to(EventBusReconnectionHandler.class); - Multibinder.newSetBinder(binder(), HealthCheck.class) .addBinding().to(RabbitMQMailboxEventBusDeadLetterQueueHealthCheck.class); } @@ -78,6 +74,16 @@ public class RabbitMQEventBusModule extends AbstractModule { return new RabbitEventBusConsumerHealthCheck(eventBus, namingStrategy, connectionPool); } + @ProvidesIntoSet + SimpleConnectionPool.ReconnectionHandler provideReconnectionHandler(RabbitMQEventBus eventBus) { + return new EventBusReconnectionHandler(eventBus); + } + + @ProvidesIntoSet + SimpleConnectionPool.ReconnectionHandler provideReconnectionHandler(NamingStrategy namingStrategy, EventBusId eventBusId, RabbitMQConfiguration configuration) { + return new KeyReconnectionHandler(namingStrategy, eventBusId, configuration); + } + @ProvidesIntoSet InitializationOperation workQueue(RabbitMQEventBus instance) { return InitilizationOperationBuilder --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org