This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit da774c0a0062ec55993eb126e7b2bf2069e1b062
Author: Gautier DI FOLCO <[email protected]>
AuthorDate: Wed Aug 28 16:05:52 2019 +0200

    JAMES-2813 Add cancel support in DistributedTaskManager
---
 .../distributed/RabbitMQWorkQueue.java             | 66 ++++++++++++++++++++--
 .../distributed/DistributedTaskManagerTest.java    | 55 +++++++++++++-----
 .../org/apache/james/task/TaskManagerContract.java |  6 +-
 3 files changed, 104 insertions(+), 23 deletions(-)

diff --git 
a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
 
b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index e954c31..7b80e6d 100644
--- 
a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ 
b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -21,10 +21,11 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.UUID;
 
 import javax.annotation.PreDestroy;
 
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.james.backend.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backend.rabbitmq.SimpleConnectionPool;
 import org.apache.james.lifecycle.api.Startable;
@@ -42,7 +43,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Delivery;
+
+import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.UnicastProcessor;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.BindingSpecification;
@@ -50,16 +55,22 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 
 public class RabbitMQWorkQueue implements WorkQueue, Startable {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitMQWorkQueue.class);
 
-    static final Integer MAX_CHANNELS_NUMBER = 1;
+    // Need at least one by receivers plus a shared one for senders
+    static final Integer MAX_CHANNELS_NUMBER = 5;
     static final String EXCHANGE_NAME = "taskManagerWorkQueueExchange";
     static final String QUEUE_NAME = "taskManagerWorkQueue";
     static final String ROUTING_KEY = "taskManagerWorkQueueRoutingKey";
+
+    static final String CANCEL_REQUESTS_EXCHANGE_NAME = 
"taskManagerCancelRequestsExchange";
+    static final String CANCEL_REQUESTS_ROUTING_KEY = 
"taskManagerCancelRequestsRoutingKey";
+    private static final String CANCEL_REQUESTS_QUEUE_NAME_PREFIX = 
"taskManagerCancelRequestsQueue";
     public static final String TASK_ID = "taskId";
 
     private final TaskManagerWorker worker;
@@ -68,6 +79,8 @@ public class RabbitMQWorkQueue implements WorkQueue, 
Startable {
     private final JsonTaskSerializer taskSerializer;
     private Sender sender;
     private RabbitMQExclusiveConsumer receiver;
+    private UnicastProcessor<TaskId> sendCancelRequestsQueue;
+    private Disposable sendCancelRequestsQueueHandle;
 
     public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool 
simpleConnectionPool, JsonTaskSerializer taskSerializer) {
         this.worker = worker;
@@ -77,10 +90,12 @@ public class RabbitMQWorkQueue implements WorkQueue, 
Startable {
     }
 
     public void start() {
-        sender = channelPool.createSender();
-
-        receiver = new RabbitMQExclusiveConsumer(new 
ReceiverOptions().connectionMono(connectionMono));
+        startWorkqueue();
+        listenToCancelRequests();
+    }
 
+    private void startWorkqueue() {
+        sender = channelPool.createSender();
         
sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
         
sender.declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
         sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, 
QUEUE_NAME)).block();
@@ -89,6 +104,7 @@ public class RabbitMQWorkQueue implements WorkQueue, 
Startable {
     }
 
     private void consumeWorkqueue() {
+        receiver = new RabbitMQExclusiveConsumer(new 
ReceiverOptions().connectionMono(connectionMono));
         receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
             .subscribeOn(Schedulers.elastic())
             .flatMap(this::executeTask)
@@ -112,6 +128,43 @@ public class RabbitMQWorkQueue implements WorkQueue, 
Startable {
         }
     }
 
+    void listenToCancelRequests() {
+        Sender cancelRequestSender = channelPool.createSender();
+        String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + 
UUID.randomUUID().toString();
+
+        
cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
+        
cancelRequestSender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
+        
cancelRequestSender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME,
 CANCEL_REQUESTS_ROUTING_KEY, queueName)).block();
