Author: rgreig
Date: Sat Sep 22 15:05:30 2007
New Revision: 578509

URL: http://svn.apache.org/viewvc?rev=578509&view=rev
Log:
QPID-609: The dispatcher thread was being restarted by the code that closes a 
consumer on receipt of a basic.cancel frame. Move the dispatcher shutdown to 
the end of the consumer close process. Also rename the dispatcher's _closed 
field, since it clashes with a field in the enclosing class.

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=578509&r1=578508&r2=578509&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Sep 22 15:05:30 2007
@@ -673,11 +673,10 @@
                 else
                 {
                     _logger.info("Dispatcher is null so created stopped dispatcher");
-
                     startDistpatcherIfNecessary(true);
                 }
 
-                _dispatcher.rejectPending(consumer);
+                _dispatcher.rejectPending(consumer);                
             }
             else
             {
@@ -1954,11 +1953,6 @@
      */
     private void closeConsumers(Throwable error) throws JMSException
     {
-        if (_dispatcher != null)
-        {
-            _dispatcher.close();
-            _dispatcher = null;
-        }
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
         final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
@@ -1977,6 +1971,11 @@
             }
         }
         // at this point the _consumers map will be empty
+        if (_dispatcher != null)
+        {
+            _dispatcher.close();
+            _dispatcher = null;
+        }
     }
 
     /**
@@ -2567,7 +2566,7 @@
     {
 
         /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
-        private final AtomicBoolean _closed = new AtomicBoolean(false);
+        private final AtomicBoolean _dispatcherClosed = new AtomicBoolean(false);
 
         private final Object _lock = new Object();
         private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -2583,7 +2582,7 @@
 
         public void close()
         {
-            _closed.set(true);
+            _dispatcherClosed.set(true);
             interrupt();
 
             // fixme awaitTermination
@@ -2678,7 +2677,7 @@
 
             try
             {
-                while (!_closed.get())
+                while (!_dispatcherClosed.get())
                 {
                     message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS);
                     if (message != null)
@@ -2768,7 +2767,7 @@
                         }
                     }
                     // Don't reject if we're already closing
-                    if (!_closed.get())
+                    if (!_dispatcherClosed.get())
                     {
                         rejectMessage(message, true);
                     }


Reply via email to