Author: ritchiem
Date: Fri Jun 13 08:48:54 2008
New Revision: 667574

URL: http://svn.apache.org/viewvc?rev=667574&view=rev
Log:
Merged revisions 667561 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x

........
  r667561 | ritchiem | 2008-06-13 15:56:45 +0100 (Fri, 13 Jun 2008) | 3 lines
  
  QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking. 
Added a new InternalBrokerBaseCase for performing testing on the broker without 
using the client libraries. This allows for testing closer to AMQP.
  
  Further investigation is required to identify why the .Net client was causing 
the refcounting problems that required the previous change to 
UnacknowledgedMessage, which introduced this leak.
........

Added:
    
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
      - copied unchanged from r667561, 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
    
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
      - copied unchanged from r667561, 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
      - copied unchanged from r667561, 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
    
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
      - copied unchanged from r667561, 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
Removed:
    
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
    
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
Modified:
    incubator/qpid/branches/M2.1.x/   (props changed)
    
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java

Propchange: incubator/qpid/branches/M2.1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
 (original)
+++ 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
 Fri Jun 13 08:48:54 2008
@@ -26,6 +26,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.store.StoreContext;
 
 public interface UnacknowledgedMessageMap
 {
@@ -55,7 +56,7 @@
 
     UnacknowledgedMessage remove(long deliveryTag);
 
-    void drainTo(Collection<UnacknowledgedMessage> destination, long 
deliveryTag) throws AMQException;
+    public void drainTo(long deliveryTag, StoreContext storeContext) throws 
AMQException;
 
     Collection<UnacknowledgedMessage> cancelAllMessages();
 

Modified: 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
 (original)
+++ 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
 Fri Jun 13 08:48:54 2008
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.ack;
 
+import org.apache.qpid.server.store.StoreContext;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -169,7 +170,7 @@
         }
     }
 
-    public void drainTo(Collection<UnacknowledgedMessage> destination, long 
deliveryTag) throws AMQException
+    public void drainTo(long deliveryTag, StoreContext storeContext) throws 
AMQException
     {
         synchronized (_lock)
         {
@@ -185,10 +186,12 @@
                                            " When deliveryTag is:" + 
deliveryTag + "ES:" + _map.entrySet().toString());
                 }
 
+                //Message has been ack so discard it. This will dequeue and 
decrement the reference.
+                unacked.getValue().discard(storeContext);
+
                 it.remove();
                 _unackedSize -= unacked.getValue().getMessage().getSize();
 
-                destination.add(unacked.getValue());
                 if (unacked.getKey() == deliveryTag)
                 {
                     break;

Modified: 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 (original)
+++ 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 Fri Jun 13 08:48:54 2008
@@ -155,31 +155,13 @@
                     throw new AMQException("Multiple ack on delivery tag " + 
deliveryTag + " not known for channel");
                 }
 
-                LinkedList<UnacknowledgedMessage> acked = new 
LinkedList<UnacknowledgedMessage>();
-                unacknowledgedMessageMap.drainTo(acked, deliveryTag);
-                for (UnacknowledgedMessage msg : acked)
-                {
-                    if (!_browsedAcks.contains(deliveryTag))
-                    {
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug("Discarding message: " + 
msg.getMessage().getMessageId());
-                        }
-
-                        //Message has been ack so discard it. This will 
dequeue and decrement the reference.
-                        msg.discard(_storeContext);
-                    }
-                    else
-                    {
-                        _browsedAcks.remove(deliveryTag);
-                    }
-                }
+                unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
             }
         }
         else
         {
             UnacknowledgedMessage msg;
-            msg = unacknowledgedMessageMap.remove(deliveryTag);
+            msg = unacknowledgedMessageMap.get(deliveryTag);
 
             if (msg == null)
             {
@@ -189,20 +171,10 @@
                                        _channel.getChannelId());
             }
 
-            if (!_browsedAcks.contains(deliveryTag))
-            {
-                if (_log.isDebugEnabled())
-                {
-                    _log.debug("Discarding message: " + 
msg.getMessage().getMessageId());
-                }
+            //Message has been ack so discard it. This will dequeue and 
decrement the reference.
+            msg.discard(_storeContext);
 
-                //Message has been ack so discard it. This will dequeue and 
decrement the reference.
-                msg.discard(_storeContext);
-            }
-            else
-            {
-                _browsedAcks.remove(deliveryTag);
-            }
+            unacknowledgedMessageMap.remove(deliveryTag);
 
             if (_log.isDebugEnabled())
             {

Modified: 
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 (original)
+++ 
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 Fri Jun 13 08:48:54 2008
@@ -32,7 +32,7 @@
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
@@ -172,7 +172,7 @@
     */
     public void testQueueDepthAlertWithSubscribers() throws Exception
     {
-        protocolSession = new TestMinaProtocolSession();
+        protocolSession = new InternalTestProtocolSession();
         AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
         protocolSession.addChannel(channel);
 

Modified: 
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 (original)
+++ 
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 Fri Jun 13 08:48:54 2008
@@ -29,7 +29,7 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -122,7 +122,7 @@
         assertTrue(_queueMBean.getActiveConsumerCount() == 0);
 
 
-        TestMinaProtocolSession protocolSession = new 
TestMinaProtocolSession();
+        InternalTestProtocolSession protocolSession = new 
InternalTestProtocolSession();
         AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
         protocolSession.addChannel(channel);
 
@@ -278,7 +278,7 @@
         _queue = new AMQQueue(new AMQShortString("testQueue"), false, new 
AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
         _queueMBean = new AMQQueueMBean(_queue);
 
-        _protocolSession = new TestMinaProtocolSession();
+        _protocolSession = new InternalTestProtocolSession();
     }
 
     private void sendMessages(int messageCount, boolean persistent) throws 
AMQException

Modified: 
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
 (original)
+++ 
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
 Fri Jun 13 08:48:54 2008
@@ -28,7 +28,6 @@
 import org.apache.qpid.server.store.SkeletonMessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;


Reply via email to