Repository: qpid-jms
Updated Branches:
  refs/heads/master 02c76f64c -> c66d88811


QPIDJMS-396 Allow for faster reaction times on sync operations

Sync operations from the JMS layer into the provider can react to
completion events more quickly by using a spin-wait future that
first checks for completion of the target event in a short spin.
If the event has not completed, the wait backs off progressively
and eventually falls back to a parked wait that is signalled using
the normal wait / notify pattern.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c66d8881
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c66d8881
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c66d8881

Branch: refs/heads/master
Commit: c66d888114021da31d9032c841c08903dd31cc89
Parents: 02c76f6
Author: Timothy Bish <tabish...@gmail.com>
Authored: Mon Jun 25 13:05:07 2018 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Mon Jun 25 13:05:07 2018 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/ProviderFuture.java       | 164 ++++++++++++++++---
 .../qpid/jms/provider/ProviderFutureTest.java   |  11 ++
 2 files changed, 156 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c66d8881/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
index 01b3c6e..c708593 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
@@ -17,9 +17,9 @@
 package org.apache.qpid.jms.provider;
 
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.qpid.jms.util.IOExceptionSupport;
 
@@ -28,15 +28,30 @@ import org.apache.qpid.jms.util.IOExceptionSupport;
  */
 public class ProviderFuture implements AsyncResult {
 
-    private static final AtomicIntegerFieldUpdater<ProviderFuture> COMPLETE_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class, "complete");
+    // Using a progressive wait strategy helps to avoid await happening before countDown
+    // and avoids expensive thread signaling
+    private static final int SPIN_COUNT = 10;
+    private static final int YIELD_COUNT = 100;
+    private static final int TINY_PARK_COUNT = 1000;
+    private static final int TINY_PARK_NANOS = 1;
+    private static final int SMALL_PARK_COUNT = 101_000;
+    private static final int SMALL_PARK_NANOS = 10_000;
 
-    private final CountDownLatch latch = new CountDownLatch(1);
-    private final ProviderSynchronization synchronization;
+    // States used to track progress of this future
+    private static final int INCOMPLETE = 0;
+    private static final int COMPLETING = 1;
+    private static final int SUCCESS = 2;
+    private static final int FAILURE = 3;
+
+    private static final AtomicIntegerFieldUpdater<ProviderFuture> STATE_FIELD_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class,"state");
+
+    private volatile int state = INCOMPLETE;
     private volatile Throwable error;
 
