Author: rgodfrey
Date: Fri Aug 10 06:52:48 2007
New Revision: 564593

URL: http://svn.apache.org/viewvc?view=rev&rev=564593
Log:
QPID-547 : Stop session closure while in message dispatch

Modified:
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=564593&r1=564592&r2=564593
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Fri Aug 10 06:52:48 2007
@@ -291,6 +291,7 @@
 
     /** Indicates that runtime exceptions should be generated on vilations of 
the strict AMQP. */
     private final boolean _strictAMQPFATAL;
+    private final Object _messageDeliveryLock = new Object();
 
     /**
      * Creates a new session on a connection.
@@ -512,6 +513,9 @@
                 + 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
         }
 
+        synchronized(_messageDeliveryLock)
+        {
+
         // We must close down all producers and consumers in an orderly 
fashion. This is the only method
         // that can be called from a different thread of control from the one 
controlling the session.
         synchronized (_connection.getFailoverMutex())
@@ -558,6 +562,7 @@
                 }
             }
         }
+        }
     }
 
     /**
@@ -567,6 +572,8 @@
      */
     public void closed(Throwable e) throws JMSException
     {
+        synchronized(_messageDeliveryLock)
+        {
         synchronized (_connection.getFailoverMutex())
         {
             // An AMQException has an error code and message already and will 
be passed in when closure occurs as a
@@ -585,6 +592,7 @@
             _connection.deregisterSession(_channelId);
             closeProducersAndConsumers(amqe);
         }
+        }
     }
 
     /**
@@ -2662,7 +2670,10 @@
                             _lock.wait();
                         }
 
-                        dispatchMessage(message);
+                        synchronized(_messageDeliveryLock)
+                        {
+                            dispatchMessage(message);
+                        }
 
                         while (connectionStopped())
                         {


Reply via email to