This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 87ddafc13d78605814c486178b39037df42ca3a0
Author: Michael André Pearce <michael.andre.pea...@me.com>
AuthorDate: Wed Jan 23 09:59:15 2019 +0000

    ARTEMIS-2236 Address Latency Impact caused by ARTEMIS-1451
    
    Readdress ARTEMIS-1451 concern of sync blocks, remove synchronization by 
simplifying original code and using atomics.
    (cherry picked from commit d4c41e45bc762f2d8b7831529678bc4061bf9cd3)
---
 .../artemis/utils/ActiveMQThreadPoolExecutor.java  | 55 ++++++----------------
 1 file changed, 15 insertions(+), 40 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
index c3b1988..da7de12 100755
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
@@ -20,6 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /*
  * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines
@@ -46,15 +47,13 @@ public class ActiveMQThreadPoolExecutor extends 
ThreadPoolExecutor {
 
       private ActiveMQThreadPoolExecutor executor = null;
 
-      // lock object to synchronize on
-      private final Object lock = new Object();
-
       // keep track of the difference between the number of idle threads and
       // the number of queued tasks. If the delta is > 0, we have more
       // idle threads than queued tasks and can add more tasks into the queue.
       // The delta is incremented if a thread becomes idle or if a task is 
taken from the queue.
       // The delta is decremented if a thread leaves idle state or if a task 
is added to the queue.
-      private int threadTaskDelta = 0;
+      private static final AtomicIntegerFieldUpdater<ThreadPoolQueue> 
DELTA_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThreadPoolQueue.class, 
"threadTaskDelta");
+      private volatile int threadTaskDelta = 0;
 
       public void setExecutor(ActiveMQThreadPoolExecutor executor) {
          this.executor = executor;
@@ -64,20 +63,15 @@ public class ActiveMQThreadPoolExecutor extends 
ThreadPoolExecutor {
       public boolean offer(Runnable runnable) {
          boolean retval = false;
 
-         // Need to lock for 2 reasons:
-         // 1. to safely handle poll timeouts
-         // 2. to protect the delta from parallel updates
-         synchronized (lock) {
-            if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || 
(threadTaskDelta > 0)) {
-               // A new task will be added to the queue if the maximum number 
of threads has been reached
-               // or if the delta is > 0, which means that there are enough 
idle threads.
+         if (threadTaskDelta > 0 || (executor.getPoolSize() >= 
executor.getMaximumPoolSize())) {
+            // A new task will be added to the queue if the maximum number of 
threads has been reached
+            // or if the delta is > 0, which means that there are enough idle 
threads.
 
-               retval = super.offer(runnable);
+            retval = super.offer(runnable);
 
-               // Only decrement the delta if the task has actually been added 
to the queue
-               if (retval)
-                  threadTaskDelta--;
-            }
+            // Only decrement the delta if the task has actually been added to 
the queue
+            if (retval)
+               DELTA_UPDATER.decrementAndGet(this);
          }
 
          return retval;
@@ -87,9 +81,8 @@ public class ActiveMQThreadPoolExecutor extends 
ThreadPoolExecutor {
       public Runnable take() throws InterruptedException {
          // Increment the delta as a thread becomes idle
          // by waiting for a task to take from the queue
-         synchronized (lock) {
-            threadTaskDelta++;
-         }
+         DELTA_UPDATER.incrementAndGet(this);
+
 
          Runnable runnable = null;
 
@@ -102,9 +95,7 @@ public class ActiveMQThreadPoolExecutor extends 
ThreadPoolExecutor {
             // (decremented by the thread and incremented by the taken task)
             // Only if no task had been taken, we have to decrement the delta.
             if (runnable == null) {
-               synchronized (lock) {
-                  threadTaskDelta--;
-               }
+               DELTA_UPDATER.decrementAndGet(this);
             }
          }
       }
@@ -113,34 +104,18 @@ public class ActiveMQThreadPoolExecutor extends 
ThreadPoolExecutor {
       public Runnable poll(long arg0, TimeUnit arg2) throws 
InterruptedException {
          // Increment the delta as a thread becomes idle
          // by waiting for a task to poll from the queue
-         synchronized (lock) {
-            threadTaskDelta++;
-         }
+         DELTA_UPDATER.incrementAndGet(this);
 
          Runnable runnable = null;
-         boolean timedOut = false;
 
          try {
             runnable = super.poll(arg0, arg2);
-            timedOut = (runnable == null);
          } finally {
             // Now the thread is no longer idle waiting for a task
             // If it had taken a task, the delta remains the same
             // (decremented by the thread and incremented by the taken task)
             if (runnable == null) {
-               synchronized (lock) {
-                  // If the poll called timed out, we check again within a 
synchronized block
-                  // to make sure all offer calls have been completed.
-                  // This is to handle a newly queued task if the timeout 
occurred while an offer call
-                  // added that task to the queue instead of creating a new 
thread.
-                  if (timedOut)
-                     runnable = super.poll();
-
-                  // Only if no task had been taken (either no timeout, or no 
task from after-timeout poll),
-                  // we have to decrement the delta.
-                  if (runnable == null)
-                     threadTaskDelta--;
-               }
+               DELTA_UPDATER.decrementAndGet(this);
             }
          }
 

Reply via email to