This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 23243b83b7 JAMES-4080 Refactor EventDeadLetters redeliver code to work with multiple serializers (#2465)
23243b83b7 is described below

commit 23243b83b776cab8a0cd6d11963622a3ea683ce7
Author: hungphan227 <45198168+hungphan...@users.noreply.github.com>
AuthorDate: Mon Oct 28 10:30:49 2024 +0700

    JAMES-4080 Refactor EventDeadLetters redeliver code to work with multiple serializers (#2465)
---
 .../james/events/EventSerializersAggregator.java   | 70 ++++++++++++++++++++++
 .../org/apache/james/events/RabbitMQEventBus.java  |  2 -
 .../james/modules/event/JMAPEventBusModule.java    |  6 ++
 .../modules/event/RabbitMQEventBusModule.java      | 43 +++++++++----
 .../james/modules/mailbox/DefaultEventModule.java  | 16 ++---
 5 files changed, 117 insertions(+), 20 deletions(-)

diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java b/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java
new file mode 100644
index 0000000000..ca2e6b383d
--- /dev/null
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java
@@ -0,0 +1,70 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import java.util.Optional;
+import java.util.Set;
+
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+
+@Singleton
+public class EventSerializersAggregator implements EventSerializer {
+    private final Set<EventSerializer> allEventSerializers;
+
+    @Inject
+    public EventSerializersAggregator(Set<EventSerializer> 
allEventSerializers) {
+        this.allEventSerializers = allEventSerializers;
+    }
+
+    @Override
+    public String toJson(Event event) {
+        return allEventSerializers.stream()
+            .map(eventSerializer -> serialize(event, eventSerializer))
+            .flatMap(Optional::stream)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("Could not serialize 
event: " + event));
+    }
+
+    @Override
+    public Event asEvent(String serialized) {
+        return allEventSerializers.stream()
+            .map(eventSerializer -> deserialize(serialized, eventSerializer))
+            .flatMap(Optional::stream)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("Could not deserialize 
event: " + serialized));
+    }
+
+    private Optional<String> serialize(Event event, EventSerializer 
eventSerializer) {
+        try {
+            return Optional.of(eventSerializer.toJson(event));
+        } catch (Exception ex) {
+            return Optional.empty();
+        }
+    }
+
+    private Optional<Event> deserialize(String json, EventSerializer 
eventSerializer) {
+        try {
+            return Optional.of(eventSerializer.asEvent(json));
+        } catch (Exception ex) {
+            return Optional.empty();
+        }
+    }
+}
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index f115fd5eca..a81ec7a8a3 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Set;
 
 import jakarta.annotation.PreDestroy;
-import jakarta.inject.Inject;
 
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
@@ -62,7 +61,6 @@ public class RabbitMQEventBus implements EventBus, Startable {
     private KeyRegistrationHandler keyRegistrationHandler;
     private EventDispatcher eventDispatcher;
 
-    @Inject
     public RabbitMQEventBus(NamingStrategy namingStrategy, Sender sender, 
ReceiverProvider receiverProvider, EventSerializer eventSerializer,
                             RetryBackoffConfiguration retryBackoff,
                             RoutingKeyConverter routingKeyConverter,
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
index ec41bf49ec..3a50397202 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
@@ -32,6 +32,7 @@ import org.apache.james.events.EventBus;
 import org.apache.james.events.EventBusId;
 import org.apache.james.events.EventBusReconnectionHandler;
 import org.apache.james.events.EventDeadLetters;
+import org.apache.james.events.EventSerializer;
 import org.apache.james.events.KeyReconnectionHandler;
 import org.apache.james.events.RabbitEventBusConsumerHealthCheck;
 import org.apache.james.events.RabbitMQEventBus;
@@ -120,4 +121,9 @@ public class JMAPEventBusModule extends AbstractModule {
     EventBus registerEventBus(@Named(InjectionKeys.JMAP) EventBus eventBus) {
         return eventBus;
     }
+
+    @ProvidesIntoSet
+    EventSerializer registerEventSerializers(JmapEventSerializer 
eventSerializer) {
+        return eventSerializer;
+    }
 }
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
index 599eb666a1..37522182a1 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
@@ -21,13 +21,16 @@ package org.apache.james.modules.event;
 
 import static 
org.apache.james.events.NamingStrategy.MAILBOX_EVENT_NAMING_STRATEGY;
 
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.event.json.MailboxEventSerializer;
 import org.apache.james.events.EventBus;
 import org.apache.james.events.EventBusId;
 import org.apache.james.events.EventBusReconnectionHandler;
-import org.apache.james.events.EventSerializer;
+import org.apache.james.events.EventDeadLetters;
 import org.apache.james.events.KeyReconnectionHandler;
 import org.apache.james.events.NamingStrategy;
 import org.apache.james.events.RabbitEventBusConsumerHealthCheck;
@@ -35,29 +38,25 @@ import org.apache.james.events.RabbitMQEventBus;
 import 
org.apache.james.events.RabbitMQMailboxEventBusDeadLetterQueueHealthCheck;
 import org.apache.james.events.RegistrationKey;
 import org.apache.james.events.RetryBackoffConfiguration;
+import org.apache.james.events.RoutingKeyConverter;
 import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
+import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.utils.InitializationOperation;
 import org.apache.james.utils.InitilizationOperationBuilder;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Scopes;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
 import com.google.inject.multibindings.Multibinder;
 import com.google.inject.multibindings.ProvidesIntoSet;
 
+import reactor.rabbitmq.Sender;
+
 public class RabbitMQEventBusModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        bind(MailboxEventSerializer.class).in(Scopes.SINGLETON);
-        bind(EventSerializer.class).to(MailboxEventSerializer.class);
-
         bind(NamingStrategy.class).toInstance(MAILBOX_EVENT_NAMING_STRATEGY);
-        bind(RabbitMQEventBus.class).in(Scopes.SINGLETON);
-        bind(EventBus.class).to(RabbitMQEventBus.class);
-
-        Multibinder.newSetBinder(binder(), EventBus.class)
-            .addBinding()
-            .to(EventBus.class);
 
         Multibinder.newSetBinder(binder(), RegistrationKey.Factory.class)
             .addBinding().to(MailboxIdRegistrationKey.Factory.class);
@@ -85,4 +84,26 @@ public class RabbitMQEventBusModule extends AbstractModule {
             .forClass(RabbitMQEventBus.class)
             .init(instance::start);
     }
+
+    @Provides
+    @Singleton
+    RabbitMQEventBus provideRabbitMQEventBus(NamingStrategy namingStrategy, 
Sender sender, ReceiverProvider receiverProvider, MailboxEventSerializer 
eventSerializer,
+                                         RetryBackoffConfiguration 
retryBackoff,
+                                         RoutingKeyConverter 
routingKeyConverter,
+                                         EventDeadLetters eventDeadLetters, 
MetricFactory metricFactory, ReactorRabbitMQChannelPool channelPool,
+                                         EventBusId eventBusId, 
RabbitMQConfiguration configuration) {
+        return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, 
eventSerializer, retryBackoff, routingKeyConverter,
+            eventDeadLetters, metricFactory, channelPool, eventBusId, 
configuration);
+    }
+
+    @Provides
+    @Singleton
+    EventBus provideEventBus(RabbitMQEventBus rabbitMQEventBus) {
+        return rabbitMQEventBus;
+    }
+
+    @ProvidesIntoSet
+    EventBus registerEventBus(EventBus eventBus) {
+        return eventBus;
+    }
 }
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index c71c90a301..e68106e868 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -24,6 +24,7 @@ import org.apache.james.event.json.MailboxEventSerializer;
 import org.apache.james.events.EventBus;
 import org.apache.james.events.EventListener;
 import org.apache.james.events.EventSerializer;
+import org.apache.james.events.EventSerializersAggregator;
 import org.apache.james.events.InVMEventBus;
 import org.apache.james.events.RetryBackoffConfiguration;
 import org.apache.james.events.delivery.EventDelivery;
@@ -44,25 +45,26 @@ import com.google.inject.multibindings.ProvidesIntoSet;
 public class DefaultEventModule extends AbstractModule {
     @Override
     protected void configure() {
-        bind(MailboxEventSerializer.class).in(Scopes.SINGLETON);
-        bind(EventSerializer.class).to(MailboxEventSerializer.class);
-
         bind(MailboxListenerFactory.class).in(Scopes.SINGLETON);
         bind(MailboxListenersLoaderImpl.class).in(Scopes.SINGLETON);
         bind(InVmEventDelivery.class).in(Scopes.SINGLETON);
         bind(InVMEventBus.class).in(Scopes.SINGLETON);
+        bind(MailboxEventSerializer.class).in(Scopes.SINGLETON);
+
+        bind(EventSerializer.class).to(EventSerializersAggregator.class);
 
         Multibinder.newSetBinder(binder(), 
GuiceProbe.class).addBinding().to(EventDeadLettersProbe.class);
         
bind(MailboxListenersLoader.class).to(MailboxListenersLoaderImpl.class);
         bind(EventDelivery.class).to(InVmEventDelivery.class);
         bind(EventBus.class).to(InVMEventBus.class);
 
-        Multibinder.newSetBinder(binder(), EventBus.class)
-            .addBinding()
-            .to(EventBus.class);
-
         
bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT);
 
+        Multibinder.newSetBinder(binder(), EventSerializer.class)
+            .addBinding()
+            .to(MailboxEventSerializer.class);
+
+        Multibinder.newSetBinder(binder(), EventBus.class);
         Multibinder.newSetBinder(binder(), 
EventListener.GroupEventListener.class);
         Multibinder.newSetBinder(binder(), 
EventListener.ReactiveGroupEventListener.class);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to