Author: ritchiem
Date: Wed Apr 30 08:32:42 2008
New Revision: 652399
URL: http://svn.apache.org/viewvc?rev=652399&view=rev
Log:
QPID-888, QPID-886: Fixed all management uses of _lock.lock() / _lock.unlock()
in the ConcurrentSelectorDeliveryManager (CSDM) so that unlock() is always
called from a finally block. Two issues cover this change: QPID-888 fixes the
management methods, and QPID-886 fixes the use in removeExpired.
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=652399&r1=652398&r2=652399&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Wed Apr 30 08:32:42 2008
@@ -214,30 +214,32 @@
public void removeExpired() throws AMQException
{
_lock.lock();
-
-
- // New Context to for dealing with the MessageStore.
- StoreContext context = new StoreContext();
-
- for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
+ try
{
- QueueEntry entry = iter.next();
- if(entry.expired())
+ // New Context to for dealing with the MessageStore.
+ StoreContext context = new StoreContext();
+
+ for(Iterator<QueueEntry> iter = _messages.iterator();
iter.hasNext();)
{
- // fixme: Currently we have to update the total byte size here
for the data in the queue
- _totalMessageSize.addAndGet(-entry.getSize());
+ QueueEntry entry = iter.next();
+ if(entry.expired())
+ {
+ // fixme: Currently we have to update the total byte size
here for the data in the queue
+ _totalMessageSize.addAndGet(-entry.getSize());
- // Remove the message from the queue in the MessageStore
- _queue.dequeue(context,entry);
+ // Remove the message from the queue in the MessageStore
+ _queue.dequeue(context,entry);
- // This queue nolonger needs a reference to this message
- entry.getMessage().decrementReference(context);
- iter.remove();
+ // This queue nolonger needs a reference to this message
+ entry.getMessage().decrementReference(context);
+ iter.remove();
+ }
}
- }
-
-
- _lock.unlock();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
/** @return the state of the async processor. */
@@ -253,14 +255,20 @@
*/
public List<QueueEntry> getMessages()
{
- _lock.lock();
- List<QueueEntry> list = new ArrayList<QueueEntry>();
+ List<QueueEntry> list = new ArrayList<QueueEntry>();
- for (QueueEntry entry : _messages)
+ _lock.lock();
+ try
{
- list.add(entry);
+ for (QueueEntry entry : _messages)
+ {
+ list.add(entry);
+ }
+ }
+ finally
+ {
+ _lock.unlock();
}
- _lock.unlock();
return list;
}
@@ -282,24 +290,28 @@
long maxMessageCount = toMessageId - fromMessageId + 1;
- _lock.lock();
-
List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
-
- for (QueueEntry entry : _messages)
+ _lock.lock();
+ try
{
- long msgId = entry.getMessage().getMessageId();
- if (msgId >= fromMessageId && msgId <= toMessageId)
+ for (QueueEntry entry : _messages)
{
- foundMessagesList.add(entry);
- }
- // break if the no of messages are found
- if (foundMessagesList.size() == maxMessageCount)
- {
- break;
+ long msgId = entry.getMessage().getMessageId();
+ if (msgId >= fromMessageId && msgId <= toMessageId)
+ {
+ foundMessagesList.add(entry);
+ }
+ // break if the no of messages are found
+ if (foundMessagesList.size() == maxMessageCount)
+ {
+ break;
+ }
}
}
- _lock.unlock();
+ finally
+ {
+ _lock.unlock();
+ }
return foundMessagesList;
}
@@ -449,51 +461,62 @@
{
_lock.lock();
- QueueEntry entry = _messages.poll();
-
- if (entry != null)
+ try
{
- queue.dequeue(storeContext, entry);
+ QueueEntry entry = _messages.poll();
+
+ if (entry != null)
+ {
+ queue.dequeue(storeContext, entry);
- _totalMessageSize.addAndGet(-entry.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
- //If this causes ref count to hit zero then data will be purged so
message.getSize() will NPE.
- entry.getMessage().decrementReference(storeContext);
+ //If this causes ref count to hit zero then data will be
purged so message.getSize() will NPE.
+ entry.getMessage().decrementReference(storeContext);
+ }
+ }
+ finally
+ {
+ _lock.unlock();
}
-
- _lock.unlock();
}
public long clearAllMessages(StoreContext storeContext) throws AMQException
{
long count = 0;
- _lock.lock();
- synchronized (_queueHeadLock)
+ _lock.lock();
+ try
{
- QueueEntry entry = getNextMessage();
+ synchronized (_queueHeadLock)
+ {
+ QueueEntry entry = getNextMessage();
- // todo: note: why do we need this? Why not reuse the passed
'storeContext'
- //Create a new StoreContext for decrementing the References
- StoreContext context = new StoreContext();
+ // todo: note: why do we need this? Why not reuse the passed
'storeContext'
+ //Create a new StoreContext for decrementing the References
+ StoreContext context = new StoreContext();
- while (entry != null)
- {
- //and remove it
- _messages.poll();
+ while (entry != null)
+ {
+ //and remove it
+ _messages.poll();
- // todo: NOTE: Why is this a different context to the new
local 'context'?
- _queue.dequeue(storeContext, entry);
+ // todo: NOTE: Why is this a different context to the new
local 'context'?
+ _queue.dequeue(storeContext, entry);
- entry.getMessage().decrementReference(context);
+ entry.getMessage().decrementReference(context);
- entry = getNextMessage();
- count++;
+ entry = getNextMessage();
+ count++;
+ }
+ _totalMessageSize.set(0L);
}
- _totalMessageSize.set(0L);
}
- _lock.unlock();
+ finally
+ {
+ _lock.unlock();
+ }
return count;
}
@@ -773,24 +796,30 @@
public void enqueueMovedMessages(StoreContext storeContext,
List<QueueEntry> movedMessageList)
{
_lock.lock();
- for (QueueEntry entry : movedMessageList)
- {
- addMessageToQueue(entry, false);
- }
-
- // enqueue on the pre delivery queues
- for (Subscription sub : _subscriptions.getSubscriptions())
+ try
{
for (QueueEntry entry : movedMessageList)
{
- // Only give the message to those that want them.
- if (sub.hasInterest(entry))
+ addMessageToQueue(entry, false);
+ }
+
+ // enqueue on the pre delivery queues
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ for (QueueEntry entry : movedMessageList)
{
- sub.enqueueForPreDelivery(entry, true);
+ // Only give the message to those that want them.
+ if (sub.hasInterest(entry))
+ {
+ sub.enqueueForPreDelivery(entry, true);
+ }
}
}
}
- _lock.unlock();
+ finally
+ {
+ _lock.unlock();
+ }
}
/**