This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 978540c07b Refactor some event bus handlers so that they do not 
tightly bind to RabbitMQEventBus (#2656)
978540c07b is described below

commit 978540c07b3fa7afbf2ff0d28633f896548f0a2e
Author: hungphan227 <45198168+hungphan...@users.noreply.github.com>
AuthorDate: Mon Mar 3 08:02:40 2025 +0700

    Refactor some event bus handlers so that they do not tightly bind to 
RabbitMQEventBus (#2656)
    
    Co-authored-by: hung phan <hp...@linagora.com>
---
 .../java/org/apache/james/events/EventBus.java     |  9 +++++++++
 .../james/events/EventBusReconnectionHandler.java  |  7 ++-----
 .../james/events/GroupRegistrationHandler.java     |  4 ----
 ...ler.java => GroupRegistrationHandlerGroup.java} | 22 +---------------------
 .../james/events/KeyReconnectionHandler.java       |  3 ---
 .../events/RabbitEventBusConsumerHealthCheck.java  |  6 +++---
 .../org/apache/james/events/RabbitMQEventBus.java  |  2 ++
 .../james/CassandraRabbitMQJamesServerMain.java    |  4 ++--
 .../james/DistributedPOP3JamesServerMain.java      |  4 ++--
 .../org/apache/james/PostgresJamesServerMain.java  |  4 ++--
 ...ntBusModule.java => MailboxEventBusModule.java} | 16 +++++++++++-----
 11 files changed, 34 insertions(+), 47 deletions(-)

diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java 
b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
index 52b7ef76b4..f46ba06e05 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
@@ -100,4 +100,13 @@ public interface EventBus {
     default Collection<Group> listRegisteredGroups() {
         return ImmutableList.of();
     }
+
+    default void start() {
+    }
+
+    default void restart() {
+    }
+
+    default void stop() {
+    }
 }
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
index 98c0b03670..1510e635ce 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.events;
 
-import jakarta.inject.Inject;
-
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.reactivestreams.Publisher;
 
@@ -29,10 +27,9 @@ import com.rabbitmq.client.Connection;
 import reactor.core.publisher.Mono;
 
 public class EventBusReconnectionHandler implements 
