This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit da774c0a0062ec55993eb126e7b2bf2069e1b062 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Wed Aug 28 16:05:52 2019 +0200 JAMES-2813 Add cancel support in DistributedTaskManager --- .../distributed/RabbitMQWorkQueue.java | 66 ++++++++++++++++++++-- .../distributed/DistributedTaskManagerTest.java | 55 +++++++++++++----- .../org/apache/james/task/TaskManagerContract.java | 6 +- 3 files changed, 104 insertions(+), 23 deletions(-) diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index e954c31..7b80e6d 100644 --- a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -21,10 +21,11 @@ package org.apache.james.task.eventsourcing.distributed; import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.UUID; import javax.annotation.PreDestroy; -import org.apache.commons.lang3.NotImplementedException; import org.apache.james.backend.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.backend.rabbitmq.SimpleConnectionPool; import org.apache.james.lifecycle.api.Startable; @@ -42,7 +43,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableMap; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Delivery; + +import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.publisher.UnicastProcessor; import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.AcknowledgableDelivery; import reactor.rabbitmq.BindingSpecification; @@ -50,16 +55,22 @@ import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; public class RabbitMQWorkQueue implements WorkQueue, Startable { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class); - static final Integer MAX_CHANNELS_NUMBER = 1; + // Need at least one by receivers plus a shared one for senders + static final Integer MAX_CHANNELS_NUMBER = 5; static final String EXCHANGE_NAME = "taskManagerWorkQueueExchange"; static final String QUEUE_NAME = "taskManagerWorkQueue"; static final String ROUTING_KEY = "taskManagerWorkQueueRoutingKey"; + + static final String CANCEL_REQUESTS_EXCHANGE_NAME = "taskManagerCancelRequestsExchange"; + static final String CANCEL_REQUESTS_ROUTING_KEY = "taskManagerCancelRequestsRoutingKey"; + private static final String CANCEL_REQUESTS_QUEUE_NAME_PREFIX = "taskManagerCancelRequestsQueue"; public static final String TASK_ID = "taskId"; private final TaskManagerWorker worker; @@ -68,6 +79,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { private final JsonTaskSerializer taskSerializer; private Sender sender; private RabbitMQExclusiveConsumer receiver; + private UnicastProcessor<TaskId> sendCancelRequestsQueue; + private Disposable sendCancelRequestsQueueHandle; public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) { this.worker = worker; @@ -77,10 +90,12 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { } public void start() { - sender = channelPool.createSender(); - - receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono)); + startWorkqueue(); + listenToCancelRequests(); + } + private void startWorkqueue() { + sender = channelPool.createSender(); sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); sender.declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block(); sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block(); @@ -89,6 +104,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { } private void consumeWorkqueue() { + receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono)); receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions()) .subscribeOn(Schedulers.elastic()) .flatMap(this::executeTask) @@ -112,6 +128,43 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { } } + void listenToCancelRequests() { + Sender cancelRequestSender = channelPool.createSender(); + String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString(); + + cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block(); + cancelRequestSender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block(); + cancelRequestSender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, queueName)).block(); + registerCancelRequestsListener(queueName); + + sendCancelRequestsQueue = UnicastProcessor.create(); + sendCancelRequestsQueueHandle = cancelRequestSender + .send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage)) + .subscribeOn(Schedulers.elastic()) + .subscribe(); + } + + private void registerCancelRequestsListener(String queueName) { + RabbitFlux + .createReceiver(new ReceiverOptions().connectionMono(connectionMono)) + .consumeAutoAck(queueName) + .subscribeOn(Schedulers.elastic()) + .map(this::readCancelRequestMessage) + .doOnNext(worker::cancelTask) + .subscribe(); + } + + private TaskId readCancelRequestMessage(Delivery delivery) { + String message = new String(delivery.getBody(), StandardCharsets.UTF_8); + return TaskId.fromString(message); + } + + private OutboundMessage makeCancelRequestMessage(TaskId taskId) { + byte[] payload = taskId.asString().getBytes(StandardCharsets.UTF_8); + AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().build(); + return new OutboundMessage(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, basicProperties, payload); + } + @Override public void submit(TaskWithId taskWithId) { try { @@ -128,12 +181,13 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { @Override public void cancel(TaskId taskId) { - throw new NotImplementedException("Cancel not done yet"); + sendCancelRequestsQueue.onNext(taskId); } @Override @PreDestroy public void close() { + Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose); channelPool.close(); } } diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index 5db982d..e38edd9 100644 --- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backend.rabbitmq.RabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; @@ -54,16 +55,15 @@ import org.apache.james.task.eventsourcing.WorkQueueSupplier; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule; + +import com.github.steveash.guavate.Guavate; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; -import com.github.steveash.guavate.Guavate; - @ExtendWith(CountDownLatchExtension.class) class DistributedTaskManagerTest implements TaskManagerContract { @@ -131,17 +131,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - @Disabled("Cancelling is not supported yet") - public void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) { - } - - @Test - @Disabled("Cancelling is not supported yet") - public void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) { - } - - @Test - void givenTwoTaskManagersAndTwoTasksOnlyOneTaskShouldRunAtTheSameTime() { + void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() throws InterruptedException { CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1); try (EventSourcingTaskManager taskManager1 = taskManager(); @@ -182,4 +172,41 @@ class DistributedTaskManagerTest implements TaskManagerContract { } } } + + @Test + void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) { + TaskManager taskManager1 = taskManager(HOSTNAME); + TaskManager taskManager2 = taskManager(HOSTNAME_2); + TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> { + countDownLatch.await(); + return Task.Result.COMPLETED; + })); + + awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, taskManager1); + Hostname runningNode = taskManager1.getExecutionDetails(id).getRanNode().get(); + + Pair<Hostname, TaskManager> remoteTaskManager = getOtherTaskManager(runningNode, Pair.of(HOSTNAME, taskManager1), Pair.of(HOSTNAME_2, taskManager2)); + remoteTaskManager.getValue().cancel(id); + + awaitAtMostFiveSeconds.untilAsserted(() -> + assertThat(taskManager1.getExecutionDetails(id).getStatus()) + .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED)); + + countDownLatch.countDown(); + + awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED, taskManager1); + assertThat(taskManager1.getExecutionDetails(id).getStatus()) + .isEqualTo(TaskManager.Status.CANCELLED); + + assertThat(taskManager1.getExecutionDetails(id).getCancelRequestedNode()) + .contains(remoteTaskManager.getKey()); + } + + private Pair<Hostname, TaskManager> getOtherTaskManager(Hostname node, Pair<Hostname, TaskManager> taskManager1, Pair<Hostname, TaskManager> taskManager2) { + if (node.equals(taskManager1.getKey())) { + return taskManager2; + } else { + return taskManager1; + } + } } diff --git a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java index 31def77..2fa605d 100644 --- a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java +++ b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java @@ -141,15 +141,15 @@ public interface TaskManagerContract { awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, taskManager); taskManager.cancel(id); - assertThat(taskManager.getExecutionDetails(id).getStatus()) - .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED); + awaitAtMostFiveSeconds.untilAsserted(() -> + assertThat(taskManager.getExecutionDetails(id).getStatus()) + .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED)); countDownLatch.countDown(); awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED, taskManager); assertThat(taskManager.getExecutionDetails(id).getStatus()) .isEqualTo(TaskManager.Status.CANCELLED); - } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
