This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b6b68130dab07dfbf543590e10ed56a4f6d75aae
Author: Quan Tran <hqt...@linagora.com>
AuthorDate: Fri Feb 7 11:41:31 2025 +0700

    [ENHANCEMENT] Allow plug custom indexers into OpenSearchListeningMessageSearchIndex
---
 .../OpenSearchListeningMessageSearchIndex.java     | 44 +++++++++++++++++-----
 .../opensearch/OpenSearchIntegrationTest.java      |  3 +-
 .../OpenSearchNoIndexBodyIntegrationTest.java      |  3 +-
 .../OpenSearchListeningMessageSearchIndexTest.java |  6 ++-
 .../search/OpenSearchSearchHighlighterTest.java    |  3 +-
 .../opensearch/search/OpenSearchSearcherTest.java  |  3 +-
 .../store/search/ListeningMessageSearchIndex.java  |  6 ++-
 .../elasticsearch/host/OpenSearchHostSystem.java   |  3 +-
 .../modules/mailbox/OpenSearchMailboxModule.java   |  2 +
 .../james/webadmin/routes/MailboxesRoutesTest.java |  3 +-
 .../webadmin/routes/UserMailboxesRoutesTest.java   |  3 +-
 11 files changed, 60 insertions(+), 19 deletions(-)

diff --git a/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java b/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java
index 33d1abe7ea..dccead70fa 100644
--- a/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java
+++ b/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java
@@ -107,6 +107,17 @@ public class OpenSearchListeningMessageSearchIndex extends 
ListeningMessageSearc
         Mono<Void> handleAddedEvent(MailboxSession session, 
MailboxEvents.Added addedEvent, MailboxId mailboxId);
     }
 
+    public interface Indexer {
+        Mono<Void> added(MailboxSession session, MailboxEvents.Added 
addedEvent, Mailbox mailbox, MailboxMessage message);
+
+        static Indexer merge(Indexer defaultIndexer, Set<Indexer> overrides) {
+            return (session, addedEvent, mailbox, message) -> 
defaultIndexer.added(session, addedEvent, mailbox, message)
+                .then(Flux.fromIterable(overrides)
+                    .concatMap(indexer -> indexer.added(session, addedEvent, 
mailbox, message))
+                    .then());
+        }
+    }
+
     class NaiveIndexingStrategy implements IndexingStrategy {
 
         @Override
@@ -120,6 +131,20 @@ public class OpenSearchListeningMessageSearchIndex extends 
ListeningMessageSearc
         }
     }
 