SimpleConnectionPool.ReconnectionHandler {
-    private final RabbitMQEventBus rabbitMQEventBus;
+    private final EventBus rabbitMQEventBus;
 
-    @Inject
-    public EventBusReconnectionHandler(RabbitMQEventBus rabbitMQEventBus) {
+    public EventBusReconnectionHandler(EventBus rabbitMQEventBus) {
         this.rabbitMQEventBus = rabbitMQEventBus;
     }
 
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index 5480af0a8f..415bc8355f 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -63,10 +63,6 @@ import reactor.util.retry.Retry;
 class GroupRegistrationHandler {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(GroupRegistrationHandler.class);
 
-    public static class GroupRegistrationHandlerGroup extends Group {
-
-    }
-
     static final Group GROUP = new GroupRegistrationHandlerGroup();
 
     private final NamingStrategy namingStrategy;
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java
similarity index 65%
copy from 
event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
copy to 
event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java
index 98c0b03670..77626bd3f2 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java
@@ -19,25 +19,5 @@
 
 package org.apache.james.events;
 
-import jakarta.inject.Inject;
-
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
-import org.reactivestreams.Publisher;
-
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
-
-public class EventBusReconnectionHandler implements 
SimpleConnectionPool.ReconnectionHandler {
-    private final RabbitMQEventBus rabbitMQEventBus;
-
-    @Inject
-    public EventBusReconnectionHandler(RabbitMQEventBus rabbitMQEventBus) {
-        this.rabbitMQEventBus = rabbitMQEventBus;
-    }
-
-    @Override
-    public Publisher<Void> handleReconnection(Connection connection) {
-        return Mono.fromRunnable(rabbitMQEventBus::restart);
-    }
+public class GroupRegistrationHandlerGroup extends Group {
 }
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
index 34cf26046e..aefa2ce7e9 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
@@ -26,8 +26,6 @@ import static 
org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete;
 import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable;
 import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive;
 
-import jakarta.inject.Inject;
-
 import org.apache.james.backends.rabbitmq.QueueArguments;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
@@ -47,7 +45,6 @@ public class KeyReconnectionHandler implements 
SimpleConnectionPool.Reconnection
     private final EventBusId eventBusId;
     private final RabbitMQConfiguration configuration;
 
-    @Inject
     public KeyReconnectionHandler(NamingStrategy namingStrategy, EventBusId 
eventBusId, RabbitMQConfiguration configuration) {
         this.namingStrategy = namingStrategy;
         this.eventBusId = eventBusId;
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
index 8478561121..ed0e462cc3 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
@@ -36,11 +36,11 @@ import reactor.core.publisher.Mono;
 public class RabbitEventBusConsumerHealthCheck implements HealthCheck {
     public static final String COMPONENT = "EventbusConsumers";
 
-    private final RabbitMQEventBus eventBus;
+    private final EventBus eventBus;
     private final NamingStrategy namingStrategy;
     private final SimpleConnectionPool connectionPool;
 
-    public RabbitEventBusConsumerHealthCheck(RabbitMQEventBus eventBus, 
NamingStrategy namingStrategy,
+    public RabbitEventBusConsumerHealthCheck(EventBus eventBus, NamingStrategy 
namingStrategy,
                                              SimpleConnectionPool 
connectionPool) {
         this.eventBus = eventBus;
         this.namingStrategy = namingStrategy;
@@ -65,7 +65,7 @@ public class RabbitEventBusConsumerHealthCheck implements 
HealthCheck {
     private Result check(Channel channel) {
         Stream<Group> groups = Stream.concat(
             eventBus.listRegisteredGroups().stream(),
-            Stream.of(new 
GroupRegistrationHandler.GroupRegistrationHandlerGroup()));
+            Stream.of(new GroupRegistrationHandlerGroup()));
 
         Optional<String> queueWithoutConsumers = groups
             .map(namingStrategy::workQueue)
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index ccd8df7bac..71e3ea757d 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -83,6 +83,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
         this.isStopping = false;
     }
 
+    @Override
     public void start() {
         if (!isRunning && !isStopping) {
 
@@ -97,6 +98,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
         }
     }
 
+    @Override
     public void restart() {
         keyRegistrationHandler.restart();
         groupRegistrationHandler.restart();
diff --git 
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
 
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index 2abeb519fe..6d0ae07e4e 100644
--- 
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ 
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -51,7 +51,7 @@ import 
org.apache.james.modules.data.CassandraSieveRepositoryModule;
 import org.apache.james.modules.data.CassandraUsersRepositoryModule;
 import org.apache.james.modules.data.CassandraVacationModule;
 import org.apache.james.modules.event.JMAPEventBusModule;
-import org.apache.james.modules.event.RabbitMQEventBusModule;
+import org.apache.james.modules.event.MailboxEventBusModule;
 import org.apache.james.modules.eventstore.CassandraEventStoreModule;
 import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
 import org.apache.james.modules.mailbox.CassandraMailboxModule;
@@ -181,7 +181,7 @@ public class CassandraRabbitMQJamesServerMain implements 
JamesServerMain {
 
     protected static final Module MODULES = 
Modules.override(REQUIRE_TASK_MANAGER_MODULE, new 
DistributedTaskManagerModule())
         .with(new RabbitMQModule(),
-            new RabbitMQEventBusModule(),
+            new MailboxEventBusModule(),
             new DistributedTaskSerializationModule());
 
     public static void main(String[] args) throws Exception {
diff --git 
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
 
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
index 9ecac2a5fb..c1247e8474 100644
--- 
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
+++ 
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
@@ -57,7 +57,7 @@ import 
org.apache.james.modules.data.CassandraSieveRepositoryModule;
 import org.apache.james.modules.data.CassandraUsersRepositoryModule;
 import org.apache.james.modules.data.CassandraVacationModule;
 import org.apache.james.modules.event.JMAPEventBusModule;
-import org.apache.james.modules.event.RabbitMQEventBusModule;
+import org.apache.james.modules.event.MailboxEventBusModule;
 import org.apache.james.modules.eventstore.CassandraEventStoreModule;
 import org.apache.james.modules.mailbox.CassandraBlobStoreDependenciesModule;
 import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
@@ -175,7 +175,7 @@ public class DistributedPOP3JamesServerMain implements 
JamesServerMain {
         .with(new RabbitMQModule(),
             new RabbitMQMailQueueModule(),
             new RabbitMailQueueRoutesModule(),
-            new RabbitMQEventBusModule(),
+            new MailboxEventBusModule(),
             new DistributedTaskSerializationModule());
 
     public static void main(String[] args) throws Exception {
diff --git 
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
 
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
index 772d321f50..3d7df65509 100644
--- 
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
+++ 
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
@@ -49,7 +49,7 @@ import 
org.apache.james.modules.data.PostgresUsersRepositoryModule;
 import org.apache.james.modules.data.PostgresVacationModule;
 import org.apache.james.modules.data.SievePostgresRepositoryModules;
 import org.apache.james.modules.event.JMAPEventBusModule;
-import org.apache.james.modules.event.RabbitMQEventBusModule;
+import org.apache.james.modules.event.MailboxEventBusModule;
 import org.apache.james.modules.events.PostgresDeadLetterModule;
 import org.apache.james.modules.mailbox.DefaultEventModule;
 import org.apache.james.modules.mailbox.PostgresDeletedMessageVaultModule;
@@ -231,7 +231,7 @@ public class PostgresJamesServerMain implements 
JamesServerMain {
                     new ActiveMQQueueModule());
             case RABBITMQ:
                 return List.of(
-                    Modules.override(new DefaultEventModule()).with(new 
RabbitMQEventBusModule()),
+                    Modules.override(new DefaultEventModule()).with(new 
MailboxEventBusModule()),
                     new RabbitMQModule(),
                     new RabbitMQMailQueueModule(),
                     new FakeMailQueueViewModule(),
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
similarity index 90%
rename from 
server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
rename to 
server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
index 37522182a1..2e8d3edb0b 100644
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
@@ -52,7 +52,7 @@ import com.google.inject.multibindings.ProvidesIntoSet;
 
 import reactor.rabbitmq.Sender;
 
-public class RabbitMQEventBusModule extends AbstractModule {
+public class MailboxEventBusModule extends AbstractModule {
 
     @Override
     protected void configure() {
@@ -64,10 +64,6 @@ public class RabbitMQEventBusModule extends AbstractModule {
         
bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT);
         bind(EventBusId.class).toInstance(EventBusId.random());
 
-        Multibinder<SimpleConnectionPool.ReconnectionHandler> 
reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), 
SimpleConnectionPool.ReconnectionHandler.class);
-        
reconnectionHandlerMultibinder.addBinding().to(KeyReconnectionHandler.class);
-        
reconnectionHandlerMultibinder.addBinding().to(EventBusReconnectionHandler.class);
-
         Multibinder.newSetBinder(binder(), HealthCheck.class)
             
.addBinding().to(RabbitMQMailboxEventBusDeadLetterQueueHealthCheck.class);
     }
@@ -78,6 +74,16 @@ public class RabbitMQEventBusModule extends AbstractModule {
         return new RabbitEventBusConsumerHealthCheck(eventBus, namingStrategy, 
connectionPool);
     }
 
+    @ProvidesIntoSet
+    SimpleConnectionPool.ReconnectionHandler 
provideReconnectionHandler(RabbitMQEventBus eventBus) {
+        return new EventBusReconnectionHandler(eventBus);
+    }
+
+    @ProvidesIntoSet
+    SimpleConnectionPool.ReconnectionHandler 
provideReconnectionHandler(NamingStrategy namingStrategy, EventBusId 
eventBusId, RabbitMQConfiguration configuration) {
+        return new KeyReconnectionHandler(namingStrategy, eventBusId, 
configuration);
+    }
+
     @ProvidesIntoSet
     InitializationOperation workQueue(RabbitMQEventBus instance) {
         return InitilizationOperationBuilder


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to