Author: ritchiem
Date: Tue Nov  7 03:05:25 2006
New Revision: 472060

URL: http://svn.apache.org/viewvc?view=rev&rev=472060
Log:
QPID-69
Made an interface from the current DeliveryManager.java. The original 
DeliveryManager is now the SynchronizedDeliveryManager.java where the deliver() 
method now has synchronization to solve the race condition.
An alternative DeliveryManager - ConcurrentDeliveryManager.java uses a modified 
ConcurrentLinkedQueue (Modified to maintain the current queue size) this uses a 
compare and swap methods to allow concurrent access to each end of the queue. 
Additional locking is required once the queue has been depleted to ensure that 
a thread is not in the process of appending to the queue. 

Added:
    
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
   (with props)
    
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
   (with props)
    
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
   (with props)
    
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
    
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
    
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=472060&r1=472059&r2=472060
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
 Tue Nov  7 03:05:25 2006
@@ -580,7 +580,18 @@
         _managedObject.register();
         _subscribers = subscribers;
         _subscriptionFactory = subscriptionFactory;
-        _deliveryMgr = new DeliveryManager(_subscribers, this);
+
+        //fixme - Pick one.
+        if (Boolean.getBoolean("concurrentdeliverymanager"))
+        {
+            _logger.warn("Using ConcurrentDeliveryManager");
+            _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+        }
+        else
+        {
+            _logger.warn("Using SynchronizedDeliveryManager");
+            _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+        }
     }
 
     private AMQQueueMBean createMBean() throws AMQException
@@ -676,7 +687,7 @@
             _logger.info("Will not delete " + this + " as it is in use.");
             return 0;
         }
