Author: ritchiem
Date: Thu Jan 11 17:23:43 2007
New Revision: 495460

URL: http://svn.apache.org/viewvc?view=rev&rev=495460
Log:
QPID-276
Update to AMQChannel to remove race condition over UnacknowledgedMessageMap

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=495460&r1=495459&r2=495460
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Jan 11 17:23:43 2007
@@ -275,7 +275,7 @@
      * @throws AMQException                  if something goes wrong
      */
     public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
-                                   FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
+                                           FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -326,20 +326,24 @@
     /**
      * Add a message to the channel-based list of unacknowledged messages
      *
-     * @param message the message that was delivered
+     * @param message     the message that was delivered
      * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
-     * the delivery tag)
-     * @param queue the queue from which the message was delivered
+     *                    the delivery tag)
+     * @param queue       the queue from which the message was delivered
      */
     public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
     {
-        _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
-        checkSuspension();
+        synchronized (_unacknowledgedMessageMap.getLock())
+        {
+            _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+            checkSuspension();
+        }
     }
 
     /**
      * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
      * May result in delivery to this same channel or to other subscribers.
+     *
      * @throws org.apache.qpid.AMQException if the requeue fails
      */
     public void requeue() throws AMQException
@@ -427,8 +431,11 @@
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
     {
-        _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
-        checkSuspension();
+        synchronized (_unacknowledgedMessageMap.getLock())
+        {
+            _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
+            checkSuspension();
+        }
     }
 
     /**
@@ -450,6 +457,7 @@
     private void checkSuspension()
     {
         boolean suspend;
+        
         suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
 
         setSuspended(suspend);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?view=diff&rev=495460&r1=495459&r2=495460
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Thu Jan 11 17:23:43 2007
@@ -43,6 +43,8 @@
 
     void visit(Visitor visitor) throws AMQException;
 
+    Object getLock();
+
     void add(long deliveryTag, UnacknowledgedMessage message);
 
     void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
@@ -67,6 +69,7 @@
 
     /**
      * Get the set of delivery tags that are outstanding.
+     *
      * @return a set of delivery tags
      */
     Set<Long> getDeliveryTags();

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=diff&rev=495460&r1=495459&r2=495460
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Thu Jan 11 17:23:43 2007
@@ -75,7 +75,7 @@
     {
         synchronized (_lock)
         {
-            for(UnacknowledgedMessage msg : msgs)
+            for (UnacknowledgedMessage msg : msgs)
             {
                 _map.remove(msg.deliveryTag);
             }
@@ -95,7 +95,7 @@
         synchronized (_lock)
         {
             Collection<UnacknowledgedMessage> currentEntries = _map.values();
-            for (UnacknowledgedMessage msg: currentEntries)
+            for (UnacknowledgedMessage msg : currentEntries)
             {
                 visitor.callback(msg);
             }
@@ -103,9 +103,14 @@
         }
     }
 
+    public Object getLock()
+    {
+        return _lock;
+    }
+
     public void add(long deliveryTag, UnacknowledgedMessage message)
     {
-        synchronized( _lock)
+        synchronized (_lock)
         {
             _map.put(deliveryTag, message);
             _lastDeliveryTag = deliveryTag;
@@ -209,7 +214,7 @@
     {
         synchronized (_lock)
         {
-            for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+            for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
             {
                 msgs.add(entry.getValue());
                 if (entry.getKey() == key)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=495460&r1=495459&r2=495460
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Thu Jan 11 17:23:43 2007
@@ -134,6 +134,7 @@
     {
         _logger.info("Protocol Session closed");
         final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
+        //fixme  -- this can be null
         amqProtocolSession.closeSession();
     }
 


Reply via email to