Author: aidan
Date: Tue May  6 02:26:37 2008
New Revision: 653720

URL: http://svn.apache.org/viewvc?rev=653720&view=rev
Log:
Merged revisions 652388-652389,652399,652567-652568,653416 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x

........
  r652388 | ritchiem | 2008-04-30 15:40:18 +0100 (Wed, 30 Apr 2008) | 2 lines
  
  QPID-889 : Removed _reapingStoreContext from CSDM; replaced with local 
StoreContext()s so they are not reused by different threads.
........
  r652389 | ritchiem | 2008-04-30 15:40:45 +0100 (Wed, 30 Apr 2008) | 1 line
  
  QPID-887 : Renamed QueueHouseKeeping threads so they can be identified in 
thread dump. Named Queue-housekeeping-<virtualhost name>
........
  r652399 | ritchiem | 2008-04-30 16:32:42 +0100 (Wed, 30 Apr 2008) | 1 line
  
  QPID-888,QPID-886 : Fixed all management uses of _lock.lock / _lock.unlock so 
that they correctly call unlock from a finally block in the CSDM. There are two 
issues that cover that. QPID-888 - Fix the management ones and QPID-886 to fix 
the use in removeExpired.
........
  r652567 | aidan | 2008-05-01 17:32:20 +0100 (Thu, 01 May 2008) | 1 line
  
  QPID-994 Don't wait to attain state, as the connection is closed by the time we get CloseOk
........
  r652568 | aidan | 2008-05-01 17:35:09 +0100 (Thu, 01 May 2008) | 1 line
  
  QPID-1001 Don't set the expiration time if TTL is 0
........
  r653416 | aidan | 2008-05-05 11:24:50 +0100 (Mon, 05 May 2008) | 1 line
  
  QPID-1019 Prevent messages being dequeued unnecessarily, from rgodfrey
........

Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs?rev=653720&r1=653719&r2=653720&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs Tue 
May  6 02:26:37 2008
@@ -263,7 +263,6 @@
 
             _log.Debug("Blocking for connection close ok frame");
 
-            _stateManager.AttainState(AMQState.CONNECTION_CLOSED);
             Disconnect();
         }
 

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs?rev=653720&r1=653719&r2=653720&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs 
Tue May  6 02:26:37 2008
@@ -306,7 +306,10 @@
          if ( !_disableTimestamps )
          {
             message.Timestamp = DateTime.UtcNow.Ticks;
-            message.Expiration = message.Timestamp + timeToLive;
+            if (timeToLive != 0)
+            {
+                message.Expiration = message.Timestamp + timeToLive;
+            }
          } else
          {
             message.Expiration = 0;

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=653720&r1=653719&r2=653720&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Tue May  6 02:26:37 2008
@@ -469,7 +469,7 @@
 
         synchronized (_unacknowledgedMessageMap.getLock())
         {
-            _unacknowledgedMessageMap.add(deliveryTag, new 
UnacknowledgedMessage(entry, consumerTag, deliveryTag));
+            _unacknowledgedMessageMap.add(deliveryTag, new 
UnacknowledgedMessage(entry, consumerTag, 
deliveryTag,_unacknowledgedMessageMap));
             checkSuspension();
         }
     }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?rev=653720&r1=653719&r2=653720&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
 Tue May  6 02:26:37 2008
@@ -34,13 +34,18 @@
     public final long deliveryTag;
 
     private boolean _queueDeleted;
+    private final UnacknowledgedMessageMap _unacknowledgeMessageMap;
 
 
-    public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, 
long deliveryTag)
+    public UnacknowledgedMessage(QueueEntry entry,
+                                 AMQShortString consumerTag,
+                                 long deliveryTag,
+                                 final UnacknowledgedMessageMap 
unacknowledgedMessageMap)
     {
         this.entry = entry;
         this.consumerTag = consumerTag;
         this.deliveryTag = deliveryTag;
+        _unacknowledgeMessageMap = unacknowledgedMessageMap;
     }
 
     public String toString()
