This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b6b68130dab07dfbf543590e10ed56a4f6d75aae Author: Quan Tran <hqt...@linagora.com> AuthorDate: Fri Feb 7 11:41:31 2025 +0700 [ENHANCEMENT] Allow plug custom indexers into OpenSearchListeningMessageSearchIndex --- .../OpenSearchListeningMessageSearchIndex.java | 44 +++++++++++++++++----- .../opensearch/OpenSearchIntegrationTest.java | 3 +- .../OpenSearchNoIndexBodyIntegrationTest.java | 3 +- .../OpenSearchListeningMessageSearchIndexTest.java | 6 ++- .../search/OpenSearchSearchHighlighterTest.java | 3 +- .../opensearch/search/OpenSearchSearcherTest.java | 3 +- .../store/search/ListeningMessageSearchIndex.java | 6 ++- .../elasticsearch/host/OpenSearchHostSystem.java | 3 +- .../modules/mailbox/OpenSearchMailboxModule.java | 2 + .../james/webadmin/routes/MailboxesRoutesTest.java | 3 +- .../webadmin/routes/UserMailboxesRoutesTest.java | 3 +- 11 files changed, 60 insertions(+), 19 deletions(-) diff --git a/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java b/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java index 33d1abe7ea..dccead70fa 100644 --- a/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java +++ b/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java @@ -107,6 +107,17 @@ public class OpenSearchListeningMessageSearchIndex extends ListeningMessageSearc Mono<Void> handleAddedEvent(MailboxSession session, MailboxEvents.Added addedEvent, MailboxId mailboxId); } + public interface Indexer { + Mono<Void> added(MailboxSession session, MailboxEvents.Added addedEvent, Mailbox mailbox, MailboxMessage message); + + static Indexer merge(Indexer defaultIndexer, Set<Indexer> overrides) { + return (session, addedEvent, mailbox, message) -> defaultIndexer.added(session, addedEvent, mailbox, message) + .then(Flux.fromIterable(overrides) + .concatMap(indexer -> indexer.added(session, addedEvent, mailbox, message)) + .then()); + } + } + class NaiveIndexingStrategy implements IndexingStrategy { @Override @@ -120,6 +131,20 @@ public class OpenSearchListeningMessageSearchIndex extends ListeningMessageSearc } } + class DefaultIndexer implements Indexer { + @Override + public Mono<Void> added(MailboxSession session, MailboxEvents.Added addedEvent, Mailbox mailbox, MailboxMessage message) { + LOGGER.info("Indexing mailbox {}-{} of user {} on message {}", + mailbox.getName(), + mailbox.getMailboxId().serialize(), + session.getUser().asString(), + message.getUid().asLong()); + + return generateIndexedJson(mailbox, message, session) + .flatMap(jsonContent -> add(mailbox.getMailboxId(), message.getUid(), jsonContent)); + } + } + class OptimizedIndexingStrategy implements IndexingStrategy { @@ -231,6 +256,7 @@ public class OpenSearchListeningMessageSearchIndex extends ListeningMessageSearc private final Metric reIndexNotFoundMetric; private final IndexingStrategy indexingStrategy; private final IndexBody indexBody; + private final Set<Indexer> indexerOverrides; @Inject public OpenSearchListeningMessageSearchIndex(MailboxSessionMapperFactory factory, @@ -238,7 +264,7 @@ public class OpenSearchListeningMessageSearchIndex extends ListeningMessageSearc @Named(MailboxOpenSearchConstants.InjectionNames.MAILBOX) OpenSearchIndexer indexer, OpenSearchSearcher searcher, MessageToOpenSearchJson messageToOpenSearchJson, SessionProvider sessionProvider, RoutingKey.Factory<MailboxId> routingKeyFactory, MessageId.Factory messageIdFactory, - OpenSearchMailboxConfiguration configuration, MetricFactory metricFactory) { + OpenSearchMailboxConfiguration configuration, MetricFactory metricFactory, Set<Indexer> indexersOverride) { super(factory, searchOverrides, sessionProvider); this.sessionProvider = sessionProvider; this.factory = factory; @@ -247,6 +273,7 @@ public class OpenSearchListeningMessageSearchIndex extends ListeningMessageSearc this.searcher = searcher; this.routingKeyFactory = routingKeyFactory; this.messageIdFactory = messageIdFactory; + this.indexerOverrides = indexersOverride; if (configuration.isOptimiseMoves()) { this.indexingStrategy = new OptimizedIndexingStrategy(); } else { @@ -346,14 +373,13 @@ public class OpenSearchListeningMessageSearchIndex extends ListeningMessageSearc @Override public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) { - LOGGER.info("Indexing mailbox {}-{} of user {} on message {}", - mailbox.getName(), - mailbox.getMailboxId().serialize(), - session.getUser().asString(), - message.getUid().asLong()); - - return generateIndexedJson(mailbox, message, session) - .flatMap(jsonContent -> add(mailbox.getMailboxId(), message.getUid(), jsonContent)); + return add(session, mailbox, message, null); + } + + @Override + public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message, MailboxEvents.Added added) { + return Indexer.merge(new DefaultIndexer(), indexerOverrides) + .added(session, added, mailbox, message); } private Mono<Void> add(MailboxId mailboxId, MessageUid messageUid, String jsonContent) { diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java index 84a521aee9..e1b3299606 100644 --- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java +++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java @@ -162,7 +162,8 @@ class OpenSearchIntegrationTest extends AbstractMessageSearchIndexTest { readAliasName, routingKeyFactory), new MessageToOpenSearchJson(textExtractor, ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES), preInstanciationStage.getSessionProvider(), routingKeyFactory, messageIdFactory, - openSearchMailboxConfiguration(), new RecordingMetricFactory())) + openSearchMailboxConfiguration(), new RecordingMetricFactory(), + ImmutableSet.of())) .noPreDeletionHooks() .storeQuotaManager() .build(); diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchNoIndexBodyIntegrationTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchNoIndexBodyIntegrationTest.java index 7b11590732..8673fe901f 100644 --- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchNoIndexBodyIntegrationTest.java +++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchNoIndexBodyIntegrationTest.java @@ -126,7 +126,8 @@ public class OpenSearchNoIndexBodyIntegrationTest { new OpenSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE, readAliasName, routingKeyFactory), new MessageToOpenSearchJson(textExtractor, ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES, IndexBody.NO), preInstanciationStage.getSessionProvider(), routingKeyFactory, messageIdFactory, - OpenSearchMailboxConfiguration.builder().indexBody(IndexBody.NO).build(), new RecordingMetricFactory())) + OpenSearchMailboxConfiguration.builder().indexBody(IndexBody.NO).build(), new RecordingMetricFactory(), + ImmutableSet.of())) .noPreDeletionHooks() .storeQuotaManager() .build(); diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndexTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndexTest.java index 90cfe54fa1..4c4fdf119b 100644 --- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndexTest.java +++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndexTest.java @@ -220,7 +220,8 @@ class OpenSearchListeningMessageSearchIndexTest { testee = new OpenSearchListeningMessageSearchIndex(mapperFactory, ImmutableSet.of(), openSearchIndexer, openSearchSearcher, messageToOpenSearchJson, sessionProvider, new MailboxIdRoutingKeyFactory(), messageIdFactory, - OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory()); + OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory(), + ImmutableSet.of()); session = sessionProvider.createSystemSession(USERNAME); mailbox = mapperFactory.getMailboxMapper(session).create(MailboxPath.forUser(USERNAME, DefaultMailboxes.INBOX), UidValidity.generate()).block(); @@ -287,7 +288,8 @@ class OpenSearchListeningMessageSearchIndexTest { testee = new OpenSearchListeningMessageSearchIndex(mapperFactory, ImmutableSet.of(), openSearchIndexer, openSearchSearcher, messageToOpenSearchJson, sessionProvider, new MailboxIdRoutingKeyFactory(), new InMemoryMessageId.Factory(), - OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory()); + OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory(), + ImmutableSet.of()); testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT).block(); awaitForOpenSearch(QueryBuilders.matchAll().build().toQuery(), 1L); diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearchHighlighterTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearchHighlighterTest.java index ef4503a78d..34162e536f 100644 --- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearchHighlighterTest.java +++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearchHighlighterTest.java @@ -142,7 +142,8 @@ public class OpenSearchSearchHighlighterTest implements SearchHighLighterContrac openSearchSearcher, new MessageToOpenSearchJson(textExtractor, ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES), preInstanciationStage.getSessionProvider(), routingKeyFactory, messageIdFactory, - openSearchMailboxConfiguration, new RecordingMetricFactory())) + openSearchMailboxConfiguration, new RecordingMetricFactory(), + ImmutableSet.of())) .noPreDeletionHooks() .storeQuotaManager() .build(); diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcherTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcherTest.java index 412f94414d..635f98b837 100644 --- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcherTest.java +++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcherTest.java @@ -132,7 +132,8 @@ class OpenSearchSearcherTest { new OpenSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE, readAliasName, routingKeyFactory), new MessageToOpenSearchJson(textExtractor, ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES), preInstanciationStage.getSessionProvider(), routingKeyFactory, messageIdFactory, - OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory())) + OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory(), + ImmutableSet.of())) .noPreDeletionHooks() .storeQuotaManager() .build(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java index 6c7d7f1fde..054717f020 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java @@ -141,7 +141,7 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex, return Flux.fromIterable(MessageRange.toRanges(added.getUids())) .concatMap(range -> retrieveMailboxMessages(session, mailbox, range, fetchType)) .publishOn(Schedulers.parallel()) - .concatMap(mailboxMessage -> add(session, mailbox, mailboxMessage)) + .concatMap(mailboxMessage -> add(session, mailbox, mailboxMessage, added)) .then(); } @@ -161,6 +161,10 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex, */ public abstract Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message); + public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message, Added added) { + return add(session, mailbox, message); + } + /** * Delete the concerned UIDs for the given {@link Mailbox} from the index * diff --git a/mpt/impl/imap-mailbox/opensearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/OpenSearchHostSystem.java b/mpt/impl/imap-mailbox/opensearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/OpenSearchHostSystem.java index 8ce82167ac..2a161caf37 100644 --- a/mpt/impl/imap-mailbox/opensearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/OpenSearchHostSystem.java +++ b/mpt/impl/imap-mailbox/opensearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/OpenSearchHostSystem.java @@ -108,7 +108,8 @@ public class OpenSearchHostSystem extends JamesImapHostSystem { MailboxOpenSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, routingKeyFactory), new MessageToOpenSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES), preInstanciationStage.getSessionProvider(), routingKeyFactory, messageIdFactory, - OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory())) + OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory(), + ImmutableSet.of())) .noPreDeletionHooks() .storeQuotaManager() .build(); diff --git a/server/container/guice/opensearch/src/main/java/org/apache/james/modules/mailbox/OpenSearchMailboxModule.java b/server/container/guice/opensearch/src/main/java/org/apache/james/modules/mailbox/OpenSearchMailboxModule.java index a36402ca70..4abde88eac 100644 --- a/server/container/guice/opensearch/src/main/java/org/apache/james/modules/mailbox/OpenSearchMailboxModule.java +++ b/server/container/guice/opensearch/src/main/java/org/apache/james/modules/mailbox/OpenSearchMailboxModule.java @@ -104,6 +104,8 @@ public class OpenSearchMailboxModule extends AbstractModule { Multibinder.newSetBinder(binder(), StartUpCheck.class) .addBinding() .to(OpenSearchStartUpCheck.class); + + Multibinder.newSetBinder(binder(), OpenSearchListeningMessageSearchIndex.Indexer.class); } @Provides diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java index 5ff774c7bb..0f94c80b21 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java @@ -165,7 +165,8 @@ class MailboxesRoutesTest { new OpenSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE, MailboxOpenSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, routingKeyFactory), new MessageToOpenSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES), - preInstanciationStage.getSessionProvider(), routingKeyFactory, messageIdFactory, OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory())) + preInstanciationStage.getSessionProvider(), routingKeyFactory, messageIdFactory, OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory(), + ImmutableSet.of())) .noPreDeletionHooks() .storeQuotaManager() .build(); diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java index 7c82bdcb0c..e6ea677bd1 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java @@ -1521,7 +1521,8 @@ class UserMailboxesRoutesTest { new OpenSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE, MailboxOpenSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, routingKeyFactory), new MessageToOpenSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES), - preInstanciationStage.getSessionProvider(), routingKeyFactory, messageIdFactory, OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory())) + preInstanciationStage.getSessionProvider(), routingKeyFactory, messageIdFactory, OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory(), + ImmutableSet.of())) .noPreDeletionHooks() .storeQuotaManager() .build(); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org