This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7098dce4626c8a2c43e01a8074c179a0b917936c
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Mon Sep 16 14:00:16 2019 +0700

    JAMES-2813 MemoryWorkQueue should rely on reactor primitive
    
    The memory work queue consumed a LinkedBlockingQueue in a blocking fashion.
    This blocking operation is still ongoing upon thread shutdown, which results
    in an exception being thrown and logged by the default reactor consumer,
    which is really verbose.
    
    We should rather use the reactor primitives for cross-thread interactions.
---
 .../org/apache/james/task/TaskManagerContract.java    |  3 +--
 .../java/org/apache/james/task/MemoryWorkQueue.java   | 19 +++++++------------
 2 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
index 6078752..44feaba 100644
--- a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -242,8 +242,7 @@ public interface TaskManagerContract {
                 waitingForResultLatch.await();
                 return Task.Result.COMPLETED;
             }));
-        TaskId waitingId = taskManager.submit(
-            new CompletedTask());
+        TaskId waitingId = taskManager.submit(new CompletedTask());
 
         latch1.await();
 
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java
index fe2f80b..a782f9b 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java
@@ -20,25 +20,24 @@
 package org.apache.james.task;
 
 import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.UnicastProcessor;
 import reactor.core.scheduler.Schedulers;
 
 public class MemoryWorkQueue implements WorkQueue {
-
     private final TaskManagerWorker worker;
     private final Disposable subscription;
-    private final LinkedBlockingQueue<TaskWithId> tasks;
+    private final UnicastProcessor<TaskWithId> tasks;
 
     public MemoryWorkQueue(TaskManagerWorker worker) {
         this.worker = worker;
-        this.tasks = new LinkedBlockingQueue<>();
-        this.subscription = Mono.fromCallable(tasks::take)
-            .repeat()
+        this.tasks = UnicastProcessor.create();
+        this.subscription = tasks
             .subscribeOn(Schedulers.boundedElastic())
-            .flatMapSequential(this::dispatchTaskToWorker)
+            .limitRate(1)
+            .concatMap(this::dispatchTaskToWorker)
             .subscribe();
     }
 
@@ -47,11 +46,7 @@ public class MemoryWorkQueue implements WorkQueue {
     }
 
     public void submit(TaskWithId taskWithId) {
-        try {
-            tasks.put(taskWithId);
-        } catch (InterruptedException e) {
-            worker.cancelTask(taskWithId.getId());
-        }
+        tasks.onNext(taskWithId);
     }
 
     public void cancel(TaskId taskId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to