-        else if (checkEmpty && _deliveryMgr.getQueueMessageCount() > 0)
+        else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
         {
             _logger.info("Will not delete " + this + " as it is not empty.");
             return 0;

Added: 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java?view=auto&rev=472060
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
 Tue Nov  7 03:05:25 2006
@@ -0,0 +1,345 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.configuration.Configurator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Manages delivery of messages on behalf of a queue
+ */
+public class ConcurrentDeliveryManager implements DeliveryManager
+{
+    private static final Logger _log = 
Logger.getLogger(ConcurrentDeliveryManager.class);
+
+    @Configured(path = "advanced.compressBufferOnQueue",
+                defaultValue = "false")
+    public boolean compressBufferOnQueue;
+    /**
+     * Holds any queued messages
+     */
+    private final Queue<AMQMessage> _messages = new 
ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+    //private int _messageCount;
+    /**
+     * Ensures that only one asynchronous task is running for this manager at
+     * any time.
+     */
+    private final AtomicBoolean _processing = new AtomicBoolean();
+    /**
+     * The subscriptions on the queue to whom messages are delivered
+     */
+    private final SubscriptionManager _subscriptions;
+
+    /**
+     * A reference to the queue we are delivering messages for. We need this 
to be able
+     * to pass the code that handles acknowledgements a handle on the queue.
+     */
+    private final AMQQueue _queue;
+
+
+    /**
+     * Lock used to ensure that an channel that becomes unsuspended during the 
start of the queueing process is forced
+     * to wait till the first message is added to the queue. This will ensure 
that the _queue has messages to be delivered
+     * via the async thread.
+     * <p/>
+     * Lock is used to control access to hasQueuedMessages() and over the 
addition of messages to the queue.
+     */
+    private ReentrantLock _lock = new ReentrantLock();
+
+
+    ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue 
queue)
+    {
+
+        //Set values from configuration
+        Configurator.configure(this);
+
+        if (compressBufferOnQueue)
+        {
+            _log.info("Compressing Buffers on queue.");
+        }
+
+        _subscriptions = subscriptions;
+        _queue = queue;
+    }
+
+    /**
+     * @return boolean if we are queueing
+     */
+    private boolean queueing()
+    {
+        return hasQueuedMessages();
+    }
+
+
+    /**
+     * @param msg to enqueue
+     * @return true if we are queue this message
+     */
+    private boolean enqueue(AMQMessage msg)
+    {
+        if (msg.isImmediate())
+        {
+            return false;
+        }
+        else
+        {
+            _lock.lock();
+            try
+            {
+                if (queueing())
+                {
+                    return addMessageToQueue(msg);
+                }
+                else
+                {
+                    return false;
+                }
+            }
+            finally
+            {
+                _lock.unlock();
+            }
+        }
+    }
+
+    private void startQueueing(AMQMessage msg)
+    {
+        if (!msg.isImmediate())
+        {
+            addMessageToQueue(msg);
+        }
+    }
+
+    private boolean addMessageToQueue(AMQMessage msg)
+    {
+        // Shrink the ContentBodies to their actual size to save memory.       
+        if (compressBufferOnQueue)
+        {
+            Iterator it = msg.getContentBodies().iterator();
+            while (it.hasNext())
+            {
+                ContentBody cb = (ContentBody) it.next();
+                cb.reduceBufferToFit();
+            }
+        }
+
+        _messages.offer(msg);
+
+        return true;
+    }
+
+
+    public boolean hasQueuedMessages()
+    {
+
+        _lock.lock();
+        try
+        {
+            return !_messages.isEmpty();
+        }
+        finally
+        {
+            _lock.unlock();
+        }
+
+
+    }
+
+    public int getQueueMessageCount()
+    {
+        return getMessageCount();
+    }
+
+    /**
+     * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue 
as it must run the queue to determine size.
+     * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the 
number of elements on the queue.
+     *
+     * @return int the number of messages in the delivery queue.
+     */
+    private int getMessageCount()
+    {
+        return _messages.size();
+    }
+
+
+    public synchronized List<AMQMessage> getMessages()
+    {
+        return new ArrayList<AMQMessage>(_messages);
+    }
+
+    public synchronized void removeAMessageFromTop() throws AMQException
+    {
+        AMQMessage msg = poll();
+        if (msg != null)
+        {
+            msg.dequeue(_queue);
+        }
+    }
+
+    public synchronized void clearAllMessages() throws AMQException
+    {
+        AMQMessage msg = poll();
+        while (msg != null)
+        {
+            msg.dequeue(_queue);
+            msg = poll();
+        }
+    }
+
+    /**
+     * Only one thread should ever execute this method concurrently, but
+     * it can do so while other threads invoke deliver().
+     */
+    private void processQueue()
+    {
+        try
+        {
+            boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+            AMQMessage message = peek();
+
+            //While we have messages to send and subscribers to send them to.
+            while (message != null && hasSubscribers)
+            {
+                // _log.debug("Have messages(" + _messages.size() + ") and 
subscribers");
+                Subscription next = _subscriptions.nextSubscriber(message);
+                //FIXME Is there still not the chance that this subscribe 
could be suspended between here and the send?
+
+                //We don't synchronize access to subscribers so need to 
re-check
+                if (next != null)
+                {
+                    next.send(message, _queue);
+                    poll();
+                    message = peek();
+                }
+                else
+                {
+                    hasSubscribers = false;
+                }
+            }
+        }
+        catch (FailedDequeueException e)
+        {
+            _log.error("Unable to deliver message as dequeue failed: " + e, e);
+        }
+        finally
+        {
+            _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" 
+ " subscribers:" + _subscriptions.hasActiveSubscribers());
+        }
+    }
+
+    private AMQMessage peek()
+    {
+        return _messages.peek();
+    }
+
+    private AMQMessage poll()
+    {
+        return _messages.poll();
+    }
+
+    Runner asyncDelivery = new Runner();
+
+    public void processAsync(Executor executor)
+    {
+        _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + 
getQueueMessageCount() + ")" +
+                   " Active:" + _subscriptions.hasActiveSubscribers() +
+                   " Processing:" + _processing.get());
+
+        if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+        {
+            //are we already running? if so, don't re-run
+            if (_processing.compareAndSet(false, true))
+            {
+                executor.execute(asyncDelivery);
+            }
+        }
+    }
+
+    public void deliver(String name, AMQMessage msg) throws 
FailedDequeueException
+    {
+        // first check whether we are queueing, and enqueue if we are
+        if (!enqueue(msg))
+        {
+            // not queueing so deliver message to 'next' subscriber
+            _lock.lock();
+            try
+            {
+                Subscription s = _subscriptions.nextSubscriber(msg);
+                if (s == null)
+                {
+                    if (!msg.isImmediate())
+                    {
+                        // no subscribers yet so enter 'queueing' mode and 
queue this message
+                        startQueueing(msg);
+                    }
+                }
+                else
+                {
+                    s.send(msg, _queue);
+                    msg.setDeliveredToConsumer();
+                }
+            }
+            finally
+            {
+                _lock.unlock();
+            }
+        }
+    }
+
+    private class Runner implements Runnable
+    {
+        public void run()
+        {
+            boolean running = true;
+            while (running)
+            {
+                processQueue();
+
+                //Check that messages have not been added since we did our 
last peek();
+                // Synchronize with the thread that adds to the queue.
+                // If the queue is still empty then we can exit
+                _lock.lock();
+                try
+                {
+                    if (!(hasQueuedMessages() && 
_subscriptions.hasActiveSubscribers()))
+                    {
+                        running = false;
+                        _processing.set(false);
+                    }
+                }
+                finally
+                {
+                    _lock.unlock();
+                }
+            }
+        }
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=472060&r1=472059&r2=472060
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
 Tue Nov  7 03:05:25 2006
@@ -17,226 +17,28 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.server.configuration.Configurator;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.List;
 
-/**
- * Manages delivery of messages on behalf of a queue
- */
-public class DeliveryManager
+interface DeliveryManager
 {
-    private static final Logger _log = Logger.getLogger(DeliveryManager.class);
-
-    @Configured(path = "advanced.compressBufferOnQueue",
-                defaultValue = "false")
-    public boolean compressBufferOnQueue;
-    /**
-     * Holds any queued messages
-     */
-    private final Queue<AMQMessage> _messages = new 
ConcurrentLinkedQueueAtomicSize<AMQMessage>();
-    //private int _messageCount;
-    /**
-     * Ensures that only one asynchronous task is running for this manager at
-     * any time.
-     */
-    private final AtomicBoolean _processing = new AtomicBoolean();
-    /**
-     * The subscriptions on the queue to whom messages are delivered
-     */
-    private final SubscriptionManager _subscriptions;
-
-    /**
-     * A reference to the queue we are delivering messages for. We need this 
to be able
-     * to pass the code that handles acknowledgements a handle on the queue.
-     */
-    private final AMQQueue _queue;
-
-
-    DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
-    {
-        //Set values from configuration
-        Configurator.configure(this);
-
-        if (compressBufferOnQueue)
-        {
-            _log.info("Compressing Buffers on queue.");
-        }
-
-        _subscriptions = subscriptions;
-        _queue = queue;
-    }
-
-    /**
-     * @return boolean if we are queueing
-     */
-    private boolean queueing()
-    {
-        return hasQueuedMessages();
-    }
-
-
-    /**
-     * @param msg to enqueue
-     * @return true if we are queue this message
-     */
-    private boolean enqueue(AMQMessage msg)
-    {
-        if (msg.isImmediate())
-        {
-            return false;
-        }
-        else
-        {
-            if (queueing())
-            {
-                return addMessageToQueue(msg);
-            }
-            else
-            {
-                return false;
-            }
-        }
-    }
-
-    private void startQueueing(AMQMessage msg)
-    {
-        if (!msg.isImmediate())
-        {
-            addMessageToQueue(msg);
-        }
-    }
-
-    private boolean addMessageToQueue(AMQMessage msg)
-    {
-        // Shrink the ContentBodies to their actual size to save memory.       
-        if (compressBufferOnQueue)
-        {
-            Iterator it = msg.getContentBodies().iterator();
-            while (it.hasNext())
-            {
-                ContentBody cb = (ContentBody) it.next();
-                cb.reduceBufferToFit();
-            }
-        }
-
-        _messages.offer(msg);
-
-        return true;
-    }
-
-
     /**
      * Determines whether there are queued messages. Sets _queueing to false if
      * there are no queued messages. This needs to be atomic.
      *
      * @return true if there are queued messages
      */
-    public boolean hasQueuedMessages()
-    {
-        return !_messages.isEmpty();
-    }
-
-    public int getQueueMessageCount()
-    {
-        return getMessageCount();
-    }
+    boolean hasQueuedMessages();
 
     /**
-     * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue 
as it must run the queue to determine size.
+     * This method should not be used to determin if there are messages in the 
queue.
      *
-     * @return int the number of messages in the delivery queue.
+     * @return int The number of messages in the queue
+     * @use hasQueuedMessages() for all controls relating to having messages 
on the queue.
      */
-
-    private int getMessageCount()
-    {
-        return _messages.size();
-    }
-
-
-    protected synchronized List<AMQMessage> getMessages()
-    {
-        return new ArrayList<AMQMessage>(_messages);
-    }
-
-    protected synchronized void removeAMessageFromTop() throws AMQException
-    {
-        AMQMessage msg = poll();
-        if (msg != null)
-        {
-            msg.dequeue(_queue);
-        }
-    }
-
-    protected synchronized void clearAllMessages() throws AMQException
-    {
-        AMQMessage msg = poll();
-        while (msg != null)
-        {
-            msg.dequeue(_queue);
-            msg = poll();
-        }
-    }
-
-    /**
-     * Only one thread should ever execute this method concurrently, but
-     * it can do so while other threads invoke deliver().
-     */
-    private void processQueue()
-    {
-        try
-        {
-            boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
-            while (hasQueuedMessages() && hasSubscribers)
-            {
-                // _log.debug("Have messages(" + _messages.size() + ") and 
subscribers");
-                Subscription next = _subscriptions.nextSubscriber(peek());
-
-                //We don't synchronize access to subscribers so need to 
re-check
-                if (next != null)
-                {
-                    next.send(peek(), _queue);
-                    poll();
-                }
-                else
-                {
-                    hasSubscribers = false;
-                }
-            }
-        }
-        catch (FailedDequeueException e)
-        {
-            _log.error("Unable to deliver message as dequeue failed: " + e, e);
-        }
-        finally
-        {
-            _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" 
+ " subscribers:" + _subscriptions.hasActiveSubscribers());
-            _processing.set(false);
-        }
-    }
-
-    private AMQMessage peek()
-    {
-        return _messages.peek();
-    }
-
-    private AMQMessage poll()
-    {
-        return _messages.poll();
-    }
-
-    Runner asyncDelivery = new Runner();
+    int getQueueMessageCount();
 
     /**
      * Requests that the delivery manager start processing the queue 
asynchronously
@@ -250,21 +52,7 @@
      *
      * @param executor the executor on which the delivery should take place
      */
-    void processAsync(Executor executor)
-    {
-        _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + 
getQueueMessageCount() + ")" +
-                   " Active:" + _subscriptions.hasActiveSubscribers() +
-                   " Processing:" + _processing.get());
-
-        if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
-        {
-            //are we already running? if so, don't re-run
-            if (_processing.compareAndSet(false, true))
-            {
-                executor.execute(asyncDelivery);
-            }
-        }
-    }
+    void processAsync(Executor executor);
 
     /**
      * Handles message delivery. The delivery manager is always in one of two 
modes;
@@ -273,52 +61,13 @@
      *
      * @param name the name of the entity on whose behalf we are delivering 
the message
      * @param msg  the message to deliver
-     * @throws FailedDequeueException if the message could not be dequeued
+     * @throws org.apache.qpid.server.queue.FailedDequeueException if the 
message could not be dequeued
      */
