This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e531cf98bc894396239cfa465e85a8f46a17dbb9
Author: Quan Tran <[email protected]>
AuthorDate: Fri Mar 29 15:26:46 2024 +0700

    JAMES-2586 Guice binding Distributed TaskManager for postgres-app
---
 .../org/apache/james/PostgresJamesServerMain.java  |  33 +++++-
 server/container/guice/postgres-common/pom.xml     |   4 +
 .../modules/data/PostgresEventStoreModule.java     |   9 --
 .../modules/task/DistributedTaskManagerModule.java | 117 +++++++++++++++++++++
 4 files changed, 150 insertions(+), 13 deletions(-)

diff --git 
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
 
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
index 452742958e..358b3eb401 100644
--- 
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
+++ 
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
@@ -20,10 +20,15 @@
 package org.apache.james;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.james.data.UsersRepositoryModuleChooser;
+import org.apache.james.eventsourcing.eventstore.EventNestedTypes;
 import org.apache.james.jmap.draft.JMAPListenerModule;
+import org.apache.james.json.DTO;
+import org.apache.james.json.DTOModule;
 import org.apache.james.modules.BlobExportMechanismModule;
+import org.apache.james.modules.DistributedTaskSerializationModule;
 import org.apache.james.modules.MailboxModule;
 import org.apache.james.modules.MailetProcessingModule;
 import org.apache.james.modules.RunArgumentsModule;
@@ -71,15 +76,23 @@ import org.apache.james.modules.server.TaskManagerModule;
 import org.apache.james.modules.server.UserIdentityModule;
 import 
org.apache.james.modules.server.WebAdminReIndexingTaskSerializationModule;
 import org.apache.james.modules.server.WebAdminServerModule;
+import org.apache.james.modules.task.DistributedTaskManagerModule;
 import org.apache.james.modules.vault.DeletedMessageVaultRoutesModule;
 import org.apache.james.vault.VaultConfiguration;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Module;