@@ -60,12 +65,20 @@
 
     public void discard(StoreContext storeContext) throws AMQException
     {
-        if (entry.getQueue() != null)
+        synchronized(_unacknowledgeMessageMap)
         {
-            entry.getQueue().dequeue(storeContext, entry);
+            if(_unacknowledgeMessageMap.contains(deliveryTag))
+            {
+
+                if (entry.getQueue() != null)
+                {
+                    entry.getQueue().dequeue(storeContext, entry);
+                }
+                //if the queue is null then the message is waiting to be 
acked, but has been removed.
+                entry.getMessage().decrementReference(storeContext);
+            }
         }
-        //if the queue is null then the message is waiting to be acked, but 
has been removed.
-        entry.getMessage().decrementReference(storeContext);
+        
     }
 
     public AMQMessage getMessage()

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=653720&r1=653719&r2=653720&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Tue May  6 02:26:37 2008
@@ -87,10 +87,6 @@
     private final Object _queueHeadLock = new Object();
     private String _processingThreadName = "";
 
-
-    /** Used by any reaping thread to purge messages */
-    private StoreContext _reapingStoreContext = new StoreContext();
-
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, 
AMQQueue queue)
     {
 
@@ -218,22 +214,32 @@
     public void removeExpired() throws AMQException
     {
         _lock.lock();
-
-
-           for(Iterator<QueueEntry> iter = _messages.iterator(); 
iter.hasNext();)
+        try
         {
-            QueueEntry entry = iter.next();
-            if(entry.expired())
+            // New Context to for dealing with the MessageStore.
+            StoreContext context = new StoreContext();
+
+            for(Iterator<QueueEntry> iter = _messages.iterator(); 
iter.hasNext();)
             {
-                // fixme: Currently we have to update the total byte size here 
for the data in the queue  
-                _totalMessageSize.addAndGet(-entry.getSize());
-                _queue.dequeue(_reapingStoreContext,entry);
-                iter.remove();
-            }
-           }
+                QueueEntry entry = iter.next();
+                if(entry.expired())
+                {
+                    // fixme: Currently we have to update the total byte size 
here for the data in the queue
+                    _totalMessageSize.addAndGet(-entry.getSize());
 
+                    // Remove the message from the queue in the MessageStore
+                    _queue.dequeue(context,entry);
 
-        _lock.unlock();
+                    // This queue nolonger needs a reference to this message
+                    entry.getMessage().decrementReference(context);
+                    iter.remove();
+                }
+            }
+        }
+        finally
+        {
+            _lock.unlock();
+        }
     }
 
     /** @return the state of the async processor. */
@@ -249,14 +255,20 @@
      */
     public List<QueueEntry> getMessages()
     {
-        _lock.lock();
-        List<QueueEntry> list = new ArrayList<QueueEntry>();
+         List<QueueEntry> list = new ArrayList<QueueEntry>();
 
-        for (QueueEntry entry : _messages)
+        _lock.lock();
+        try
         {
-            list.add(entry);
+            for (QueueEntry entry : _messages)
+            {
+                list.add(entry);
+            }
+        }
+        finally
+        {
+            _lock.unlock();
         }
-        _lock.unlock();
 
         return list;
     }
@@ -278,24 +290,28 @@
 
         long maxMessageCount = toMessageId - fromMessageId + 1;
 
-        _lock.lock();
-
         List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
-
-        for (QueueEntry entry : _messages)
+        _lock.lock();
+        try
         {
-            long msgId = entry.getMessage().getMessageId();
-            if (msgId >= fromMessageId && msgId <= toMessageId)
+            for (QueueEntry entry : _messages)
             {
-                foundMessagesList.add(entry);
-            }
-            // break if the no of messages are found
-            if (foundMessagesList.size() == maxMessageCount)
-            {
-                break;
+                long msgId = entry.getMessage().getMessageId();
+                if (msgId >= fromMessageId && msgId <= toMessageId)
+                {
+                    foundMessagesList.add(entry);
+                }
+                // break if the no of messages are found
+                if (foundMessagesList.size() == maxMessageCount)
+                {
+                    break;
+                }
             }
         }
-        _lock.unlock();
+        finally
+        {
+            _lock.unlock();
+        }
 
         return foundMessagesList;
     }