-    @SuppressWarnings("unused")
-    private volatile int complete;
+    private int waiting;
+
+    private final ProviderSynchronization synchronization;
 
     public ProviderFuture() {
         this(null);
@@ -48,27 +63,41 @@ public class ProviderFuture implements AsyncResult {
 
     @Override
     public boolean isComplete() {
-        return latch.getCount() == 0;
+        return state > COMPLETING;
     }
 
     @Override
     public void onFailure(Throwable result) {
-        if (COMPLETE_UPDATER.compareAndSet(this, 0, 1)) {
+        if (STATE_FIELD_UPDATER.compareAndSet(this, INCOMPLETE, COMPLETING)) {
             error = result;
             if (synchronization != null) {
                 synchronization.onPendingFailure(error);
             }
-            latch.countDown();
+
+            STATE_FIELD_UPDATER.lazySet(this, FAILURE);
+
+            synchronized(this) {
+                if (waiting > 0) {
+                    notifyAll();
+                }
+            }
         }
     }
 
     @Override
     public void onSuccess() {
-        if (COMPLETE_UPDATER.compareAndSet(this, 0, 1)) {
+        if (STATE_FIELD_UPDATER.compareAndSet(this, INCOMPLETE, COMPLETING)) {
             if (synchronization != null) {
                 synchronization.onPendingSuccess();
             }
-            latch.countDown();
+
+            STATE_FIELD_UPDATER.lazySet(this, SUCCESS);
+
+            synchronized(this) {
+                if (waiting > 0) {
+                    notifyAll();
+                }
+            }
         }
     }
 
@@ -86,16 +115,70 @@ public class ProviderFuture implements AsyncResult {
      * @throws IOException if an error occurs while waiting for the response.
      */
     public boolean sync(long amount, TimeUnit unit) throws IOException {
-        boolean result = false;
         try {
-            result = latch.await(amount, unit);
+            if (isComplete() || amount == 0) {
+                failOnError();
+                return true;
+            }
+
+            final Thread currentThread = Thread.currentThread();
+            final long timeout = unit.toNanos(amount);
+            long maxParkNanos = timeout / 8;
+            maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout;
+            final long tinyParkNanos = Math.min(maxParkNanos, TINY_PARK_NANOS);
+            final long smallParkNanos = Math.min(maxParkNanos, SMALL_PARK_NANOS);
+            final long startTime = System.nanoTime();
+            int idleCount = 0;
+
+            while (true) {
+                if (currentThread.isInterrupted()) {
+                    throw new InterruptedException();
+                }
+
+                final long elapsed = System.nanoTime() - startTime;
+                final long diff = elapsed - timeout;
+
+                if (diff > 0) {
+                    failOnError();
+                    return isComplete();
+                }
+
+                if (isComplete()) {
+                    failOnError();
+                    return true;
+                }
+
+                if (idleCount < SPIN_COUNT) {
+                    idleCount++;
+                } else if (idleCount < YIELD_COUNT) {
+                    Thread.yield();
+                    idleCount++;
+                } else if (idleCount < TINY_PARK_COUNT) {
+                    LockSupport.parkNanos(tinyParkNanos);
+                    idleCount++;
+                } else if (idleCount < SMALL_PARK_COUNT) {
+                    LockSupport.parkNanos(smallParkNanos);
+                    idleCount++;
+                } else {
+                    synchronized (this) {
+                        if (isComplete()) {
+                            failOnError();
+                            return true;
+                        }
+
+                        waiting++;
+                        try {
+                            wait(-diff / 1000000, (int) (-diff % 1000000));
+                        } finally {
+                            waiting--;
+                        }
+                    }
+                }
+            }
         } catch (InterruptedException e) {
             Thread.interrupted();
             throw IOExceptionSupport.create(e);
         }
-        failOnError();
-
-        return result;
     }
 
     /**
@@ -105,12 +188,55 @@ public class ProviderFuture implements AsyncResult {
      */
     public void sync() throws IOException {
         try {
-            latch.await();
+            if (isComplete()) {
+                failOnError();
+                return;
+            }
+
+            final Thread currentThread = Thread.currentThread();
+            int idleCount = 0;
+
+            while (true) {
+                if (currentThread.isInterrupted()) {
+                    throw new InterruptedException();
+                }
+
+                if (isComplete()) {
+                    failOnError();
+                    return;
+                }
+
+                if (idleCount < SPIN_COUNT) {
+                    idleCount++;
+                } else if (idleCount < YIELD_COUNT) {
+                    Thread.yield();
+                    idleCount++;
+                } else if (idleCount < TINY_PARK_COUNT) {
+                    LockSupport.parkNanos(TINY_PARK_NANOS);
+                    idleCount++;
+                } else if (idleCount < SMALL_PARK_COUNT) {
+                    LockSupport.parkNanos(SMALL_PARK_NANOS);
+                    idleCount++;
+                } else {
+                    synchronized (this) {
+                        if (isComplete()) {
+                            failOnError();
+                            return;
+                        }
+
+                        waiting++;
+                        try {
+                            wait();
+                        } finally {
+                            waiting--;
+                        }
+                    }
+                }
+            }
         } catch (InterruptedException e) {
             Thread.interrupted();
             throw IOExceptionSupport.create(e);
         }
-        failOnError();
     }
 
     private void failOnError() throws IOException {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c66d8881/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
index 210077c..42258ed 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
@@ -51,6 +51,17 @@ public class ProviderFutureTest {
         }
     }
 
+    @Test(timeout = 90000)
+    public void testTimedSync() {
+        ProviderFuture future = new ProviderFuture();
+
+        try {
+            assertFalse(future.sync(1, TimeUnit.SECONDS));
+        } catch (IOException cause) {
+            fail("Should throw an error");
+        }
+    }
+
     @Test(timeout = 10000)
     public void testOnFailure() {
         ProviderFuture future = new ProviderFuture();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to