+    class DefaultIndexer implements Indexer {
+        @Override
+        public Mono<Void> added(MailboxSession session, MailboxEvents.Added 
addedEvent, Mailbox mailbox, MailboxMessage message) {
+            LOGGER.info("Indexing mailbox {}-{} of user {} on message {}",
+                mailbox.getName(),
+                mailbox.getMailboxId().serialize(),
+                session.getUser().asString(),
+                message.getUid().asLong());
+
+            return generateIndexedJson(mailbox, message, session)
+                .flatMap(jsonContent -> add(mailbox.getMailboxId(), 
message.getUid(), jsonContent));
+        }
+    }
+
 
     class OptimizedIndexingStrategy implements IndexingStrategy {
 
@@ -231,6 +256,7 @@ public class OpenSearchListeningMessageSearchIndex extends 
ListeningMessageSearc
     private final Metric reIndexNotFoundMetric;
     private final IndexingStrategy indexingStrategy;
     private final IndexBody indexBody;
+    private final Set<Indexer> indexerOverrides;
 
     @Inject
     public OpenSearchListeningMessageSearchIndex(MailboxSessionMapperFactory 
factory,
@@ -238,7 +264,7 @@ public class OpenSearchListeningMessageSearchIndex extends 
ListeningMessageSearc
                                                  
@Named(MailboxOpenSearchConstants.InjectionNames.MAILBOX) OpenSearchIndexer 
indexer,
                                                  OpenSearchSearcher searcher, 
MessageToOpenSearchJson messageToOpenSearchJson,
                                                  SessionProvider 
sessionProvider, RoutingKey.Factory<MailboxId> routingKeyFactory, 
MessageId.Factory messageIdFactory,
-                                                 
OpenSearchMailboxConfiguration configuration, MetricFactory metricFactory) {
+                                                 
OpenSearchMailboxConfiguration configuration, MetricFactory metricFactory, 
Set<Indexer> indexersOverride) {
         super(factory, searchOverrides, sessionProvider);
         this.sessionProvider = sessionProvider;
         this.factory = factory;
@@ -247,6 +273,7 @@ public class OpenSearchListeningMessageSearchIndex extends 
ListeningMessageSearc
         this.searcher = searcher;
         this.routingKeyFactory = routingKeyFactory;
         this.messageIdFactory = messageIdFactory;
+        this.indexerOverrides = indexersOverride;
         if (configuration.isOptimiseMoves()) {
             this.indexingStrategy = new OptimizedIndexingStrategy();
         } else {
@@ -346,14 +373,13 @@ public class OpenSearchListeningMessageSearchIndex 
extends ListeningMessageSearc
 
     @Override
     public Mono<Void> add(MailboxSession session, Mailbox mailbox, 
MailboxMessage message) {
-        LOGGER.info("Indexing mailbox {}-{} of user {} on message {}",
-            mailbox.getName(),
-            mailbox.getMailboxId().serialize(),
-            session.getUser().asString(),
-            message.getUid().asLong());
-
-        return generateIndexedJson(mailbox, message, session)
-            .flatMap(jsonContent -> add(mailbox.getMailboxId(), 
message.getUid(), jsonContent));
+        return add(session, mailbox, message, null);
+    }
+
+    @Override
+    public Mono<Void> add(MailboxSession session, Mailbox mailbox, 
MailboxMessage message, MailboxEvents.Added added) {
+        return Indexer.merge(new DefaultIndexer(), indexerOverrides)
+            .added(session, added, mailbox, message);
     }
 
     private Mono<Void> add(MailboxId mailboxId, MessageUid messageUid, String 
jsonContent) {
diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java
index 84a521aee9..e1b3299606 100644
--- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java
+++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java
@@ -162,7 +162,8 @@ class OpenSearchIntegrationTest extends 
AbstractMessageSearchIndexTest {
                     readAliasName, routingKeyFactory),
                 new MessageToOpenSearchJson(textExtractor, 
ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES),
                 preInstanciationStage.getSessionProvider(), routingKeyFactory, 
messageIdFactory,
-                openSearchMailboxConfiguration(), new 
RecordingMetricFactory()))
+                openSearchMailboxConfiguration(), new RecordingMetricFactory(),
+                ImmutableSet.of()))
             .noPreDeletionHooks()
             .storeQuotaManager()
             .build();
diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchNoIndexBodyIntegrationTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchNoIndexBodyIntegrationTest.java
index 7b11590732..8673fe901f 100644
--- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchNoIndexBodyIntegrationTest.java
+++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchNoIndexBodyIntegrationTest.java
@@ -126,7 +126,8 @@ public class OpenSearchNoIndexBodyIntegrationTest {
                 new OpenSearchSearcher(client, new QueryConverter(new 
CriterionConverter()), SEARCH_SIZE, readAliasName, routingKeyFactory),
                 new MessageToOpenSearchJson(textExtractor, 
ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES, 
IndexBody.NO),
                 preInstanciationStage.getSessionProvider(), routingKeyFactory, 
messageIdFactory,
-                
OpenSearchMailboxConfiguration.builder().indexBody(IndexBody.NO).build(), new 
RecordingMetricFactory()))
+                
OpenSearchMailboxConfiguration.builder().indexBody(IndexBody.NO).build(), new 
RecordingMetricFactory(),
+                ImmutableSet.of()))
             .noPreDeletionHooks()
             .storeQuotaManager()
             .build();
diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndexTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndexTest.java
index 90cfe54fa1..4c4fdf119b 100644
--- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndexTest.java
+++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndexTest.java
@@ -220,7 +220,8 @@ class OpenSearchListeningMessageSearchIndexTest {
         testee = new OpenSearchListeningMessageSearchIndex(mapperFactory,
             ImmutableSet.of(), openSearchIndexer, openSearchSearcher,
             messageToOpenSearchJson, sessionProvider, new 
MailboxIdRoutingKeyFactory(), messageIdFactory,
-            OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory());
+            OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory(),
+            ImmutableSet.of());
         session = sessionProvider.createSystemSession(USERNAME);
 
         mailbox = 
mapperFactory.getMailboxMapper(session).create(MailboxPath.forUser(USERNAME, 
DefaultMailboxes.INBOX), UidValidity.generate()).block();
@@ -287,7 +288,8 @@ class OpenSearchListeningMessageSearchIndexTest {
         testee = new OpenSearchListeningMessageSearchIndex(mapperFactory,
             ImmutableSet.of(), openSearchIndexer, openSearchSearcher,
             messageToOpenSearchJson, sessionProvider, new 
MailboxIdRoutingKeyFactory(), new InMemoryMessageId.Factory(),
-            OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory());
+            OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory(),
+            ImmutableSet.of());
 
         testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT).block();
         awaitForOpenSearch(QueryBuilders.matchAll().build().toQuery(), 1L);
diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearchHighlighterTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearchHighlighterTest.java
index ef4503a78d..34162e536f 100644
--- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearchHighlighterTest.java
+++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearchHighlighterTest.java
@@ -142,7 +142,8 @@ public class OpenSearchSearchHighlighterTest implements 
SearchHighLighterContrac
                 openSearchSearcher,
                 new MessageToOpenSearchJson(textExtractor, 
ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES),
                 preInstanciationStage.getSessionProvider(), routingKeyFactory, 
messageIdFactory,
-                openSearchMailboxConfiguration, new RecordingMetricFactory()))
+                openSearchMailboxConfiguration, new RecordingMetricFactory(),
+                ImmutableSet.of()))
             .noPreDeletionHooks()
             .storeQuotaManager()
             .build();
diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcherTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcherTest.java
index 412f94414d..635f98b837 100644
--- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcherTest.java
+++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcherTest.java
@@ -132,7 +132,8 @@ class OpenSearchSearcherTest {
                 new OpenSearchSearcher(client, new QueryConverter(new 
CriterionConverter()), SEARCH_SIZE, readAliasName, routingKeyFactory),
                 new MessageToOpenSearchJson(textExtractor, 
ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES),
                 preInstanciationStage.getSessionProvider(), routingKeyFactory, 
messageIdFactory,
-                OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory()))
+                OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory(),
+                ImmutableSet.of()))
             .noPreDeletionHooks()
             .storeQuotaManager()
             .build();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
index 6c7d7f1fde..054717f020 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
@@ -141,7 +141,7 @@ public abstract class ListeningMessageSearchIndex 
implements MessageSearchIndex,
         return Flux.fromIterable(MessageRange.toRanges(added.getUids()))
             .concatMap(range -> retrieveMailboxMessages(session, mailbox, 
range, fetchType))
             .publishOn(Schedulers.parallel())
-            .concatMap(mailboxMessage -> add(session, mailbox, mailboxMessage))
+            .concatMap(mailboxMessage -> add(session, mailbox, mailboxMessage, 
added))
             .then();
     }
 
@@ -161,6 +161,10 @@ public abstract class ListeningMessageSearchIndex 
implements MessageSearchIndex,
      */
     public abstract Mono<Void> add(MailboxSession session, Mailbox mailbox, 
MailboxMessage message);
 