+import com.google.inject.TypeLiteral;
+import com.google.inject.name.Names;
 import com.google.inject.util.Modules;
 
 public class PostgresJamesServerMain implements JamesServerMain {
 
+    private static final Module EVENT_STORE_JSON_SERIALIZATION_DEFAULT_MODULE 
= binder ->
+        binder.bind(new TypeLiteral<Set<DTOModule<?, ? extends DTO>>>() 
{}).annotatedWith(Names.named(EventNestedTypes.EVENT_NESTED_TYPES_INJECTION_NAME))
+            .toInstance(ImmutableSet.of());
+
     private static final Module WEBADMIN = Modules.combine(
         new WebAdminServerModule(),
         new DataRoutesModules(),
@@ -114,11 +127,11 @@ public class PostgresJamesServerMain implements 
JamesServerMain {
         new PostgresDataModule(),
         new MailboxModule(),
         new SievePostgresRepositoryModules(),
-        new TaskManagerModule(),
         new PostgresEventStoreModule(),
         new TikaMailboxModule(),
         new PostgresDLPConfigurationStoreModule(),
-        new PostgresVacationModule());
+        new PostgresVacationModule(),
+        EVENT_STORE_JSON_SERIALIZATION_DEFAULT_MODULE);
 
     public static final Module JMAP = Modules.combine(
         new PostgresJmapModule(),
@@ -150,13 +163,14 @@ public class PostgresJamesServerMain implements 
JamesServerMain {
         SearchConfiguration searchConfiguration = 
configuration.searchConfiguration();
 
         return GuiceJamesServer.forConfiguration(configuration)
+            .combineWith(POSTGRES_MODULE_AGGREGATE)
             
.combineWith(SearchModuleChooser.chooseModules(searchConfiguration))
             .combineWith(chooseUsersRepositoryModule(configuration))
             .combineWith(chooseBlobStoreModules(configuration))
             .combineWith(chooseEventBusModules(configuration))
             
.combineWith(chooseDeletedMessageVaultModules(configuration.getDeletedMessageVaultConfiguration()))
-            .combineWith(POSTGRES_MODULE_AGGREGATE)
-            .overrideWith(chooseJmapModules(configuration));
+            .overrideWith(chooseJmapModules(configuration))
+            .overrideWith(chooseTaskManagerModules(configuration));
     }
 
     private static List<Module> 
chooseUsersRepositoryModule(PostgresJamesConfiguration configuration) {
@@ -173,6 +187,17 @@ public class PostgresJamesServerMain implements 
JamesServerMain {
         return builder.build();
     }
 
+    public static List<Module> 
chooseTaskManagerModules(PostgresJamesConfiguration configuration) {
+        switch (configuration.eventBusImpl()) {
+            case IN_MEMORY:
+                return List.of(new TaskManagerModule());
+            case RABBITMQ:
+                return List.of(new DistributedTaskManagerModule(), new 
DistributedTaskSerializationModule());
+            default:
+                throw new RuntimeException("Unsupported event-bus 
implementation " + configuration.eventBusImpl().name());
+        }
+    }
+
     public static List<Module> 
chooseEventBusModules(PostgresJamesConfiguration configuration) {
         switch (configuration.eventBusImpl()) {
             case IN_MEMORY:
diff --git a/server/container/guice/postgres-common/pom.xml 
b/server/container/guice/postgres-common/pom.xml
index 5cc0f9d2b3..c0d95997d3 100644
--- a/server/container/guice/postgres-common/pom.xml
+++ b/server/container/guice/postgres-common/pom.xml
@@ -84,6 +84,10 @@
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-mailbox-adapter</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-task-postgres</artifactId>
+        </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>testing-base</artifactId>
diff --git 
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresEventStoreModule.java
 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresEventStoreModule.java
index fefe5aa309..843ea4031e 100644
--- 
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresEventStoreModule.java
+++ 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresEventStoreModule.java
@@ -19,24 +19,17 @@
 
 package org.apache.james.modules.data;
 
-import java.util.Set;
-
 import org.apache.james.backends.postgres.PostgresModule;
 import org.apache.james.eventsourcing.Event;
-import org.apache.james.eventsourcing.eventstore.EventNestedTypes;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.eventsourcing.eventstore.dto.EventDTO;
 import org.apache.james.eventsourcing.eventstore.dto.EventDTOModule;
 import org.apache.james.eventsourcing.eventstore.postgres.PostgresEventStore;
-import org.apache.james.json.DTO;
-import org.apache.james.json.DTOModule;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
-import com.google.inject.name.Names;
 
 public class PostgresEventStoreModule extends AbstractModule {
     @Override
@@ -47,8 +40,6 @@ public class PostgresEventStoreModule extends AbstractModule {
         Multibinder<PostgresModule> postgresDataDefinitions = 
Multibinder.newSetBinder(binder(), PostgresModule.class);
         
postgresDataDefinitions.addBinding().toInstance(org.apache.james.eventsourcing.eventstore.postgres.PostgresEventStoreModule.MODULE);
 
-        bind(new TypeLiteral<Set<DTOModule<?, ? extends DTO>>>() 
{}).annotatedWith(Names.named(EventNestedTypes.EVENT_NESTED_TYPES_INJECTION_NAME))
-            .toInstance(ImmutableSet.of());
         Multibinder.newSetBinder(binder(), new TypeLiteral<EventDTOModule<? 
extends Event, ? extends EventDTO>>() {});
     }
 }
diff --git 
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/task/DistributedTaskManagerModule.java
 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/task/DistributedTaskManagerModule.java
new file mode 100644
index 0000000000..1812b2421c
--- /dev/null
+++ 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/task/DistributedTaskManagerModule.java
@@ -0,0 +1,117 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.task;
+
+import static 
org.apache.james.modules.queue.rabbitmq.RabbitMQModule.RABBITMQ_CONFIGURATION_NAME;
+
+import java.io.FileNotFoundException;
+
+import javax.inject.Singleton;
+
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.modules.server.HostnameModule;
+import org.apache.james.modules.server.TaskSerializationModule;
+import org.apache.james.task.TaskManager;
+import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.TerminationSubscriber;
+import org.apache.james.task.eventsourcing.WorkQueueSupplier;
+import org.apache.james.task.eventsourcing.distributed.CancelRequestQueueName;
+import 
org.apache.james.task.eventsourcing.distributed.DistributedTaskManagerHealthCheck;
+import 
org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber;
+import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue;
+import 
org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueConfiguration;
+import 
org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueConfiguration$;
+import 
org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueReconnectionHandler;
+import 
org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier;
+import org.apache.james.task.eventsourcing.distributed.TerminationQueueName;
+import 
org.apache.james.task.eventsourcing.distributed.TerminationReconnectionHandler;
+import 
org.apache.james.task.eventsourcing.postgres.PostgresTaskExecutionDetailsProjection;
+import 
org.apache.james.task.eventsourcing.postgres.PostgresTaskExecutionDetailsProjectionModule;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
+import org.apache.james.utils.PropertiesProvider;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
+
+public class DistributedTaskManagerModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        install(new HostnameModule());
+        install(new TaskSerializationModule());
+
+        
bind(PostgresTaskExecutionDetailsProjection.class).in(Scopes.SINGLETON);
+        bind(EventSourcingTaskManager.class).in(Scopes.SINGLETON);
+        bind(RabbitMQWorkQueueSupplier.class).in(Scopes.SINGLETON);
+        bind(RabbitMQTerminationSubscriber.class).in(Scopes.SINGLETON);
+        
bind(TaskExecutionDetailsProjection.class).to(PostgresTaskExecutionDetailsProjection.class);
+        
bind(TerminationSubscriber.class).to(RabbitMQTerminationSubscriber.class);
+        bind(TaskManager.class).to(EventSourcingTaskManager.class);
+        bind(WorkQueueSupplier.class).to(RabbitMQWorkQueueSupplier.class);
+        
bind(CancelRequestQueueName.class).toInstance(CancelRequestQueueName.generate());
+        
bind(TerminationQueueName.class).toInstance(TerminationQueueName.generate());
+
+        Multibinder<PostgresModule> postgresDataDefinitions = 
Multibinder.newSetBinder(binder(), PostgresModule.class);
+        
postgresDataDefinitions.addBinding().toInstance(PostgresTaskExecutionDetailsProjectionModule.MODULE());
+
+        Multibinder<SimpleConnectionPool.ReconnectionHandler> 
reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), 
SimpleConnectionPool.ReconnectionHandler.class);
+        
reconnectionHandlerMultibinder.addBinding().to(RabbitMQWorkQueueReconnectionHandler.class);
+        
reconnectionHandlerMultibinder.addBinding().to(TerminationReconnectionHandler.class);
+
+        Multibinder.newSetBinder(binder(), HealthCheck.class)
+            .addBinding()
+            .to(DistributedTaskManagerHealthCheck.class);
+    }
+
+    @Provides
+    @Singleton
+    private RabbitMQWorkQueueConfiguration 
getWorkQueueConfiguration(PropertiesProvider propertiesProvider) throws 
ConfigurationException {
+        try {
+            Configuration configuration = 
propertiesProvider.getConfiguration(RABBITMQ_CONFIGURATION_NAME);
+            return RabbitMQWorkQueueConfiguration$.MODULE$.from(configuration);
+        } catch (FileNotFoundException e) {
+            return RabbitMQWorkQueueConfiguration$.MODULE$.enabled();
+        }
+    }
+
+    @ProvidesIntoSet
+    InitializationOperation 
terminationSubscriber(RabbitMQTerminationSubscriber instance) {
+        return InitilizationOperationBuilder
+            .forClass(RabbitMQTerminationSubscriber.class)
+            .init(instance::start);
+    }
+
+    @ProvidesIntoSet
+    InitializationOperation workQueue(EventSourcingTaskManager instance) {
+        return InitilizationOperationBuilder
+            .forClass(RabbitMQWorkQueue.class)
+            .init(instance::start);
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to