-    void deliver(String name, AMQMessage msg) throws FailedDequeueException
-    {
-        // first check whether we are queueing, and enqueue if we are
-        if (!enqueue(msg))
-        {
-            // not queueing so deliver message to 'next' subscriber
-            Subscription s = _subscriptions.nextSubscriber(msg);
-            if (s == null)
-            {
-                if (!msg.isImmediate())
-                {
-                    if (_subscriptions instanceof SubscriptionSet)
-                    {
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug("Start Queueing messages Active Subs:" 
+ _subscriptions.hasActiveSubscribers()
-                                       + " Size :" + ((SubscriptionSet) 
_subscriptions).size()
-                                       + " Empty :" + ((SubscriptionSet) 
_subscriptions).isEmpty());
-                        }
-                    }
-                    else
-                    {
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug("Start Queueing messages Active Subs:" 
+ _subscriptions.hasActiveSubscribers());
-                        }
-                    }
-                    // no subscribers yet so enter 'queueing' mode and queue 
this message
-                    startQueueing(msg);
-                }
-            }
-            else
-            {
-                s.send(msg, _queue);
-                msg.setDeliveredToConsumer();
-            }
-        }
-    }
+    void deliver(String name, AMQMessage msg) throws FailedDequeueException;
+
+    void removeAMessageFromTop() throws AMQException;
+
+    void clearAllMessages() throws AMQException;
 
