Author: ritchiem
Date: Fri Mar 14 03:51:40 2008
New Revision: 637048
URL: http://svn.apache.org/viewvc?rev=637048&view=rev
Log:
QPID-851 : Update AMQChannel to prevent a memory leak when an autoclose
consumer closes in a second thread before the registering code has added the
consumer's entry to the consumer-tag-to-queue map.
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=637048&r1=637047&r2=637048&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Fri Mar 14 03:51:40 2008
@@ -33,7 +33,7 @@
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -43,17 +43,15 @@
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.configuration.Configurator;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
public class AMQChannel
{
@@ -91,7 +89,7 @@
private AMQMessage _currentMessage;
/** Maps from consumer tag to queue instance. Allows us to unsubscribe
from a queue. */
- private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new
HashMap<AMQShortString, AMQQueue>();
+ private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new
ConcurrentHashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -333,9 +331,21 @@
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks, filters,
noLocal, exclusive);
+ // We add before we register as the Async Delivery process may
AutoClose the subscriber
+ // so calling _cT2QM.remove before we have done put which was after
the register succeeded.
+ // So to keep things straight we put before the call and catch all
exceptions from the register and tidy up.
_consumerTag2QueueMap.put(tag, queue);
+ try
+ {
+ queue.registerProtocolSession(session, _channelId, tag, acks,
filters, noLocal, exclusive);
+ }
+ catch (AMQException e)
+ {
+ _consumerTag2QueueMap.remove(tag);
+ throw e;
+ }
+
return tag;
}
@@ -822,7 +832,7 @@
{
message.discard(_storeContext);
message.setQueueDeleted(true);
-
+
}
catch (AMQException e)
{
@@ -967,7 +977,7 @@
public void processReturns(AMQProtocolSession session) throws AMQException
{
- if(!_returnMessages.isEmpty())
+ if (!_returnMessages.isEmpty())
{
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{