Repository: qpid-jms Updated Branches: refs/heads/master 02c76f64c -> c66d88811
QPIDJMS-396 Allow for faster reaction times on sync operations For sync operations from the JMS layer into the provider we can more quickly process the events by using a spin-wait future that checks in a short spin for the completion of the target event. The spin will back off and eventually back down to a parked wait that will be signalled by the normal wait / notify pattern. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c66d8881 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c66d8881 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c66d8881 Branch: refs/heads/master Commit: c66d888114021da31d9032c841c08903dd31cc89 Parents: 02c76f6 Author: Timothy Bish <tabish...@gmail.com> Authored: Mon Jun 25 13:05:07 2018 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Mon Jun 25 13:05:07 2018 -0400 ---------------------------------------------------------------------- .../qpid/jms/provider/ProviderFuture.java | 164 ++++++++++++++++--- .../qpid/jms/provider/ProviderFutureTest.java | 11 ++ 2 files changed, 156 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c66d8881/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java index 01b3c6e..c708593 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java @@ -17,9 +17,9 @@ package org.apache.qpid.jms.provider; import java.io.IOException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.locks.LockSupport; import org.apache.qpid.jms.util.IOExceptionSupport; @@ -28,15 +28,30 @@ import org.apache.qpid.jms.util.IOExceptionSupport; */ public class ProviderFuture implements AsyncResult { - private static final AtomicIntegerFieldUpdater<ProviderFuture> COMPLETE_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class, "complete"); + // Using a progressive wait strategy helps to avoid await happening before countDown + // and avoids expensive thread signaling + private static final int SPIN_COUNT = 10; + private static final int YIELD_COUNT = 100; + private static final int TINY_PARK_COUNT = 1000; + private static final int TINY_PARK_NANOS = 1; + private static final int SMALL_PARK_COUNT = 101_000; + private static final int SMALL_PARK_NANOS = 10_000; - private final CountDownLatch latch = new CountDownLatch(1); - private final ProviderSynchronization synchronization; + // States used to track progress of this future + private static final int INCOMPLETE = 0; + private static final int COMPLETING = 1; + private static final int SUCCESS = 2; + private static final int FAILURE = 3; + + private static final AtomicIntegerFieldUpdater<ProviderFuture> STATE_FIELD_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class,"state"); + + private volatile int state = INCOMPLETE; private volatile Throwable error; - @SuppressWarnings("unused") - private volatile int complete; + private int waiting; + + private final ProviderSynchronization synchronization; public ProviderFuture() { this(null); @@ -48,27 +63,41 @@ public class ProviderFuture implements AsyncResult { @Override public boolean isComplete() { - return latch.getCount() == 0; + return state > COMPLETING; } @Override public void onFailure(Throwable result) { - if (COMPLETE_UPDATER.compareAndSet(this, 0, 1)) { + if (STATE_FIELD_UPDATER.compareAndSet(this, INCOMPLETE, COMPLETING)) { error = result; if (synchronization != null) { synchronization.onPendingFailure(error); } - latch.countDown(); + + STATE_FIELD_UPDATER.lazySet(this, FAILURE); + + synchronized(this) { + if (waiting > 0) { + notifyAll(); + } + } } } @Override public void onSuccess() { - if (COMPLETE_UPDATER.compareAndSet(this, 0, 1)) { + if (STATE_FIELD_UPDATER.compareAndSet(this, INCOMPLETE, COMPLETING)) { if (synchronization != null) { synchronization.onPendingSuccess(); } - latch.countDown(); + + STATE_FIELD_UPDATER.lazySet(this, SUCCESS); + + synchronized(this) { + if (waiting > 0) { + notifyAll(); + } + } } } @@ -86,16 +115,70 @@ public class ProviderFuture implements AsyncResult { * @throws IOException if an error occurs while waiting for the response. */ public boolean sync(long amount, TimeUnit unit) throws IOException { - boolean result = false; try { - result = latch.await(amount, unit); + if (isComplete() || amount == 0) { + failOnError(); + return true; + } + + final Thread currentThread = Thread.currentThread(); + final long timeout = unit.toNanos(amount); + long maxParkNanos = timeout / 8; + maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout; + final long tinyParkNanos = Math.min(maxParkNanos, TINY_PARK_NANOS); + final long smallParkNanos = Math.min(maxParkNanos, SMALL_PARK_NANOS); + final long startTime = System.nanoTime(); + int idleCount = 0; + + while (true) { + if (currentThread.isInterrupted()) { + throw new InterruptedException(); + } + + final long elapsed = System.nanoTime() - startTime; + final long diff = elapsed - timeout; + + if (diff > 0) { + failOnError(); + return isComplete(); + } + + if (isComplete()) { + failOnError(); + return true; + } + + if (idleCount < SPIN_COUNT) { + idleCount++; + } else if (idleCount < YIELD_COUNT) { + Thread.yield(); + idleCount++; + } else if (idleCount < TINY_PARK_COUNT) { + LockSupport.parkNanos(tinyParkNanos); + idleCount++; + } else if (idleCount < SMALL_PARK_COUNT) { + LockSupport.parkNanos(smallParkNanos); + idleCount++; + } else { + synchronized (this) { + if (isComplete()) { + failOnError(); + return true; + } + + waiting++; + try { + wait(-diff / 1000000, (int) (-diff % 1000000)); + } finally { + waiting--; + } + } + } + } } catch (InterruptedException e) { Thread.interrupted(); throw IOExceptionSupport.create(e); } - failOnError(); - - return result; } /** @@ -105,12 +188,55 @@ public class ProviderFuture implements AsyncResult { */ public void sync() throws IOException { try { - latch.await(); + if (isComplete()) { + failOnError(); + return; + } + + final Thread currentThread = Thread.currentThread(); + int idleCount = 0; + + while (true) { + if (currentThread.isInterrupted()) { + throw new InterruptedException(); + } + + if (isComplete()) { + failOnError(); + return; + } + + if (idleCount < SPIN_COUNT) { + idleCount++; + } else if (idleCount < YIELD_COUNT) { + Thread.yield(); + idleCount++; + } else if (idleCount < TINY_PARK_COUNT) { + LockSupport.parkNanos(TINY_PARK_NANOS); + idleCount++; + } else if (idleCount < SMALL_PARK_COUNT) { + LockSupport.parkNanos(SMALL_PARK_NANOS); + idleCount++; + } else { + synchronized (this) { + if (isComplete()) { + failOnError(); + return; + } + + waiting++; + try { + wait(); + } finally { + waiting--; + } + } + } + } } catch (InterruptedException e) { Thread.interrupted(); throw IOExceptionSupport.create(e); } - failOnError(); } private void failOnError() throws IOException { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c66d8881/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java index 210077c..42258ed 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java @@ -51,6 +51,17 @@ public class ProviderFutureTest { } } + @Test(timeout = 90000) + public void testTimedSync() { + ProviderFuture future = new ProviderFuture(); + + try { + assertFalse(future.sync(1, TimeUnit.SECONDS)); + } catch (IOException cause) { + fail("Should throw an error"); + } + } + @Test(timeout = 10000) public void testOnFailure() { ProviderFuture future = new ProviderFuture(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org