-    private class Runner implements Runnable
-    {
-        public void run()
-        {
-            processQueue();
-        }
-    }
+    List<AMQMessage> getMessages();
 }

Added: 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=auto&rev=472060
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
 Tue Nov  7 03:05:25 2006
@@ -0,0 +1,252 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Manages delivery of messages on behalf of a queue
+ */
+class SynchronizedDeliveryManager implements DeliveryManager
+{
+    private static final Logger _log = 
Logger.getLogger(ConcurrentDeliveryManager.class);
+
+    /**
+     * Holds any queued messages
+     */
+    private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
+    /**
+     * Ensures that only one asynchronous task is running for this manager at
+     * any time.
+     */
+    private final AtomicBoolean _processing = new AtomicBoolean();
+    /**
+     * The subscriptions on the queue to whom messages are delivered
+     */
+    private final SubscriptionManager _subscriptions;
+
+    /**
+     * An indication of the mode we are in. If this is true then messages are
+     * being queued up in _messages for asynchronous delivery. If it is false
+     * then messages can be delivered directly as they come in.
+     */
+    private volatile boolean _queueing;
+
+    /**
+     * A reference to the queue we are delivering messages for. We need this 
to be able
+     * to pass the code that handles acknowledgements a handle on the queue.
+     */
+    private final AMQQueue _queue;
+
+    SynchronizedDeliveryManager(SubscriptionManager subscriptions, AMQQueue 
queue)
+    {
+        _subscriptions = subscriptions;
+        _queue = queue;
+    }
+
+    private synchronized boolean enqueue(AMQMessage msg)
+    {
+        if (msg.isImmediate())
+        {
+            return false;
+        }
+        else
+        {
+            if (_queueing)
+            {
+                _messages.offer(msg);
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+        }
+    }
+
+    private synchronized void startQueueing(AMQMessage msg)
+    {
+        _queueing = true;
+        enqueue(msg);
+    }
+
+    /**
+     * Determines whether there are queued messages. Sets _queueing to false if
+     * there are no queued messages. This needs to be atomic.
+     *
+     * @return true if there are queued messages
+     */
+    public synchronized boolean hasQueuedMessages()
+    {
+        boolean empty = _messages.isEmpty();
+        if (empty)
+        {
+            _queueing = false;
+        }
+        return !empty;
+    }
+
+    public synchronized int getQueueMessageCount()
+    {
+        return _messages.size();
+    }
+
+    public synchronized List<AMQMessage> getMessages()
+    {
+        return new ArrayList<AMQMessage>(_messages);
+    }
+
+    public synchronized void removeAMessageFromTop() throws AMQException
+    {
+        AMQMessage msg = poll();
+        if (msg != null)
+        {
+            msg.dequeue(_queue);
+        }
+    }
+
+    public synchronized void clearAllMessages() throws AMQException
+    {
+        AMQMessage msg = poll();
+        while (msg != null)
+        {
+            msg.dequeue(_queue);
+            msg = poll();
+        }
+    }
+
+    /**
+     * Only one thread should ever execute this method concurrently, but
+     * it can do so while other threads invoke deliver().
+     */
+    private void processQueue()
+    {
+        try
+        {
+            boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+            while (hasQueuedMessages() && hasSubscribers)
+            {
+                Subscription next = _subscriptions.nextSubscriber(peek());
+                //We don't synchronize access to subscribers so need to 
re-check
+                if (next != null)
+                {
+                    try
+                    {
+                        next.send(poll(), _queue);
+                    }
+                    catch (AMQException e)
+                    {
+                        _log.error("Unable to deliver message: " + e, e);
+                    }
+                }
+                else
+                {
+                    hasSubscribers = false;
+                }
+            }
+        }
+        finally
+        {
+            _processing.set(false);
+        }
+    }
+
+    private synchronized AMQMessage peek()
+    {
+        return _messages.peek();
+    }
+
+    private synchronized AMQMessage poll()
+    {
+        return _messages.poll();
+    }
+
+    /**
+     * Requests that the delivery manager start processing the queue 
asynchronously
+     * if there is work that can be done (i.e. there are messages queued up and
+     * subscribers that can receive them.
+     * <p/>
+     * This should be called when subscribers are added, but only after the 
consume-ok
+     * message has been returned as message delivery may start immediately. It 
should also
+     * be called after unsuspending a client.
+     * <p/>
+     *
+     * @param executor the executor on which the delivery should take place
+     */
+    public void processAsync(Executor executor)
+    {
+        if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+        {
+            //are we already running? if so, don't re-run
+            if (_processing.compareAndSet(false, true))
+            {
+                executor.execute(new Runner());
+            }
+        }
+    }
+
+    /**
+     * Handles message delivery. The delivery manager is always in one of two 
modes;
+     * it is either queueing messages for asynchronous delivery or delivering
+     * directly.
+     *
+     * @param name the name of the entity on whose behalf we are delivering 
the message
+     * @param msg  the message to deliver
+     * @throws NoConsumersException if there are no active subscribers to 
deliver
+     *                              the message to
+     */
+    public void deliver(String name, AMQMessage msg) throws 
FailedDequeueException
+    {
+        // first check whether we are queueing, and enqueue if we are
+        if (!enqueue(msg))
+        {
+            synchronized(this)
+            {
+                // not queueing so deliver message to 'next' subscriber
+                Subscription s = _subscriptions.nextSubscriber(msg);
+                if (s == null)
+                {
+                    // no subscribers yet so enter 'queueing' mode and queue 
this message
+                    startQueueing(msg);
+                }
+                else
+                {
+                    s.send(msg, _queue);
+                    msg.setDeliveredToConsumer();
+                }
+            }
+        }
+
+    }
+
+    private class Runner implements Runnable
+    {
+        public void run()
+        {
+            processQueue();
+        }
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=472060&r1=472059&r2=472060
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
 Tue Nov  7 03:05:25 2006
@@ -54,7 +54,7 @@
 
     public ConcurrencyTest() throws Exception
     {
-        _deliveryMgr = new DeliveryManager(_subscriptionMgr, new 
AMQQueue("myQ", false, "guest", false,
+        _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new 
AMQQueue("myQ", false, "guest", false,
                                                                           new 
DefaultQueueRegistry()));
     }
 
@@ -165,7 +165,7 @@
             }
             else
             {
-                if (_deliveryMgr.getQueueMessageCount() == 0) {
+                if (!_deliveryMgr.hasQueuedMessages()) {
                     isComplete = true;
                 }
                 return null;

Added: 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java?view=auto&rev=472060
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
 Tue Nov  7 03:05:25 2006
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.DeliveryManagerTest;
+import org.apache.qpid.AMQException;
+import junit.framework.JUnit4TestAdapter;
+
+public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
+{
+    public ConcurrentDeliveryManagerTest() throws Exception
+    {
+        try
+        {
+            System.setProperty("concurrentdeliverymanager","true");
+            _mgr = new ConcurrentDeliveryManager(_subscriptions, new 
AMQQueue("myQ", false, "guest", false,
+                                                                              
new DefaultQueueRegistry()));
+        }
+        catch (Throwable t)
+        {
+            t.printStackTrace();
+            throw new AMQException("Could not initialise delivery manager", t);
+        }
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(ConcurrentDeliveryManagerTest.class);
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=472060&r1=472059&r2=472060
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
 Tue Nov  7 03:05:25 2006
@@ -20,40 +20,38 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import org.junit.Test;
+import org.junit.runners.Suite;
+import org.junit.runner.RunWith;
 import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
 import org.apache.qpid.AMQException;
 import junit.framework.JUnit4TestAdapter;
 
-public class DeliveryManagerTest extends MessageTestHelper
[EMAIL PROTECTED](Suite.class)
[EMAIL PROTECTED]({
+        ConcurrentDeliveryManagerTest.class,
+        SynchronizedDeliveryManagerTest.class})
+
+abstract public class DeliveryManagerTest extends MessageTestHelper
 {
-    private final SubscriptionSet _subscriptions = new SubscriptionSet();
-    private final DeliveryManager _mgr;
+    protected final SubscriptionSet _subscriptions = new SubscriptionSet();
+    protected DeliveryManager _mgr;
 
     public DeliveryManagerTest() throws Exception
     {
-        try
-        {
-            _mgr = new DeliveryManager(_subscriptions, new AMQQueue("myQ", 
false, "guest", false,
-                                                                new 
DefaultQueueRegistry()));
-        }
-        catch(Throwable t)
-        {
-            t.printStackTrace();
-            throw new AMQException("Could not initialise delivery manager", t);
-        }
     }
 
+
     @Test
     public void startInQueueingMode() throws AMQException
     {
         AMQMessage[] messages = new AMQMessage[10];
-        for(int i = 0; i < messages.length; i++)
+        for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message();
         }
         int batch = messages.length / 2;
 
-        for(int i = 0; i < batch; i++)
+        for (int i = 0; i < batch; i++)
         {
             _mgr.deliver("Me", messages[i]);
         }
@@ -63,7 +61,7 @@
         _subscriptions.addSubscriber(s1);
         _subscriptions.addSubscriber(s2);
 
-        for(int i = batch; i < messages.length; i++)
+        for (int i = batch; i < messages.length; i++)
         {
             _mgr.deliver("Me", messages[i]);
         }
@@ -76,9 +74,9 @@
         assertEquals(messages.length / 2, s1.getMessages().size());
         assertEquals(messages.length / 2, s2.getMessages().size());
 
-        for(int i = 0; i < messages.length; i++)
+        for (int i = 0; i < messages.length; i++)
         {
-            if(i % 2 == 0)
+            if (i % 2 == 0)
             {
                 assertTrue(s1.getMessages().get(i / 2) == messages[i]);
             }
@@ -93,7 +91,7 @@
     public void startInDirectMode() throws AMQException
     {
         AMQMessage[] messages = new AMQMessage[10];
-        for(int i = 0; i < messages.length; i++)
+        for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message();
         }
@@ -102,13 +100,13 @@
         TestSubscription s1 = new TestSubscription("1");
         _subscriptions.addSubscriber(s1);
 
-        for(int i = 0; i < batch; i++)
+        for (int i = 0; i < batch; i++)
         {
             _mgr.deliver("Me", messages[i]);
         }
 
         assertEquals(batch, s1.getMessages().size());
-        for(int i = 0; i < batch; i++)
+        for (int i = 0; i < batch; i++)
         {
             assertTrue(messages[i] == s1.getMessages().get(i));
         }
@@ -116,7 +114,7 @@
         assertEquals(0, s1.getMessages().size());
 
         s1.setSuspended(true);
-        for(int i = batch; i < messages.length; i++)
+        for (int i = batch; i < messages.length; i++)
         {
             _mgr.deliver("Me", messages[i]);
         }
@@ -128,22 +126,22 @@
         _mgr.processAsync(new OnCurrentThreadExecutor());
         assertEquals(messages.length - batch, s1.getMessages().size());
 
-        for(int i = batch; i < messages.length; i++)
+        for (int i = batch; i < messages.length; i++)
         {
             assertTrue(messages[i] == s1.getMessages().get(i - batch));
         }
 
     }
 
-    @Test (expected=NoConsumersException.class)
+    @Test(expected = NoConsumersException.class)
     public void noConsumers() throws AMQException
     {
         AMQMessage msg = message(true);
         _mgr.deliver("Me", msg);
-        msg.checkDeliveredToConsumer();        
+        msg.checkDeliveredToConsumer();
     }
 
-    @Test (expected=NoConsumersException.class)
+    @Test(expected = NoConsumersException.class)
     public void noActiveConsumers() throws AMQException
     {
         TestSubscription s = new TestSubscription("A");

Added: 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java?view=auto&rev=472060
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
 Tue Nov  7 03:05:25 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.queue.SynchronizedDeliveryManager;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
+import org.apache.qpid.server.queue.DeliveryManagerTest;
+import org.apache.qpid.AMQException;
+import junit.framework.JUnit4TestAdapter;
+
+public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest
+{
+    public SynchronizedDeliveryManagerTest() throws Exception
+    {
+        try
+        {
+            System.setProperty("concurrentdeliverymanager","false");
+            _mgr = new SynchronizedDeliveryManager(_subscriptions, new 
AMQQueue("myQ", false, "guest", false,
+                                                                               
 new DefaultQueueRegistry()));
+        }
+        catch (Throwable t)
+        {
+            t.printStackTrace();
+            throw new AMQException("Could not initialise delivery manager", t);
+        }
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(SynchronizedDeliveryManagerTest.class);
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to