franz1981 closed pull request #26: QPIDJMS-430 Lock-Free FifoMessageQueue
URL: https://github.com/apache/qpid-jms/pull/26
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
index 83fa532b..db699b5c 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
@@ -16,133 +16,392 @@
  */
 package org.apache.qpid.jms.util;
 
-import java.util.ArrayDeque;
-import java.util.Deque;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+abstract class FifoMessageQueuePad0 {
+    long p01, p02, p03, p04, p05, p06, p07;
+    long p10, p11, p12, p13, p14, p15, p16, p17;
+}
 
-/**
- * Simple first in / first out Message Queue.
- */
-public final class FifoMessageQueue implements MessageQueue {
+abstract class FifoMessageQueueProducerFields extends FifoMessageQueuePad0 {
+    private static final 
AtomicLongFieldUpdater<FifoMessageQueueProducerFields> TAIL_FIELD_UPDATER =
+            
AtomicLongFieldUpdater.newUpdater(FifoMessageQueueProducerFields.class, "tail");
+
+    private volatile long tail;
+    protected long producerLimit;
 
-    protected static final AtomicIntegerFieldUpdater<FifoMessageQueue> 
STATE_FIELD_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(FifoMessageQueue.class, 
"state");
+    protected final long lvTail() {
+        return tail;
+    }
+
+    protected final void soTail(long value) {
+        TAIL_FIELD_UPDATER.lazySet(this, value);
+    }
 
+    protected final void svTail(long value) {
+        tail = value;
+    }
+}
+
+abstract class FifoMessageQueuePad1 extends FifoMessageQueueProducerFields {
+    long p01, p02, p03, p04, p05, p06, p07, p08;
+    long p10, p11, p12, p13, p14, p15, p16, p17;
+}
+
+abstract class FifoMessageQueueConsumerFields extends FifoMessageQueuePad1 {
+    protected static final 
AtomicLongFieldUpdater<FifoMessageQueueConsumerFields> HEAD_FIELD_UPDATER =
+            
AtomicLongFieldUpdater.newUpdater(FifoMessageQueueConsumerFields.class, "head");
+
+    protected static final 
AtomicLongFieldUpdater<FifoMessageQueueConsumerFields> HEAD_LOCK_FIELD_UPDATER =
+            
AtomicLongFieldUpdater.newUpdater(FifoMessageQueueConsumerFields.class, 
"headLock");
+    protected static final 
AtomicIntegerFieldUpdater<FifoMessageQueueConsumerFields> STATE_FIELD_UPDATER =
+            
AtomicIntegerFieldUpdater.newUpdater(FifoMessageQueueConsumerFields.class, 
"state");
     protected static final int CLOSED = 0;
     protected static final int STOPPED = 1;
     protected static final int RUNNING = 2;
+    private volatile long head;
+    private volatile long headLock;
+    protected volatile int state = STOPPED;
 
-    private volatile int state = STOPPED;
+    protected final boolean tryLockHead() {
+        return HEAD_LOCK_FIELD_UPDATER.getAndIncrement(this) == 0;
+    }
 
-    protected final ReentrantLock lock = new ReentrantLock();
-    protected final Condition condition = lock.newCondition();
+    protected final void unlockLockHead() {
+        HEAD_LOCK_FIELD_UPDATER.lazySet(this, 0);
+    }
 
-    protected final Deque<JmsInboundMessageDispatch> queue;
+    protected final long lvHead() {
+        return head;
+    }
+
+    protected final void soHead(long value) {
+        HEAD_FIELD_UPDATER.lazySet(this, value);
+    }
+
+    protected final void svHead(long value) {
+        head = value;
+    }
+}
+
+abstract class FifoMessageQueuePad2 extends FifoMessageQueueConsumerFields {
+    long p01, p02, p03, p04, p05, p06, p07, p08;
+    long p10, p11, p12, p13, p14, p15, p16, p17;
+}
+
+abstract class FifoMessageQueueSharedFields extends FifoMessageQueuePad2 {
+    protected static final 
AtomicLongFieldUpdater<FifoMessageQueueSharedFields> 
WAITING_THREADS_FIELD_UPDATER =
+            
AtomicLongFieldUpdater.newUpdater(FifoMessageQueueSharedFields.class, 
"waitingThreads");
+    private volatile long waitingThreads = 0;
+    protected final ReentrantLock notEmptyLock = new ReentrantLock();
+    protected final Condition notEmptyCondition = notEmptyLock.newCondition();
+
+    public void signalNotEmpty() {
+        if (isWaiting()) {
+            notEmptyLock.lock();
+            try {
+                if (isWaiting()) {
+                    notEmptyCondition.signalAll();
+                }
+            } finally {
+                notEmptyLock.unlock();
+            }
+        }
+    }
+
+    private final boolean isWaiting() {
+        return WAITING_THREADS_FIELD_UPDATER.get(this) > 0;
+    }
+
+    protected final void startWaiting() {
+        WAITING_THREADS_FIELD_UPDATER.incrementAndGet(this);
+    }
+
+    protected final void stopWaiting() {
+        WAITING_THREADS_FIELD_UPDATER.decrementAndGet(this);
+    }
+}
+
+abstract class FifoMessageQueuePad3 extends FifoMessageQueueSharedFields {
+    long p01, p02, p03, p04, p05, p06, p07, p08;
+    long p10, p11, p12, p13, p14, p15, p16, p17;
+}
+
+/**
+ * Simple lock-free first in / first out Message Queue.<br>
+ * It is single producer on {@link #enqueue(JmsInboundMessageDispatch)} and 
{@link #enqueueFirst(JmsInboundMessageDispatch)} while
+ * multi consumer on any dequeue operation although it will suffer contention 
in that case.
+ */
+public final class FifoMessageQueue extends FifoMessageQueuePad3 implements 
MessageQueue {
+
+    /**
+     * Note on concurrent fields accessors notation:
+     * <p>
+     * lvXXX:   load volatile of XXX
+     * soXXX:   store ordered of XXX
+     * svXXX:   store volatile of XXX
+     */
+
+    private final AtomicReferenceArray<JmsInboundMessageDispatch> messages;
+    private final long mask;
+    //this pad is needed to avoid false sharing with other regions in the JVM 
heap
+    private static final int ARRAY_PAD = 128 / Integer.BYTES;
+    private final int lookAheadStep;
 
     public FifoMessageQueue(int prefetchSize) {
-        this.queue = new ArrayDeque<JmsInboundMessageDispatch>(Math.max(1, 
prefetchSize));
+        final int size = Math.max(4, prefetchSize);
+        final int nextPow2Size = 1 << 32 - Integer.numberOfLeadingZeros(size - 
1);
+        messages = new AtomicReferenceArray<>(nextPow2Size + ARRAY_PAD * 2);
+        soHead(0);
+        soTail(0);
+        mask = nextPow2Size - 1;
+        lookAheadStep = nextPow2Size / 4;
+        producerLimit = nextPow2Size;
     }
 
-    @Override
-    public void enqueueFirst(JmsInboundMessageDispatch envelope) {
-        lock.lock();
+    private static int elementIndex(long sequence, long mask) {
+        return ARRAY_PAD + (int) (sequence & mask);
+    }
+
+    private static void 
soElement(AtomicReferenceArray<JmsInboundMessageDispatch> messages, long mask, 
long sequence, JmsInboundMessageDispatch dispatch) {
+        messages.lazySet(elementIndex(sequence, mask), dispatch);
+    }
+
+    private static JmsInboundMessageDispatch 
lvElement(AtomicReferenceArray<JmsInboundMessageDispatch> messages, long mask, 
long sequence) {
+        return messages.get(elementIndex(sequence, mask));
+    }
+
+    private boolean exclusiveOffer(final 
AtomicReferenceArray<JmsInboundMessageDispatch> messages, long mask, final 
JmsInboundMessageDispatch e) {
+        final long currentTail = lvTail();
+        if (currentTail >= producerLimit && !exclusiveOfferSlowPath(messages, 
mask, currentTail)) {
+            return false;
+        }
+        soElement(messages, mask, currentTail, e);
+        //using a full barrier to avoid signalNotEmpty to happen before a 
completed offer
+        svTail(currentTail + 1);
+        return true;
+    }
+
+    private boolean 
exclusiveOffer(AtomicReferenceArray<JmsInboundMessageDispatch> messages, long 
mask, JmsInboundMessageDispatch e, long currentTail) {
+        if (currentTail >= producerLimit && !exclusiveOfferSlowPath(messages, 
mask, currentTail)) {
+            return false;
+        }
+        soElement(messages, mask, currentTail, e);
+        //using a full barrier to avoid signalNotEmpty to happen before a 
completed offer
+        svTail(currentTail + 1);
+        return true;
+    }
+
+    private boolean 
exclusiveOfferSlowPath(AtomicReferenceArray<JmsInboundMessageDispatch> 
messages, long mask, long currentTail) {
+        final int lookAheadStep = this.lookAheadStep;
+        final long lookAheadTail = currentTail + lookAheadStep;
+
+        if (lvElement(messages, mask, lookAheadTail) == null) {
+            producerLimit = lookAheadTail;
+        } else {
+            if (lvElement(messages, mask, currentTail) != null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean exclusiveOfferFirst(final JmsInboundMessageDispatch e) {
+        final AtomicReferenceArray<JmsInboundMessageDispatch> messages = 
this.messages;
+        final long mask = this.mask;
+        final long currentTail = lvTail();
+        if (currentTail == lvHead()) {
+            return exclusiveOffer(messages, mask, e, currentTail);
+        }
+        while (!tryLockHead()) {
+            Thread.yield();
+        }
+        final long currentHead = lvHead();
+        //both head and tail cannot change here
+        if (currentTail == currentHead) {
+            //i can unlock it because the queue is empty for real: any 
consumer is waiting
+            unlockLockHead();
+            return exclusiveOffer(messages, mask, e, currentTail);
+        }
         try {
-            queue.addFirst(envelope);
-            condition.signal();
+            final long previousHead = currentHead - 1;
+            //full
+            if (lvElement(messages, mask, previousHead) != null) {
+                return false;
+            }
+            //adjust the producer limit to be right before the consumer
+            producerLimit = previousHead + mask + 1;
+            soElement(messages, mask, previousHead, e);
+            //using a full barrier to avoid signalNotEmpty to happen before a 
completed offerFirst
+            svHead(previousHead);
+            return true;
         } finally {
-            lock.unlock();
+            unlockLockHead();
         }
     }
 
-    @Override
-    public void enqueue(JmsInboundMessageDispatch envelope) {
-        lock.lock();
+    private JmsInboundMessageDispatch 
sharedPoll(AtomicReferenceArray<JmsInboundMessageDispatch> messages, long mask) 
{
+        while (!tryLockHead()) {
+            Thread.yield();
+        }
         try {
-            queue.addLast(envelope);
-            condition.signal();
+            final long currentHead = lvHead();
+            final JmsInboundMessageDispatch e = lvElement(messages, mask, 
currentHead);
+            if (e == null) {
+                return null;
+            }
+            soElement(messages, mask, currentHead, null);
+            soHead(currentHead + 1);
+            return e;
         } finally {
-            lock.unlock();
+            unlockLockHead();
         }
     }
 
+    private void sharedClear(AtomicReferenceArray<JmsInboundMessageDispatch> 
messages, long mask) {
+        while (!tryLockHead()) {
+            Thread.yield();
+        }
+        try {
+            long currentHead = lvHead();
+            while (true) {
+                final JmsInboundMessageDispatch e = lvElement(messages, mask, 
currentHead);
+                if (e == null) {
+                    return;
+                }
+                soElement(messages, mask, currentHead, null);
+                currentHead++;
+                soHead(currentHead);
+            }
+        } finally {
+            unlockLockHead();
+        }
+    }
 
     @Override
-    public JmsInboundMessageDispatch dequeue(long timeout) throws 
InterruptedException {
-        lock.lock();
-        try {
-            // Wait until the consumer is ready to deliver messages.
-            while (timeout != 0 && isRunning() && queue.isEmpty()) {
-                if (timeout == -1) {
-                    condition.await();
-                } else {
-                    long start = System.currentTimeMillis();
-                    condition.await(timeout, TimeUnit.MILLISECONDS);
-                    timeout = Math.max(timeout + start - 
System.currentTimeMillis(), 0);
+    public void enqueueFirst(JmsInboundMessageDispatch envelope) {
+        while (!exclusiveOfferFirst(envelope)) {
+            Thread.yield();
+        }
+        signalNotEmpty();
+    }
+
+    public boolean tryEnqueue(JmsInboundMessageDispatch envelope) {
+        final AtomicReferenceArray<JmsInboundMessageDispatch> messages = 
this.messages;
+        final long mask = this.mask;
+        if (exclusiveOffer(messages, mask, envelope)) {
+            signalNotEmpty();
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void enqueue(JmsInboundMessageDispatch envelope) {
+        final AtomicReferenceArray<JmsInboundMessageDispatch> messages = 
this.messages;
+        final long mask = this.mask;
+        while (!exclusiveOffer(messages, mask, envelope)) {
+            Thread.yield();
+        }
+        signalNotEmpty();
+    }
+
+
+    private JmsInboundMessageDispatch blockingDequeue() throws 
InterruptedException {
+        final AtomicReferenceArray<JmsInboundMessageDispatch> messages = 
this.messages;
+        final long mask = this.mask;
+        JmsInboundMessageDispatch e = null;
+        while (isRunning() && (e = sharedPoll(messages, mask)) == null) {
+            startWaiting();
+            //startWaiting must contain a full memory barrier to avoid
+            //a producer may think the consumer is still active
+            //when in fact it has decided to go to sleep.
+            notEmptyLock.lock();
+            try {
+                final JmsInboundMessageDispatch lastChancePoll = 
sharedPoll(messages, mask);
+                if (lastChancePoll != null) {
+                    return lastChancePoll;
                 }
+                notEmptyCondition.await();
+            } finally {
+                notEmptyLock.unlock();
+                stopWaiting();
             }
+        }
+        return e;
+    }
 
-            if (!isRunning()) {
-                return null;
+    private JmsInboundMessageDispatch blockingDequeue(long timeoutInNanos) 
throws InterruptedException {
+        final AtomicReferenceArray<JmsInboundMessageDispatch> messages = 
this.messages;
+        final long mask = this.mask;
+        long nanosLeftToWait = timeoutInNanos;
+        JmsInboundMessageDispatch e = null;
+        while (isRunning() && (e = sharedPoll(messages, mask)) == null && 
nanosLeftToWait > 0) {
+            startWaiting();
+            //startWaiting must contain a full memory barrier to avoid
+            //a producer may think the consumer is still active
+            //when in fact it has decided to go to sleep.
+            notEmptyLock.lock();
+            try {
+                final JmsInboundMessageDispatch lastChancePoll = 
sharedPoll(messages, mask);
+                if (lastChancePoll != null) {
+                    return lastChancePoll;
+                }
+                nanosLeftToWait = 
notEmptyCondition.awaitNanos(nanosLeftToWait);
+            } finally {
+                notEmptyLock.unlock();
+                stopWaiting();
             }
+        }
+        return e;
+    }
 
-            return queue.pollFirst();
-        } finally {
-            lock.unlock();
+    @Override
+    public JmsInboundMessageDispatch dequeue(long timeout) throws 
InterruptedException {
+        if (timeout == -1) {
+            return blockingDequeue();
+        }
+        if (timeout == 0) {
+            return dequeueNoWait();
         }
+        return blockingDequeue(TimeUnit.MILLISECONDS.toNanos(timeout));
     }
 
     @Override
     public final JmsInboundMessageDispatch dequeueNoWait() {
-        lock.lock();
-        try {
-            if (!isRunning()) {
-                return null;
-            }
-
-            return queue.pollFirst();
-        } finally {
-            lock.unlock();
+        if (!isRunning()) {
+            return null;
         }
+        final AtomicReferenceArray<JmsInboundMessageDispatch> messages = 
this.messages;
+        final long mask = this.mask;
+        return sharedPoll(messages, mask);
     }
 
     @Override
     public final void start() {
         if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) {
-            lock.lock();
-            try {
-                condition.signalAll();
-            } finally {
-                lock.unlock();
-            }
+            signalNotEmpty();
         }
     }
 
     @Override
     public final void stop() {
         if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) {
-            lock.lock();
-            try {
-                condition.signalAll();
-            } finally {
-                lock.unlock();
-            }
+            signalNotEmpty();
         }
     }
 
     @Override
     public final void close() {
         if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) {
-            lock.lock();
-            try {
-                condition.signalAll();
-            } finally {
-                lock.unlock();
-            }
+            signalNotEmpty();
         }
     }
 
@@ -156,43 +415,53 @@ public final boolean isClosed() {
         return state == CLOSED;
     }
 
-    @Override
-    public boolean isEmpty() {
-        lock.lock();
-        try {
-            return queue.isEmpty();
-        } finally {
-            lock.unlock();
-        }
+    public int capacity() {
+        return (int) mask + 1;
     }
 
     @Override
     public int size() {
-        lock.lock();
-        try {
-            return queue.size();
-        } finally {
-            lock.unlock();
+        /*
+         * It is possible for a thread to be interrupted or reschedule between 
the read of the producer and
+         * consumer indices, therefore protection is required to ensure size 
is within valid range. In the
+         * event of concurrent polls/offers to this method the size is OVER 
estimated as we read consumer
+         * index BEFORE the producer index.
+         */
+        long after = lvHead();
+        long size;
+        while (true) {
+            final long before = after;
+            final long currentProducerIndex = lvTail();
+            after = lvHead();
+            if (before == after) {
+                size = (currentProducerIndex - after);
+                //protection against offerFirst
+                if (size >= 0) {
+                    break;
+                }
+            }
         }
+        //both long and int overflow are impossible (it is a bounded q)
+        return (int) size;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        // Order matters!
+        // Loading consumer before producer allows for producer increments 
after consumer index is read.
+        // This ensures this method is conservative in it's estimate.
+        return (lvHead() == lvTail());
     }
 
     @Override
     public void clear() {
-        lock.lock();
-        try {
-            queue.clear();
-        } finally {
-            lock.unlock();
-        }
+        final AtomicReferenceArray<JmsInboundMessageDispatch> messages = 
this.messages;
+        final long mask = this.mask;
+        sharedClear(messages, mask);
     }
 
     @Override
     public String toString() {
-        lock.lock();
-        try {
-            return queue.toString();
-        } finally {
-            lock.unlock();
-        }
+        return "FifoMessageQueue size = " + size();
     }
 }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
index 590e9e9c..22cbd1d3 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
@@ -22,14 +22,12 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.JMSException;
 
@@ -80,6 +78,43 @@ public void testClose() {
         queue.close();
     }
 
+    @Test(timeout = 10000)
+    public void testCannotEnqueueOverCapacity() {
+        assumeTrue(queue.capacity() >= 4);
+        assertFalse(queue.isClosed());
+        assertTrue(queue.isRunning());
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueue(message);
+        assertEquals(1, queue.size());
+        final int remaining = queue.capacity() - queue.size();
+        for (int i = 0; i < remaining; i++) {
+            queue.enqueue(message);
+        }
+        assertEquals(queue.capacity(), queue.size());
+        assertFalse(queue.tryEnqueue(message));
+        queue.clear();
+        queue.close();
+    }
+
+    @Test(timeout = 10000)
+    public void testCannotEnqueueOverCapacityUsingEnqueueFirst() {
+        assumeTrue(queue.capacity() >= 4);
+        assertFalse(queue.isClosed());
+        assertTrue(queue.isRunning());
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueue(message);
+        queue.enqueueFirst(message);
+        assertEquals(2, queue.size());
+        final int remaining = queue.capacity() - queue.size();
+        for (int i = 0; i < remaining; i++) {
+            queue.enqueue(message);
+        }
+        assertEquals(queue.capacity(), queue.size());
+        assertFalse(queue.tryEnqueue(message));
+        queue.clear();
+        queue.close();
+    }
+
     @Test
     public void testDequeueNoWaitWhenQueueIsClosed() {
         JmsInboundMessageDispatch message = createEnvelope();
@@ -235,7 +270,7 @@ public void run() {
                 }
 
                 try {
-                    singalQueue(queue);
+                    queue.signalNotEmpty();
                 } catch (Exception e1) {
                     return;
                 }
@@ -329,35 +364,4 @@ private JmsMessage createMessage(int priority) {
         return message;
     }
 
-    private void singalQueue(FifoMessageQueue queue) throws Exception {
-        Field lock = null;
-        Field condition = null;
-        Class<?> queueType = queue.getClass();
-
-        while (queueType != null && lock == null) {
-            try {
-                lock = queueType.getDeclaredField("lock");
-                condition = queueType.getDeclaredField("condition");
-            } catch (NoSuchFieldException error) {
-                queueType = queueType.getSuperclass();
-                if (Object.class.equals(queueType)) {
-                    queueType = null;
-                }
-            }
-        }
-
-        assertNotNull("MessageQueue implementation unknown", lock);
-        lock.setAccessible(true);
-        condition.setAccessible(true);
-
-        ReentrantLock lockView = (ReentrantLock) lock.get(queue);
-        Condition conditionView = (Condition) condition.get(queue);
-
-        lockView.lock();
-        try {
-            conditionView.signal();
-        } finally {
-            lockView.unlock();
-        }
-    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to