+        registerCancelRequestsListener(queueName);
+
+        sendCancelRequestsQueue = UnicastProcessor.create();
+        sendCancelRequestsQueueHandle = cancelRequestSender
+            .send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage))
+            .subscribeOn(Schedulers.elastic())
+            .subscribe();
+    }
+
+    private void registerCancelRequestsListener(String queueName) {
+        RabbitFlux
+            .createReceiver(new 
ReceiverOptions().connectionMono(connectionMono))
+            .consumeAutoAck(queueName)
+            .subscribeOn(Schedulers.elastic())
+            .map(this::readCancelRequestMessage)
+            .doOnNext(worker::cancelTask)
+            .subscribe();
+    }
+
+    private TaskId readCancelRequestMessage(Delivery delivery) {
+        String message = new String(delivery.getBody(), 
StandardCharsets.UTF_8);
+        return TaskId.fromString(message);
+    }
+
+    private OutboundMessage makeCancelRequestMessage(TaskId taskId) {
+        byte[] payload = taskId.asString().getBytes(StandardCharsets.UTF_8);
+        AMQP.BasicProperties basicProperties = new 
AMQP.BasicProperties.Builder().build();
+        return new OutboundMessage(CANCEL_REQUESTS_EXCHANGE_NAME, 
CANCEL_REQUESTS_ROUTING_KEY, basicProperties, payload);
+    }
+
     @Override
     public void submit(TaskWithId taskWithId) {
         try {
@@ -128,12 +181,13 @@ public class RabbitMQWorkQueue implements WorkQueue, 
Startable {
 
     @Override
     public void cancel(TaskId taskId) {
-         throw new NotImplementedException("Cancel not done yet");
+        sendCancelRequestsQueue.onNext(taskId);
     }
 
     @Override
     @PreDestroy
     public void close() {
+        
Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
         channelPool.close();
     }
 }
diff --git 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 5db982d..e38edd9 100644
--- 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -54,16 +55,15 @@ import 
org.apache.james.task.eventsourcing.WorkQueueSupplier;
 import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
 import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
 import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
+
+import com.github.steveash.guavate.Guavate;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import com.github.steveash.guavate.Guavate;
-
 @ExtendWith(CountDownLatchExtension.class)
 class DistributedTaskManagerTest implements TaskManagerContract {
 
@@ -131,17 +131,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
     }
 
     @Test
-    @Disabled("Cancelling is not supported yet")
-    public void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) {
-    }
-
-    @Test
-    @Disabled("Cancelling is not supported yet")
-    public void getStatusShouldBeCancelledWhenCancelled(CountDownLatch 
countDownLatch) {
-    }
-
-    @Test
-    void givenTwoTaskManagersAndTwoTasksOnlyOneTaskShouldRunAtTheSameTime() {
+    void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() 
throws InterruptedException {
         CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1);
 
         try (EventSourcingTaskManager taskManager1 = taskManager();
@@ -182,4 +172,41 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
             }
         }
     }
+
+    @Test
+    void 
givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch
 countDownLatch) {
+        TaskManager taskManager1 = taskManager(HOSTNAME);
+        TaskManager taskManager2 = taskManager(HOSTNAME_2);
+        TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> {
+            countDownLatch.await();
+            return Task.Result.COMPLETED;
+        }));
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, 
taskManager1);
+        Hostname runningNode = 
taskManager1.getExecutionDetails(id).getRanNode().get();
+
+        Pair<Hostname, TaskManager> remoteTaskManager = 
getOtherTaskManager(runningNode, Pair.of(HOSTNAME, taskManager1), 
Pair.of(HOSTNAME_2, taskManager2));
+        remoteTaskManager.getValue().cancel(id);
+
+        awaitAtMostFiveSeconds.untilAsserted(() ->
+            assertThat(taskManager1.getExecutionDetails(id).getStatus())
+                .isIn(TaskManager.Status.CANCELLED, 
TaskManager.Status.CANCEL_REQUESTED));
+
+        countDownLatch.countDown();
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED, 
taskManager1);
+        assertThat(taskManager1.getExecutionDetails(id).getStatus())
+            .isEqualTo(TaskManager.Status.CANCELLED);
+
+        
assertThat(taskManager1.getExecutionDetails(id).getCancelRequestedNode())
+            .contains(remoteTaskManager.getKey());
+    }
+
+    private Pair<Hostname, TaskManager> getOtherTaskManager(Hostname node, 
Pair<Hostname, TaskManager> taskManager1, Pair<Hostname, TaskManager>  
taskManager2) {
+        if (node.equals(taskManager1.getKey())) {
+            return taskManager2;
+        } else {
+            return taskManager1;
+        }
+    }
 }
diff --git 
a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java 
b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
index 31def77..2fa605d 100644
--- a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -141,15 +141,15 @@ public interface TaskManagerContract {
         awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, 
taskManager);
         taskManager.cancel(id);
 
-        assertThat(taskManager.getExecutionDetails(id).getStatus())
-            .isIn(TaskManager.Status.CANCELLED, 
TaskManager.Status.CANCEL_REQUESTED);
+        awaitAtMostFiveSeconds.untilAsserted(() ->
+            assertThat(taskManager.getExecutionDetails(id).getStatus())
+                .isIn(TaskManager.Status.CANCELLED, 
TaskManager.Status.CANCEL_REQUESTED));
 
         countDownLatch.countDown();
 
         awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED, taskManager);
         assertThat(taskManager.getExecutionDetails(id).getStatus())
             .isEqualTo(TaskManager.Status.CANCELLED);
-
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to