This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.6.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 87ddafc13d78605814c486178b39037df42ca3a0 Author: Michael André Pearce <michael.andre.pea...@me.com> AuthorDate: Wed Jan 23 09:59:15 2019 +0000 ARTEMIS-2236 Address Latency Impact caused by ARTEMIS-1451 Readdress ARTEMIS-1451 concern of sync blocks, remove synchronization by simplifying original code and using atomics. (cherry picked from commit d4c41e45bc762f2d8b7831529678bc4061bf9cd3) --- .../artemis/utils/ActiveMQThreadPoolExecutor.java | 55 ++++++---------------- 1 file changed, 15 insertions(+), 40 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java index c3b1988..da7de12 100755 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java @@ -20,6 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /* * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines @@ -46,15 +47,13 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { private ActiveMQThreadPoolExecutor executor = null; - // lock object to synchronize on - private final Object lock = new Object(); - // keep track of the difference between the number of idle threads and // the number of queued tasks. If the delta is > 0, we have more // idle threads than queued tasks and can add more tasks into the queue. // The delta is incremented if a thread becomes idle or if a task is taken from the queue. // The delta is decremented if a thread leaves idle state or if a task is added to the queue. - private int threadTaskDelta = 0; + private static final AtomicIntegerFieldUpdater<ThreadPoolQueue> DELTA_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThreadPoolQueue.class, "threadTaskDelta"); + private volatile int threadTaskDelta = 0; public void setExecutor(ActiveMQThreadPoolExecutor executor) { this.executor = executor; @@ -64,20 +63,15 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { public boolean offer(Runnable runnable) { boolean retval = false; - // Need to lock for 2 reasons: - // 1. to safely handle poll timeouts - // 2. to protect the delta from parallel updates - synchronized (lock) { - if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta > 0)) { - // A new task will be added to the queue if the maximum number of threads has been reached - // or if the delta is > 0, which means that there are enough idle threads. + if (threadTaskDelta > 0 || (executor.getPoolSize() >= executor.getMaximumPoolSize())) { + // A new task will be added to the queue if the maximum number of threads has been reached + // or if the delta is > 0, which means that there are enough idle threads. - retval = super.offer(runnable); + retval = super.offer(runnable); - // Only decrement the delta if the task has actually been added to the queue - if (retval) - threadTaskDelta--; - } + // Only decrement the delta if the task has actually been added to the queue + if (retval) + DELTA_UPDATER.decrementAndGet(this); } return retval; @@ -87,9 +81,8 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { public Runnable take() throws InterruptedException { // Increment the delta as a thread becomes idle // by waiting for a task to take from the queue - synchronized (lock) { - threadTaskDelta++; - } + DELTA_UPDATER.incrementAndGet(this); + Runnable runnable = null; @@ -102,9 +95,7 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { // (decremented by the thread and incremented by the taken task) // Only if no task had been taken, we have to decrement the delta. if (runnable == null) { - synchronized (lock) { - threadTaskDelta--; - } + DELTA_UPDATER.decrementAndGet(this); } } } @@ -113,34 +104,18 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException { // Increment the delta as a thread becomes idle // by waiting for a task to poll from the queue - synchronized (lock) { - threadTaskDelta++; - } + DELTA_UPDATER.incrementAndGet(this); Runnable runnable = null; - boolean timedOut = false; try { runnable = super.poll(arg0, arg2); - timedOut = (runnable == null); } finally { // Now the thread is no longer idle waiting for a task // If it had taken a task, the delta remains the same // (decremented by the thread and incremented by the taken task) if (runnable == null) { - synchronized (lock) { - // If the poll called timed out, we check again within a synchronized block - // to make sure all offer calls have been completed. - // This is to handle a newly queued task if the timeout occurred while an offer call - // added that task to the queue instead of creating a new thread. - if (timedOut) - runnable = super.poll(); - - // Only if no task had been taken (either no timeout, or no task from after-timeout poll), - // we have to decrement the delta. - if (runnable == null) - threadTaskDelta--; - } + DELTA_UPDATER.decrementAndGet(this); } }