Author: ritchiem
Date: Wed Apr 30 07:40:18 2008
New Revision: 652388

URL: http://svn.apache.org/viewvc?rev=652388&view=rev
Log:
QPID-889 : Removed _reapingStoreContext from CSDM and replaced it with local 
StoreContext()s so that a context is not reused by different threads.

Modified:
    
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=652388&r1=652387&r2=652388&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Wed Apr 30 07:40:18 2008
@@ -87,10 +87,6 @@
     private final Object _queueHeadLock = new Object();
     private String _processingThreadName = "";
 
-
-    /** Used by any reaping thread to purge messages */
-    private StoreContext _reapingStoreContext = new StoreContext();
-
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, 
AMQQueue queue)
     {
 
@@ -220,14 +216,22 @@
         _lock.lock();
 
 
-           for(Iterator<QueueEntry> iter = _messages.iterator(); 
iter.hasNext();)
+        // New context for dealing with the MessageStore.
+        StoreContext context = new StoreContext();
+
+        for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
         {
             QueueEntry entry = iter.next();
             if(entry.expired())
             {
                 // fixme: Currently we have to update the total byte size here 
for the data in the queue  
                 _totalMessageSize.addAndGet(-entry.getSize());
-                _queue.dequeue(_reapingStoreContext,entry);
+
+                // Remove the message from the queue in the MessageStore
+                _queue.dequeue(context,entry);
+
+                // This queue no longer needs a reference to this message
+                entry.getMessage().decrementReference(context);
                 iter.remove();
             }
            }
@@ -469,14 +473,20 @@
         synchronized (_queueHeadLock)
         {
             QueueEntry entry = getNextMessage();
+
+            // todo: note: why do we need this? Why not reuse the passed 
'storeContext'
+            //Create a new StoreContext for decrementing the References
+            StoreContext context = new StoreContext();
+
             while (entry != null)
             {
                 //and remove it
                 _messages.poll();
 
+                // todo: NOTE: Why is this a different context to the new 
local 'context'?
                 _queue.dequeue(storeContext, entry);
 
-                entry.getMessage().decrementReference(_reapingStoreContext);
+                entry.getMessage().decrementReference(context);
 
                 entry = getNextMessage();
                 count++;
@@ -518,10 +528,13 @@
             {
                 _totalMessageSize.addAndGet(-entry.getSize());
 
+                // New Store Context for removing expired messages
+                StoreContext storeContext = new StoreContext();
+
                 // Use the reapingStoreContext as any sub(if we have one) may 
be in a tx.
-                _queue.dequeue(_reapingStoreContext, entry);
+                _queue.dequeue(storeContext, entry);
 
-                message.decrementReference(_reapingStoreContext);
+                message.decrementReference(storeContext);
 
                 if (_log.isInfoEnabled())
                 {


Reply via email to