Author: rgreig
Date: Sat Sep 22 15:05:30 2007
New Revision: 578509
URL: http://svn.apache.org/viewvc?rev=578509&view=rev
Log:
QPID-609: The dispatcher thread was being restarted by the code that closes
the consumer on receipt of a basic.cancel frame. Move the dispatcher shutdown
to the end of the consumer close process. Also rename the dispatcher's _closed
field, since it clashes with a field of the same name in the enclosing class.
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=578509&r1=578508&r2=578509&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Sat Sep 22 15:05:30 2007
@@ -673,11 +673,10 @@
else
{
_logger.info("Dispatcher is null so created stopped
dispatcher");
-
startDistpatcherIfNecessary(true);
}
- _dispatcher.rejectPending(consumer);
+ _dispatcher.rejectPending(consumer);
}
else
{
@@ -1954,11 +1953,6 @@
*/
private void closeConsumers(Throwable error) throws JMSException
{
- if (_dispatcher != null)
- {
- _dispatcher.close();
- _dispatcher = null;
- }
// we need to clone the list of consumers since the close() method
updates the _consumers collection
// which would result in a concurrent modification exception
final ArrayList<BasicMessageConsumer> clonedConsumers = new
ArrayList<BasicMessageConsumer>(_consumers.values());
@@ -1977,6 +1971,11 @@
}
}
// at this point the _consumers map will be empty
+ if (_dispatcher != null)
+ {
+ _dispatcher.close();
+ _dispatcher = null;
+ }
}
/**
@@ -2567,7 +2566,7 @@
{
/** Track the 'stopped' state of the dispatcher, a session starts in
the stopped state. */
- private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final AtomicBoolean _dispatcherClosed = new
AtomicBoolean(false);
private final Object _lock = new Object();
private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -2583,7 +2582,7 @@
public void close()
{
- _closed.set(true);
+ _dispatcherClosed.set(true);
interrupt();
// fixme awaitTermination
@@ -2678,7 +2677,7 @@
try
{
- while (!_closed.get())
+ while (!_dispatcherClosed.get())
{
message = (UnprocessedMessage) _queue.poll(1000,
TimeUnit.MILLISECONDS);
if (message != null)
@@ -2768,7 +2767,7 @@
}
}
// Don't reject if we're already closing
- if (!_closed.get())
+ if (!_dispatcherClosed.get())
{
rejectMessage(message, true);
}