@@ -445,45 +461,62 @@
     {
         _lock.lock();
 
-        QueueEntry entry = _messages.poll();
-
-        if (entry != null)
+        try
         {
-            queue.dequeue(storeContext, entry);
+            QueueEntry entry = _messages.poll();
 
-            _totalMessageSize.addAndGet(-entry.getSize());
+            if (entry != null)
+            {
+                queue.dequeue(storeContext, entry);
 
-            //If this causes ref count to hit zero then data will be purged so 
message.getSize() will NPE.
-            entry.getMessage().decrementReference(storeContext);
+                _totalMessageSize.addAndGet(-entry.getSize());
 
-        }
+                //If this causes ref count to hit zero then data will be 
purged so message.getSize() will NPE.
+                entry.getMessage().decrementReference(storeContext);
 
-        _lock.unlock();
+            }
+        }
+        finally
+        {
+            _lock.unlock();
+        }
     }
 
     public long clearAllMessages(StoreContext storeContext) throws AMQException
     {
         long count = 0;
-        _lock.lock();
 
-        synchronized (_queueHeadLock)
+        _lock.lock();
+        try
         {
-            QueueEntry entry = getNextMessage();
-            while (entry != null)
+            synchronized (_queueHeadLock)
             {
-                //and remove it
-                _messages.poll();
+                QueueEntry entry = getNextMessage();
 
-                _queue.dequeue(storeContext, entry);
+                // todo: note: why do we need this? Why not reuse the passed 
'storeContext'
+                //Create a new StoreContext for decrementing the References
+                StoreContext context = new StoreContext();
+
+                while (entry != null)
+                {
+                    //and remove it
+                    _messages.poll();
 
-                entry.getMessage().decrementReference(_reapingStoreContext);
+                    // todo: NOTE: Why is this a different context to the new 
local 'context'?
+                    _queue.dequeue(storeContext, entry);
 
-                entry = getNextMessage();
-                count++;
+                    entry.getMessage().decrementReference(context);
+
+                    entry = getNextMessage();
+                    count++;
+                }
+                _totalMessageSize.set(0L);
             }
-            _totalMessageSize.set(0L);
         }
-        _lock.unlock();
+        finally
+        {
+            _lock.unlock();
+        }
         return count;
     }
 
@@ -518,10 +551,13 @@
             {
                 _totalMessageSize.addAndGet(-entry.getSize());
 
+                // New Store Context for removing expired messages
+                StoreContext storeContext = new StoreContext();
+
                 // Use the reapingStoreContext as any sub(if we have one) may 
be in a tx.
-                _queue.dequeue(_reapingStoreContext, entry);
+                _queue.dequeue(storeContext, entry);
 
-                message.decrementReference(_reapingStoreContext);
+                message.decrementReference(storeContext);
 
                 if (_log.isInfoEnabled())
                 {
@@ -760,24 +796,30 @@
     public void enqueueMovedMessages(StoreContext storeContext, 
List<QueueEntry> movedMessageList)
     {
         _lock.lock();
-        for (QueueEntry entry : movedMessageList)
-        {
-            addMessageToQueue(entry, false);
-        }
-
-        // enqueue on the pre delivery queues
-        for (Subscription sub : _subscriptions.getSubscriptions())
+        try
         {
             for (QueueEntry entry : movedMessageList)
             {
-                // Only give the message to those that want them.
-                if (sub.hasInterest(entry))
+                addMessageToQueue(entry, false);
+            }
+
+            // enqueue on the pre delivery queues
+            for (Subscription sub : _subscriptions.getSubscriptions())
+            {
+                for (QueueEntry entry : movedMessageList)
                 {
-                    sub.enqueueForPreDelivery(entry, true);
+                    // Only give the message to those that want them.
+                    if (sub.hasInterest(entry))
+                    {
+                        sub.enqueueForPreDelivery(entry, true);
+                    }
                 }
             }
         }
-        _lock.unlock();
+        finally
+        {
+            _lock.unlock();
+        }
     }
 
     /**

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=653720&r1=653719&r2=653720&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
 Tue May  6 02:26:37 2008
@@ -139,7 +139,7 @@
                 };
 
                 TestMessage message = new TestMessage(deliveryTag, i, info, 
txnContext);
-                _map.add(deliveryTag, new UnacknowledgedMessage(new 
QueueEntry(null,message), null, deliveryTag));
+                _map.add(deliveryTag, new UnacknowledgedMessage(new 
QueueEntry(null,message), null, deliveryTag, _map));
             }
             _acked = acked;
             _unacked = unacked;


Reply via email to