Author: ritchiem
Date: Fri Mar 14 03:51:40 2008
New Revision: 637048

URL: http://svn.apache.org/viewvc?rev=637048&view=rev
Log:
QPID-851 : Update AMQChannel to prevent the memory leak that occurs when an 
autoclose consumer closes in a second thread before registration has had a 
chance to add the new session to the map.

Modified:
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=637048&r1=637047&r2=637048&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Fri Mar 14 03:51:40 2008
@@ -33,7 +33,7 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -43,17 +43,15 @@
 import org.apache.qpid.server.txn.LocalTransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.configuration.Configurator;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class AMQChannel
 {
@@ -91,7 +89,7 @@
     private AMQMessage _currentMessage;
 
     /** Maps from consumer tag to queue instance. Allows us to unsubscribe 
from a queue. */
-    private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new 
HashMap<AMQShortString, AMQQueue>();
+    private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new 
ConcurrentHashMap<AMQShortString, AMQQueue>();
 
     private final MessageStore _messageStore;
 
@@ -333,9 +331,21 @@
             throw new ConsumerTagNotUniqueException();
         }
 
-        queue.registerProtocolSession(session, _channelId, tag, acks, filters, 
noLocal, exclusive);
+        // Put before registering: Async Delivery may AutoClose the subscriber
+        // and call _consumerTag2QueueMap.remove before a later put would run.
+        // If the register throws an AMQException, the entry is removed again.
         _consumerTag2QueueMap.put(tag, queue);
 
+        try
+        {
+            queue.registerProtocolSession(session, _channelId, tag, acks, 
filters, noLocal, exclusive);
+        }
+        catch (AMQException e)
+        {
+            _consumerTag2QueueMap.remove(tag);
+            throw e;
+        }
+
         return tag;
     }
 
@@ -822,7 +832,7 @@
                     {
                         message.discard(_storeContext);
                         message.setQueueDeleted(true);
-                        
+
                     }
                     catch (AMQException e)
                     {
@@ -967,7 +977,7 @@
 
     public void processReturns(AMQProtocolSession session) throws AMQException
     {
-        if(!_returnMessages.isEmpty())
+        if (!_returnMessages.isEmpty())
         {
             for (RequiredDeliveryException bouncedMessage : _returnMessages)
             {


Reply via email to