This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 23243b83b7 JAMES-4080 Refactor EventDeadLetters redeliver code to work with multiple serializers (#2465) 23243b83b7 is described below commit 23243b83b776cab8a0cd6d11963622a3ea683ce7 Author: hungphan227 <45198168+hungphan...@users.noreply.github.com> AuthorDate: Mon Oct 28 10:30:49 2024 +0700 JAMES-4080 Refactor EventDeadLetters redeliver code to work with multiple serializers (#2465) --- .../james/events/EventSerializersAggregator.java | 70 ++++++++++++++++++++++ .../org/apache/james/events/RabbitMQEventBus.java | 2 - .../james/modules/event/JMAPEventBusModule.java | 6 ++ .../modules/event/RabbitMQEventBusModule.java | 43 +++++++++---- .../james/modules/mailbox/DefaultEventModule.java | 16 ++--- 5 files changed, 117 insertions(+), 20 deletions(-) diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java b/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java new file mode 100644 index 0000000000..ca2e6b383d --- /dev/null +++ b/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java @@ -0,0 +1,70 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.events; + +import java.util.Optional; +import java.util.Set; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +public class EventSerializersAggregator implements EventSerializer { + private final Set<EventSerializer> allEventSerializers; + + @Inject + public EventSerializersAggregator(Set<EventSerializer> allEventSerializers) { + this.allEventSerializers = allEventSerializers; + } + + @Override + public String toJson(Event event) { + return allEventSerializers.stream() + .map(eventSerializer -> serialize(event, eventSerializer)) + .flatMap(Optional::stream) + .findFirst() + .orElseThrow(() -> new RuntimeException("Could not serialize event: " + event)); + } + + @Override + public Event asEvent(String serialized) { + return allEventSerializers.stream() + .map(eventSerializer -> deserialize(serialized, eventSerializer)) + .flatMap(Optional::stream) + .findFirst() + .orElseThrow(() -> new RuntimeException("Could not deserialize event: " + serialized)); + } + + private Optional<String> serialize(Event event, EventSerializer eventSerializer) { + try { + return Optional.of(eventSerializer.toJson(event)); + } catch (Exception ex) { + return Optional.empty(); + } + } + + private Optional<Event> deserialize(String json, EventSerializer eventSerializer) { + try { + return Optional.of(eventSerializer.asEvent(json)); + } catch (Exception ex) { + return Optional.empty(); + } + } +} diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java index f115fd5eca..a81ec7a8a3 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Set; import jakarta.annotation.PreDestroy; -import jakarta.inject.Inject; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; @@ -62,7 +61,6 @@ public class RabbitMQEventBus implements EventBus, Startable { private KeyRegistrationHandler keyRegistrationHandler; private EventDispatcher eventDispatcher; - @Inject public RabbitMQEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer, RetryBackoffConfiguration retryBackoff, RoutingKeyConverter routingKeyConverter, diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java index ec41bf49ec..3a50397202 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java @@ -32,6 +32,7 @@ import org.apache.james.events.EventBus; import org.apache.james.events.EventBusId; import org.apache.james.events.EventBusReconnectionHandler; import org.apache.james.events.EventDeadLetters; +import org.apache.james.events.EventSerializer; import org.apache.james.events.KeyReconnectionHandler; import org.apache.james.events.RabbitEventBusConsumerHealthCheck; import org.apache.james.events.RabbitMQEventBus; @@ -120,4 +121,9 @@ public class JMAPEventBusModule extends AbstractModule { EventBus registerEventBus(@Named(InjectionKeys.JMAP) EventBus eventBus) { return eventBus; } + + @ProvidesIntoSet + EventSerializer registerEventSerializers(JmapEventSerializer eventSerializer) { + return eventSerializer; + } } diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java index 599eb666a1..37522182a1 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java @@ -21,13 +21,16 @@ package org.apache.james.modules.event; import static org.apache.james.events.NamingStrategy.MAILBOX_EVENT_NAMING_STRATEGY; +import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; +import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.event.json.MailboxEventSerializer; import org.apache.james.events.EventBus; import org.apache.james.events.EventBusId; import org.apache.james.events.EventBusReconnectionHandler; -import org.apache.james.events.EventSerializer; +import org.apache.james.events.EventDeadLetters; import org.apache.james.events.KeyReconnectionHandler; import org.apache.james.events.NamingStrategy; import org.apache.james.events.RabbitEventBusConsumerHealthCheck; @@ -35,29 +38,25 @@ import org.apache.james.events.RabbitMQEventBus; import org.apache.james.events.RabbitMQMailboxEventBusDeadLetterQueueHealthCheck; import org.apache.james.events.RegistrationKey; import org.apache.james.events.RetryBackoffConfiguration; +import org.apache.james.events.RoutingKeyConverter; import org.apache.james.mailbox.events.MailboxIdRegistrationKey; +import org.apache.james.metrics.api.MetricFactory; import org.apache.james.utils.InitializationOperation; import org.apache.james.utils.InitilizationOperationBuilder; import com.google.inject.AbstractModule; -import com.google.inject.Scopes; +import com.google.inject.Provides; +import com.google.inject.Singleton; import com.google.inject.multibindings.Multibinder; import com.google.inject.multibindings.ProvidesIntoSet; +import reactor.rabbitmq.Sender; + public class RabbitMQEventBusModule extends AbstractModule { @Override protected void configure() { - bind(MailboxEventSerializer.class).in(Scopes.SINGLETON); - bind(EventSerializer.class).to(MailboxEventSerializer.class); - bind(NamingStrategy.class).toInstance(MAILBOX_EVENT_NAMING_STRATEGY); - bind(RabbitMQEventBus.class).in(Scopes.SINGLETON); - bind(EventBus.class).to(RabbitMQEventBus.class); - - Multibinder.newSetBinder(binder(), EventBus.class) - .addBinding() - .to(EventBus.class); Multibinder.newSetBinder(binder(), RegistrationKey.Factory.class) .addBinding().to(MailboxIdRegistrationKey.Factory.class); @@ -85,4 +84,26 @@ public class RabbitMQEventBusModule extends AbstractModule { .forClass(RabbitMQEventBus.class) .init(instance::start); } + + @Provides + @Singleton + RabbitMQEventBus provideRabbitMQEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider, MailboxEventSerializer eventSerializer, + RetryBackoffConfiguration retryBackoff, + RoutingKeyConverter routingKeyConverter, + EventDeadLetters eventDeadLetters, MetricFactory metricFactory, ReactorRabbitMQChannelPool channelPool, + EventBusId eventBusId, RabbitMQConfiguration configuration) { + return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, eventSerializer, retryBackoff, routingKeyConverter, + eventDeadLetters, metricFactory, channelPool, eventBusId, configuration); + } + + @Provides + @Singleton + EventBus provideEventBus(RabbitMQEventBus rabbitMQEventBus) { + return rabbitMQEventBus; + } + + @ProvidesIntoSet + EventBus registerEventBus(EventBus eventBus) { + return eventBus; + } } diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java index c71c90a301..e68106e868 100644 --- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java +++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java @@ -24,6 +24,7 @@ import org.apache.james.event.json.MailboxEventSerializer; import org.apache.james.events.EventBus; import org.apache.james.events.EventListener; import org.apache.james.events.EventSerializer; +import org.apache.james.events.EventSerializersAggregator; import org.apache.james.events.InVMEventBus; import org.apache.james.events.RetryBackoffConfiguration; import org.apache.james.events.delivery.EventDelivery; @@ -44,25 +45,26 @@ import com.google.inject.multibindings.ProvidesIntoSet; public class DefaultEventModule extends AbstractModule { @Override protected void configure() { - bind(MailboxEventSerializer.class).in(Scopes.SINGLETON); - bind(EventSerializer.class).to(MailboxEventSerializer.class); - bind(MailboxListenerFactory.class).in(Scopes.SINGLETON); bind(MailboxListenersLoaderImpl.class).in(Scopes.SINGLETON); bind(InVmEventDelivery.class).in(Scopes.SINGLETON); bind(InVMEventBus.class).in(Scopes.SINGLETON); + bind(MailboxEventSerializer.class).in(Scopes.SINGLETON); + + bind(EventSerializer.class).to(EventSerializersAggregator.class); Multibinder.newSetBinder(binder(), GuiceProbe.class).addBinding().to(EventDeadLettersProbe.class); bind(MailboxListenersLoader.class).to(MailboxListenersLoaderImpl.class); bind(EventDelivery.class).to(InVmEventDelivery.class); bind(EventBus.class).to(InVMEventBus.class); - Multibinder.newSetBinder(binder(), EventBus.class) - .addBinding() - .to(EventBus.class); - bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT); + Multibinder.newSetBinder(binder(), EventSerializer.class) + .addBinding() + .to(MailboxEventSerializer.class); + + Multibinder.newSetBinder(binder(), EventBus.class); Multibinder.newSetBinder(binder(), EventListener.GroupEventListener.class); Multibinder.newSetBinder(binder(), EventListener.ReactiveGroupEventListener.class); } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org