This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5ff63262face2880e50cb1c775fcc9b1c13fc320 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Tue Aug 27 16:10:42 2019 +0200 JAMES-2813 Add RabbitMQTerminationSubscriber --- .../apache/james/DistributedTaskManagerModule.java | 4 +- .../distributed/RabbitMQTerminationSubscriber.java | 146 +++++++++++++++++++++ .../distributed/DistributedTaskManagerTest.java | 15 ++- .../RabbitMQTerminationSubscriberTest.java | 73 +++++++++++ 4 files changed, 230 insertions(+), 8 deletions(-) diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java index 7dd21ef..76197db 100644 --- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java +++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java @@ -23,11 +23,11 @@ package org.apache.james; import org.apache.james.modules.server.HostnameModule; import org.apache.james.task.TaskManager; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; -import org.apache.james.task.eventsourcing.MemoryTerminationSubscriber; import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; import org.apache.james.task.eventsourcing.TerminationSubscriber; import org.apache.james.task.eventsourcing.WorkQueueSupplier; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection; +import org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber; import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier; import com.google.inject.AbstractModule; @@ -43,7 +43,7 @@ public class DistributedTaskManagerModule extends AbstractModule { bind(WorkQueueSupplier.class).in(Scopes.SINGLETON); bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class); bind(TerminationSubscriber.class).in(Scopes.SINGLETON); - bind(TerminationSubscriber.class).toInstance(new MemoryTerminationSubscriber()); + bind(TerminationSubscriber.class).to(RabbitMQTerminationSubscriber.class); bind(TaskManager.class).to(EventSourcingTaskManager.class); bind(WorkQueueSupplier.class).to(RabbitMQWorkQueueSupplier.class); } diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java new file mode 100644 index 0000000..a3a8bb1 --- /dev/null +++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -0,0 +1,146 @@ +/** + * ************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ***************************************************************/ + +package org.apache.james.task.eventsourcing.distributed; + +import java.io.Closeable; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.UUID; + +import javax.annotation.PreDestroy; +import javax.inject.Inject; + +import org.apache.james.backend.rabbitmq.ReactorRabbitMQChannelPool; +import org.apache.james.backend.rabbitmq.SimpleConnectionPool; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; +import org.apache.james.lifecycle.api.Startable; +import org.apache.james.task.eventsourcing.TerminationSubscriber; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.github.fge.lambdas.Throwing; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Delivery; +import reactor.core.Disposable; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Mono; +import reactor.core.publisher.UnicastProcessor; +import reactor.core.scheduler.Schedulers; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.OutboundMessage; +import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.RabbitFlux; +import reactor.rabbitmq.Receiver; +import reactor.rabbitmq.ReceiverOptions; +import reactor.rabbitmq.Sender; +import reactor.rabbitmq.SenderOptions; + +public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class); + private static final Integer MAX_CHANNELS_NUMBER = 1; + private static final String EXCHANGE_NAME = "terminationSubscriberExchange"; + private static final String QUEUE_NAME_PREFIX = "terminationSubscriber"; + private static final String ROUTING_KEY = "terminationSubscriberRoutingKey"; + + private final JsonEventSerializer serializer; + private final Mono<Connection> connectionMono; + private final ReactorRabbitMQChannelPool channelPool; + private final String queueName; + private UnicastProcessor<OutboundMessage> sendQueue; + private DirectProcessor<Event> listener; + private Disposable sendQueueHandle; + private Disposable listenQueueHandle; + + @Inject + public RabbitMQTerminationSubscriber(SimpleConnectionPool simpleConnectionPool, JsonEventSerializer serializer) { + this.serializer = serializer; + this.connectionMono = simpleConnectionPool.getResilientConnection(); + this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER); + this.queueName = QUEUE_NAME_PREFIX + UUID.randomUUID().toString(); + } + + public void start() { + Sender sender = RabbitFlux.createSender(new SenderOptions() + .connectionMono(connectionMono) + .channelPool(channelPool) + .resourceManagementChannelMono( + connectionMono.map(Throwing.function(Connection::createChannel)).cache())); + + sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); + sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block(); + sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName)).block(); + sendQueue = UnicastProcessor.create(); + sendQueueHandle = sender + .send(sendQueue) + .subscribeOn(Schedulers.elastic()) + .subscribe(); + + Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono)); + listener = DirectProcessor.create(); + listenQueueHandle = receiver + .consumeAutoAck(queueName) + .subscribeOn(Schedulers.elastic()) + .concatMap(this::toEvent) + .subscribe(listener::onNext); + } + + @Override + public void addEvent(Event event) { + try { + byte[] payload = serializer.serialize(event).getBytes(StandardCharsets.UTF_8); + AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().build(); + OutboundMessage message = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload); + sendQueue.onNext(message); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @Override + public Publisher<Event> listenEvents() { + return listener + .share(); + } + + private Mono<Event> toEvent(Delivery delivery) { + String message = new String(delivery.getBody(), StandardCharsets.UTF_8); + try { + Event event = serializer.deserialize(message); + return Mono.just(event); + } catch (Exception e) { + LOGGER.error("Unable to deserialize '{}'", message, e); + return Mono.empty(); + } + } + + @Override + @PreDestroy + public void close() { + Optional.ofNullable(sendQueueHandle).ifPresent(Disposable::dispose); + Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose); + channelPool.close(); + } +} diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index ced6652..5db982d 100644 --- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -34,6 +34,7 @@ import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule; +import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; import org.apache.james.server.task.json.JsonTaskSerializer; import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore; @@ -48,7 +49,6 @@ import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerContract; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; import org.apache.james.task.eventsourcing.Hostname; -import org.apache.james.task.eventsourcing.MemoryTerminationSubscriber; import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; import org.apache.james.task.eventsourcing.WorkQueueSupplier; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection; @@ -76,6 +76,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { private static final Hostname HOSTNAME = new Hostname("foo"); private static final Hostname HOSTNAME_2 = new Hostname("bar"); private static final Set<EventDTOModule> MODULES = TasksSerializationModule.MODULES.apply(TASK_SERIALIZER).stream().collect(Guavate.toImmutableSet()); + private static final JsonEventSerializer EVENT_SERIALIZER = new JsonEventSerializer(MODULES); static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension( CassandraModule.aggregateModules( @@ -104,17 +105,19 @@ class DistributedTaskManagerTest implements TaskManagerContract { } public EventSourcingTaskManager taskManager() { - return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME); + return taskManager(HOSTNAME); } private EventSourcingTaskManager taskManager(Hostname hostname) { - return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, new MemoryTerminationSubscriber()); + RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitConnectionPool(), EVENT_SERIALIZER); + terminationSubscriber.start(); + return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, terminationSubscriber); } @Test void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() { try (EventSourcingTaskManager taskManager1 = taskManager(); - EventSourcingTaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2)) { + EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) { TaskId taskId = taskManager1.submit(new CompletedTask()); Awaitility.await() .atMost(Duration.FIVE_SECONDS) @@ -142,7 +145,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1); try (EventSourcingTaskManager taskManager1 = taskManager(); - EventSourcingTaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2)) { + EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) { taskManager1.submit(new MemoryReferenceTask(() -> { waitingForFirstTaskLatch.await(); @@ -164,7 +167,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws InterruptedException { try (EventSourcingTaskManager taskManager1 = taskManager()) { Thread.sleep(100); - try (EventSourcingTaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2)) { + try (EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) { TaskId taskId = taskManager2.submit(new CompletedTask()); diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java new file mode 100644 index 0000000..fe8690d --- /dev/null +++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java @@ -0,0 +1,73 @@ +/** + * ************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ***************************************************************/ + +package org.apache.james.task.eventsourcing.distributed; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Set; + +import org.apache.james.backend.rabbitmq.RabbitMQExtension; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; +import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; +import org.apache.james.server.task.json.JsonTaskSerializer; +import org.apache.james.task.eventsourcing.TerminationSubscriber; +import org.apache.james.task.eventsourcing.TerminationSubscriberContract; + +import com.github.steveash.guavate.Guavate; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract { + private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer(); + private static final Set<EventDTOModule> MODULES = TasksSerializationModule.MODULES.apply(TASK_SERIALIZER).stream().collect(Guavate.toImmutableSet()); + private static final JsonEventSerializer SERIALIZER = new JsonEventSerializer(MODULES); + + @RegisterExtension + static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ(); + + @Override + public TerminationSubscriber subscriber() { + RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitConnectionPool(), SERIALIZER); + subscriber.start(); + return subscriber; + } + + @Test + void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() { + TerminationSubscriber subscriber1 = subscriber(); + TerminationSubscriber subscriber2 = subscriber(); + + sendEvents(subscriber1, COMPLETED_EVENT); + + List<List<Event>> listenedEvents = Flux.just(subscriber1, subscriber2) + .subscribeOn(Schedulers.elastic()) + .flatMap(this::collectEvents) + .collectList() + .block(); + assertThat(listenedEvents).hasSize(2); + assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT); + assertThat(listenedEvents.get(1)).containsExactly(COMPLETED_EVENT); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