+    public Mono<Void> add(MailboxSession session, Mailbox mailbox, 
MailboxMessage message, Added added) {
+        return add(session, mailbox, message);
+    }
+
     /**
      * Delete the concerned UIDs for the given {@link Mailbox} from the index
      *
diff --git a/mpt/impl/imap-mailbox/opensearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/OpenSearchHostSystem.java b/mpt/impl/imap-mailbox/opensearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/OpenSearchHostSystem.java
index 8ce82167ac..2a161caf37 100644
--- a/mpt/impl/imap-mailbox/opensearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/OpenSearchHostSystem.java
+++ b/mpt/impl/imap-mailbox/opensearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/OpenSearchHostSystem.java
@@ -108,7 +108,8 @@ public class OpenSearchHostSystem extends 
JamesImapHostSystem {
                     MailboxOpenSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, 
routingKeyFactory),
                 new MessageToOpenSearchJson(new DefaultTextExtractor(), 
ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES),
                 preInstanciationStage.getSessionProvider(), routingKeyFactory, 
messageIdFactory,
-                OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory()))
+                OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory(),
+                ImmutableSet.of()))
             .noPreDeletionHooks()
             .storeQuotaManager()
             .build();
diff --git a/server/container/guice/opensearch/src/main/java/org/apache/james/modules/mailbox/OpenSearchMailboxModule.java b/server/container/guice/opensearch/src/main/java/org/apache/james/modules/mailbox/OpenSearchMailboxModule.java
index a36402ca70..4abde88eac 100644
--- a/server/container/guice/opensearch/src/main/java/org/apache/james/modules/mailbox/OpenSearchMailboxModule.java
+++ b/server/container/guice/opensearch/src/main/java/org/apache/james/modules/mailbox/OpenSearchMailboxModule.java
@@ -104,6 +104,8 @@ public class OpenSearchMailboxModule extends AbstractModule 
{
         Multibinder.newSetBinder(binder(), StartUpCheck.class)
             .addBinding()
             .to(OpenSearchStartUpCheck.class);
+
+        Multibinder.newSetBinder(binder(), 
OpenSearchListeningMessageSearchIndex.Indexer.class);
     }
 
     @Provides
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index 5ff774c7bb..0f94c80b21 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -165,7 +165,8 @@ class MailboxesRoutesTest {
                 new OpenSearchSearcher(client, new QueryConverter(new 
CriterionConverter()), SEARCH_SIZE,
                     MailboxOpenSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, 
routingKeyFactory),
                 new MessageToOpenSearchJson(new DefaultTextExtractor(), 
ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES),
-                preInstanciationStage.getSessionProvider(), routingKeyFactory, 
messageIdFactory, OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory()))
+                preInstanciationStage.getSessionProvider(), routingKeyFactory, 
messageIdFactory, OpenSearchMailboxConfiguration.builder().build(), new 
RecordingMetricFactory(),
+                ImmutableSet.of()))
             .noPreDeletionHooks()
             .storeQuotaManager()
             .build();
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
index 7c82bdcb0c..e6ea677bd1 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
@@ -1521,7 +1521,8 @@ class UserMailboxesRoutesTest {
                     new OpenSearchSearcher(client, new QueryConverter(new 
CriterionConverter()), SEARCH_SIZE,
                         MailboxOpenSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, 
routingKeyFactory),
                     new MessageToOpenSearchJson(new DefaultTextExtractor(), 
ZoneId.of("Europe/Paris"), IndexAttachments.YES, IndexHeaders.YES),
-                    preInstanciationStage.getSessionProvider(), 
routingKeyFactory, messageIdFactory, 
OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory()))
+                    preInstanciationStage.getSessionProvider(), 
routingKeyFactory, messageIdFactory, 
OpenSearchMailboxConfiguration.builder().build(), new RecordingMetricFactory(),
+                    ImmutableSet.of()))
                 .noPreDeletionHooks()
                 .storeQuotaManager()
                 .build();


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to