This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e531cf98bc894396239cfa465e85a8f46a17dbb9 Author: Quan Tran <[email protected]> AuthorDate: Fri Mar 29 15:26:46 2024 +0700 JAMES-2586 Guice binding Distributed TaskManager for postgres-app --- .../org/apache/james/PostgresJamesServerMain.java | 33 +++++- server/container/guice/postgres-common/pom.xml | 4 + .../modules/data/PostgresEventStoreModule.java | 9 -- .../modules/task/DistributedTaskManagerModule.java | 117 +++++++++++++++++++++ 4 files changed, 150 insertions(+), 13 deletions(-) diff --git a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java index 452742958e..358b3eb401 100644 --- a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java +++ b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java @@ -20,10 +20,15 @@ package org.apache.james; import java.util.List; +import java.util.Set; import org.apache.james.data.UsersRepositoryModuleChooser; +import org.apache.james.eventsourcing.eventstore.EventNestedTypes; import org.apache.james.jmap.draft.JMAPListenerModule; +import org.apache.james.json.DTO; +import org.apache.james.json.DTOModule; import org.apache.james.modules.BlobExportMechanismModule; +import org.apache.james.modules.DistributedTaskSerializationModule; import org.apache.james.modules.MailboxModule; import org.apache.james.modules.MailetProcessingModule; import org.apache.james.modules.RunArgumentsModule; @@ -71,15 +76,23 @@ import org.apache.james.modules.server.TaskManagerModule; import org.apache.james.modules.server.UserIdentityModule; import org.apache.james.modules.server.WebAdminReIndexingTaskSerializationModule; import org.apache.james.modules.server.WebAdminServerModule; +import org.apache.james.modules.task.DistributedTaskManagerModule; import org.apache.james.modules.vault.DeletedMessageVaultRoutesModule; import org.apache.james.vault.VaultConfiguration; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.inject.Module; +import com.google.inject.TypeLiteral; +import com.google.inject.name.Names; import com.google.inject.util.Modules; public class PostgresJamesServerMain implements JamesServerMain { + private static final Module EVENT_STORE_JSON_SERIALIZATION_DEFAULT_MODULE = binder -> + binder.bind(new TypeLiteral<Set<DTOModule<?, ? extends DTO>>>() {}).annotatedWith(Names.named(EventNestedTypes.EVENT_NESTED_TYPES_INJECTION_NAME)) + .toInstance(ImmutableSet.of()); + private static final Module WEBADMIN = Modules.combine( new WebAdminServerModule(), new DataRoutesModules(), @@ -114,11 +127,11 @@ public class PostgresJamesServerMain implements JamesServerMain { new PostgresDataModule(), new MailboxModule(), new SievePostgresRepositoryModules(), - new TaskManagerModule(), new PostgresEventStoreModule(), new TikaMailboxModule(), new PostgresDLPConfigurationStoreModule(), - new PostgresVacationModule()); + new PostgresVacationModule(), + EVENT_STORE_JSON_SERIALIZATION_DEFAULT_MODULE); public static final Module JMAP = Modules.combine( new PostgresJmapModule(), @@ -150,13 +163,14 @@ public class PostgresJamesServerMain implements JamesServerMain { SearchConfiguration searchConfiguration = configuration.searchConfiguration(); return GuiceJamesServer.forConfiguration(configuration) + .combineWith(POSTGRES_MODULE_AGGREGATE) .combineWith(SearchModuleChooser.chooseModules(searchConfiguration)) .combineWith(chooseUsersRepositoryModule(configuration)) .combineWith(chooseBlobStoreModules(configuration)) .combineWith(chooseEventBusModules(configuration)) .combineWith(chooseDeletedMessageVaultModules(configuration.getDeletedMessageVaultConfiguration())) - .combineWith(POSTGRES_MODULE_AGGREGATE) - .overrideWith(chooseJmapModules(configuration)); + .overrideWith(chooseJmapModules(configuration)) + .overrideWith(chooseTaskManagerModules(configuration)); } private static List<Module> chooseUsersRepositoryModule(PostgresJamesConfiguration configuration) { @@ -173,6 +187,17 @@ public class PostgresJamesServerMain implements JamesServerMain { return builder.build(); } + public static List<Module> chooseTaskManagerModules(PostgresJamesConfiguration configuration) { + switch (configuration.eventBusImpl()) { + case IN_MEMORY: + return List.of(new TaskManagerModule()); + case RABBITMQ: + return List.of(new DistributedTaskManagerModule(), new DistributedTaskSerializationModule()); + default: + throw new RuntimeException("Unsupported event-bus implementation " + configuration.eventBusImpl().name()); + } + } + public static List<Module> chooseEventBusModules(PostgresJamesConfiguration configuration) { switch (configuration.eventBusImpl()) { case IN_MEMORY: diff --git a/server/container/guice/postgres-common/pom.xml b/server/container/guice/postgres-common/pom.xml index 5cc0f9d2b3..c0d95997d3 100644 --- a/server/container/guice/postgres-common/pom.xml +++ b/server/container/guice/postgres-common/pom.xml @@ -84,6 +84,10 @@ <groupId>${james.groupId}</groupId> <artifactId>james-server-mailbox-adapter</artifactId> </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-task-postgres</artifactId> + </dependency> <dependency> <groupId>${james.groupId}</groupId> <artifactId>testing-base</artifactId> diff --git a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresEventStoreModule.java b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresEventStoreModule.java index fefe5aa309..843ea4031e 100644 --- a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresEventStoreModule.java +++ b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresEventStoreModule.java @@ -19,24 +19,17 @@ package org.apache.james.modules.data; -import java.util.Set; - import org.apache.james.backends.postgres.PostgresModule; import org.apache.james.eventsourcing.Event; -import org.apache.james.eventsourcing.eventstore.EventNestedTypes; import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.eventsourcing.eventstore.dto.EventDTO; import org.apache.james.eventsourcing.eventstore.dto.EventDTOModule; import org.apache.james.eventsourcing.eventstore.postgres.PostgresEventStore; -import org.apache.james.json.DTO; -import org.apache.james.json.DTOModule; -import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Scopes; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.Multibinder; -import com.google.inject.name.Names; public class PostgresEventStoreModule extends AbstractModule { @Override @@ -47,8 +40,6 @@ public class PostgresEventStoreModule extends AbstractModule { Multibinder<PostgresModule> postgresDataDefinitions = Multibinder.newSetBinder(binder(), PostgresModule.class); postgresDataDefinitions.addBinding().toInstance(org.apache.james.eventsourcing.eventstore.postgres.PostgresEventStoreModule.MODULE); - bind(new TypeLiteral<Set<DTOModule<?, ? extends DTO>>>() {}).annotatedWith(Names.named(EventNestedTypes.EVENT_NESTED_TYPES_INJECTION_NAME)) - .toInstance(ImmutableSet.of()); Multibinder.newSetBinder(binder(), new TypeLiteral<EventDTOModule<? extends Event, ? extends EventDTO>>() {}); } } diff --git a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/task/DistributedTaskManagerModule.java b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/task/DistributedTaskManagerModule.java new file mode 100644 index 0000000000..1812b2421c --- /dev/null +++ b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/task/DistributedTaskManagerModule.java @@ -0,0 +1,117 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.modules.task; + +import static org.apache.james.modules.queue.rabbitmq.RabbitMQModule.RABBITMQ_CONFIGURATION_NAME; + +import java.io.FileNotFoundException; + +import javax.inject.Singleton; + +import org.apache.commons.configuration2.Configuration; +import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.james.backends.postgres.PostgresModule; +import org.apache.james.backends.rabbitmq.SimpleConnectionPool; +import org.apache.james.core.healthcheck.HealthCheck; +import org.apache.james.modules.server.HostnameModule; +import org.apache.james.modules.server.TaskSerializationModule; +import org.apache.james.task.TaskManager; +import org.apache.james.task.eventsourcing.EventSourcingTaskManager; +import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; +import org.apache.james.task.eventsourcing.TerminationSubscriber; +import org.apache.james.task.eventsourcing.WorkQueueSupplier; +import org.apache.james.task.eventsourcing.distributed.CancelRequestQueueName; +import org.apache.james.task.eventsourcing.distributed.DistributedTaskManagerHealthCheck; +import org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber; +import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue; +import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueConfiguration; +import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueConfiguration$; +import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueReconnectionHandler; +import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier; +import org.apache.james.task.eventsourcing.distributed.TerminationQueueName; +import org.apache.james.task.eventsourcing.distributed.TerminationReconnectionHandler; +import org.apache.james.task.eventsourcing.postgres.PostgresTaskExecutionDetailsProjection; +import org.apache.james.task.eventsourcing.postgres.PostgresTaskExecutionDetailsProjectionModule; +import org.apache.james.utils.InitializationOperation; +import org.apache.james.utils.InitilizationOperationBuilder; +import org.apache.james.utils.PropertiesProvider; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.multibindings.Multibinder; +import com.google.inject.multibindings.ProvidesIntoSet; + +public class DistributedTaskManagerModule extends AbstractModule { + + @Override + protected void configure() { + install(new HostnameModule()); + install(new TaskSerializationModule()); + + bind(PostgresTaskExecutionDetailsProjection.class).in(Scopes.SINGLETON); + bind(EventSourcingTaskManager.class).in(Scopes.SINGLETON); + bind(RabbitMQWorkQueueSupplier.class).in(Scopes.SINGLETON); + bind(RabbitMQTerminationSubscriber.class).in(Scopes.SINGLETON); + bind(TaskExecutionDetailsProjection.class).to(PostgresTaskExecutionDetailsProjection.class); + bind(TerminationSubscriber.class).to(RabbitMQTerminationSubscriber.class); + bind(TaskManager.class).to(EventSourcingTaskManager.class); + bind(WorkQueueSupplier.class).to(RabbitMQWorkQueueSupplier.class); + bind(CancelRequestQueueName.class).toInstance(CancelRequestQueueName.generate()); + bind(TerminationQueueName.class).toInstance(TerminationQueueName.generate()); + + Multibinder<PostgresModule> postgresDataDefinitions = Multibinder.newSetBinder(binder(), PostgresModule.class); + postgresDataDefinitions.addBinding().toInstance(PostgresTaskExecutionDetailsProjectionModule.MODULE()); + + Multibinder<SimpleConnectionPool.ReconnectionHandler> reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class); + reconnectionHandlerMultibinder.addBinding().to(RabbitMQWorkQueueReconnectionHandler.class); + reconnectionHandlerMultibinder.addBinding().to(TerminationReconnectionHandler.class); + + Multibinder.newSetBinder(binder(), HealthCheck.class) + .addBinding() + .to(DistributedTaskManagerHealthCheck.class); + } + + @Provides + @Singleton + private RabbitMQWorkQueueConfiguration getWorkQueueConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException { + try { + Configuration configuration = propertiesProvider.getConfiguration(RABBITMQ_CONFIGURATION_NAME); + return RabbitMQWorkQueueConfiguration$.MODULE$.from(configuration); + } catch (FileNotFoundException e) { + return RabbitMQWorkQueueConfiguration$.MODULE$.enabled(); + } + } + + @ProvidesIntoSet + InitializationOperation terminationSubscriber(RabbitMQTerminationSubscriber instance) { + return InitilizationOperationBuilder + .forClass(RabbitMQTerminationSubscriber.class) + .init(instance::start); + } + + @ProvidesIntoSet + InitializationOperation workQueue(EventSourcingTaskManager instance) { + return InitilizationOperationBuilder + .forClass(RabbitMQWorkQueue.class) + .init(instance::start); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
