Author: ritchiem
Date: Wed Apr 30 08:32:42 2008
New Revision: 652399

URL: http://svn.apache.org/viewvc?rev=652399&view=rev
Log:
QPID-888, QPID-886: Fixed all management uses of _lock.lock() / _lock.unlock()
in the ConcurrentSelectorDeliveryManager (CSDM) so that unlock() is always
called from a finally block. Two issues cover this change: QPID-888 fixes the
management uses, and QPID-886 fixes the use in removeExpired().

Modified:
    
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=652399&r1=652398&r2=652399&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Wed Apr 30 08:32:42 2008
@@ -214,30 +214,32 @@
     public void removeExpired() throws AMQException
     {
         _lock.lock();
-
-
-        // New Context to for dealing with the MessageStore.
-        StoreContext context = new StoreContext();
-
-        for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
+        try
         {
-            QueueEntry entry = iter.next();
-            if(entry.expired())
+            // New Context to for dealing with the MessageStore.
+            StoreContext context = new StoreContext();
+
+            for(Iterator<QueueEntry> iter = _messages.iterator(); 
iter.hasNext();)
             {
-                // fixme: Currently we have to update the total byte size here 
for the data in the queue  
-                _totalMessageSize.addAndGet(-entry.getSize());
+                QueueEntry entry = iter.next();
+                if(entry.expired())
+                {
+                    // fixme: Currently we have to update the total byte size 
here for the data in the queue
+                    _totalMessageSize.addAndGet(-entry.getSize());
 
-                // Remove the message from the queue in the MessageStore
-                _queue.dequeue(context,entry);
+                    // Remove the message from the queue in the MessageStore
+                    _queue.dequeue(context,entry);
 
-                // This queue nolonger needs a reference to this message
-                entry.getMessage().decrementReference(context);
-                iter.remove();
+                    // This queue nolonger needs a reference to this message
+                    entry.getMessage().decrementReference(context);
+                    iter.remove();
+                }
             }
-           }
-
-
-        _lock.unlock();
+        }
+        finally
+        {
+            _lock.unlock();
+        }
     }
 
     /** @return the state of the async processor. */
@@ -253,14 +255,20 @@
      */
     public List<QueueEntry> getMessages()
     {
-        _lock.lock();
-        List<QueueEntry> list = new ArrayList<QueueEntry>();
+         List<QueueEntry> list = new ArrayList<QueueEntry>();
 
-        for (QueueEntry entry : _messages)
+        _lock.lock();
+        try
         {
-            list.add(entry);
+            for (QueueEntry entry : _messages)
+            {
+                list.add(entry);
+            }
+        }
+        finally
+        {
+            _lock.unlock();
         }
-        _lock.unlock();
 
         return list;
     }
@@ -282,24 +290,28 @@
 
         long maxMessageCount = toMessageId - fromMessageId + 1;
 
-        _lock.lock();
-
         List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
-
-        for (QueueEntry entry : _messages)
+        _lock.lock();
+        try
         {
-            long msgId = entry.getMessage().getMessageId();
-            if (msgId >= fromMessageId && msgId <= toMessageId)
+            for (QueueEntry entry : _messages)
             {
-                foundMessagesList.add(entry);
-            }
-            // break if the no of messages are found
-            if (foundMessagesList.size() == maxMessageCount)
-            {
-                break;
+                long msgId = entry.getMessage().getMessageId();
+                if (msgId >= fromMessageId && msgId <= toMessageId)
+                {
+                    foundMessagesList.add(entry);
+                }
+                // break if the no of messages are found
+                if (foundMessagesList.size() == maxMessageCount)
+                {
+                    break;
+                }
             }
         }
-        _lock.unlock();
+        finally
+        {
+            _lock.unlock();
+        }
 
         return foundMessagesList;
     }
@@ -449,51 +461,62 @@
     {
         _lock.lock();
 
-        QueueEntry entry = _messages.poll();
-
-        if (entry != null)
+        try
         {
-            queue.dequeue(storeContext, entry);
+            QueueEntry entry = _messages.poll();
+
+            if (entry != null)
+            {
+                queue.dequeue(storeContext, entry);
 
-            _totalMessageSize.addAndGet(-entry.getSize());
+                _totalMessageSize.addAndGet(-entry.getSize());
 
-            //If this causes ref count to hit zero then data will be purged so 
message.getSize() will NPE.
-            entry.getMessage().decrementReference(storeContext);
+                //If this causes ref count to hit zero then data will be 
purged so message.getSize() will NPE.
+                entry.getMessage().decrementReference(storeContext);
 
+            }
+        }
+        finally
+        {
+            _lock.unlock();
         }
-
-        _lock.unlock();
     }
 
     public long clearAllMessages(StoreContext storeContext) throws AMQException
     {
         long count = 0;
-        _lock.lock();
 
-        synchronized (_queueHeadLock)
+        _lock.lock();
+        try
         {
-            QueueEntry entry = getNextMessage();
+            synchronized (_queueHeadLock)
+            {
+                QueueEntry entry = getNextMessage();
 
-            // todo: note: why do we need this? Why not reuse the passed 
'storeContext'
-            //Create a new StoreContext for decrementing the References
-            StoreContext context = new StoreContext();
+                // todo: note: why do we need this? Why not reuse the passed 
'storeContext'
+                //Create a new StoreContext for decrementing the References
+                StoreContext context = new StoreContext();
 
-            while (entry != null)
-            {
-                //and remove it
-                _messages.poll();
+                while (entry != null)
+                {
+                    //and remove it
+                    _messages.poll();
 
-                // todo: NOTE: Why is this a different context to the new 
local 'context'?
-                _queue.dequeue(storeContext, entry);
+                    // todo: NOTE: Why is this a different context to the new 
local 'context'?
+                    _queue.dequeue(storeContext, entry);
 
-                entry.getMessage().decrementReference(context);
+                    entry.getMessage().decrementReference(context);
 
-                entry = getNextMessage();
-                count++;
+                    entry = getNextMessage();
+                    count++;
+                }
+                _totalMessageSize.set(0L);
             }
-            _totalMessageSize.set(0L);
         }
-        _lock.unlock();
+        finally
+        {
+            _lock.unlock();
+        }
         return count;
     }
 
@@ -773,24 +796,30 @@
     public void enqueueMovedMessages(StoreContext storeContext, 
List<QueueEntry> movedMessageList)
     {
         _lock.lock();
-        for (QueueEntry entry : movedMessageList)
-        {
-            addMessageToQueue(entry, false);
-        }
-
-        // enqueue on the pre delivery queues
-        for (Subscription sub : _subscriptions.getSubscriptions())
+        try
         {
             for (QueueEntry entry : movedMessageList)
             {
-                // Only give the message to those that want them.
-                if (sub.hasInterest(entry))
+                addMessageToQueue(entry, false);
+            }
+
+            // enqueue on the pre delivery queues
+            for (Subscription sub : _subscriptions.getSubscriptions())
+            {
+                for (QueueEntry entry : movedMessageList)
                 {
-                    sub.enqueueForPreDelivery(entry, true);
+                    // Only give the message to those that want them.
+                    if (sub.hasInterest(entry))
+                    {
+                        sub.enqueueForPreDelivery(entry, true);
+                    }
                 }
             }
         }
-        _lock.unlock();
+        finally
+        {
+            _lock.unlock();
+        }
     }
 
     /**


Reply via email to