Author: rgodfrey
Date: Fri Aug 10 06:52:48 2007
New Revision: 564593
URL: http://svn.apache.org/viewvc?view=rev&rev=564593
Log:
QPID-547 : Stop session closure while in message dispatch
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=564593&r1=564592&r2=564593
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Fri Aug 10 06:52:48 2007
@@ -291,6 +291,7 @@
/** Indicates that runtime exceptions should be generated on violations of
the strict AMQP. */
private final boolean _strictAMQPFATAL;
+ private final Object _messageDeliveryLock = new Object();
/**
* Creates a new session on a connection.
@@ -512,6 +513,9 @@
+
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
+ synchronized(_messageDeliveryLock)
+ {
+
// We must close down all producers and consumers in an orderly
fashion. This is the only method
// that can be called from a different thread of control from the one
controlling the session.
synchronized (_connection.getFailoverMutex())
@@ -558,6 +562,7 @@
}
}
}
+ }
}
/**
@@ -567,6 +572,8 @@
*/
public void closed(Throwable e) throws JMSException
{
+ synchronized(_messageDeliveryLock)
+ {
synchronized (_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will
be passed in when closure occurs as a
@@ -585,6 +592,7 @@
_connection.deregisterSession(_channelId);
closeProducersAndConsumers(amqe);
}
+ }
}
/**
@@ -2662,7 +2670,10 @@
_lock.wait();
}
- dispatchMessage(message);
+ synchronized(_messageDeliveryLock)
+ {
+ dispatchMessage(message);
+ }
while (connectionStopped())
{