Author: rhs
Date: Tue Apr 29 15:28:31 2008
New Revision: 652173
URL: http://svn.apache.org/viewvc?rev=652173&view=rev
Log:
QPID-983: fixed deadlock between AMQConnection.close and FailoverHandler
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=652173&r1=652172&r2=652173&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Apr 29 15:28:31 2008
@@ -914,6 +914,14 @@
public void close(List<AMQSession> sessions, long timeout) throws JMSException
{
+ if (!_closed.getAndSet(true))
+ {
+ doClose(sessions, timeout);
+ }
+ }
+
+ private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
+ {
synchronized(_sessionCreationLock)
{
if(!sessions.isEmpty())
@@ -921,18 +929,16 @@
AMQSession session = sessions.remove(0);
synchronized(session.getMessageDeliveryLock())
{
- close(sessions, timeout);
+ doClose(sessions, timeout);
}
}
else
{
- if (!_closed.getAndSet(true))
+ synchronized (getFailoverMutex())
{
- synchronized (getFailoverMutex())
+ try
{
- try
- {
- long startCloseTime = System.currentTimeMillis();
+ long startCloseTime = System.currentTimeMillis();
closeAllSessions(null, timeout, startCloseTime);
@@ -941,41 +947,40 @@
if (!_taskPool.isTerminated())
{
- try
- {
- // adjust timeout
- long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
-
- _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
- }
- }
+ try
+ {
+ // adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
- // adjust timeout
- timeout = adjustTimeout(timeout, startCloseTime);
- _delegate.closeConneciton(timeout);
-
- //If the taskpool hasn't shutdown by now then give it shutdownNow.
- // This will interupt any running tasks.
- if (!_taskPool.isTerminated())
+ _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
{
- List<Runnable> tasks = _taskPool.shutdownNow();
- for (Runnable r : tasks)
- {
- _logger.warn("Connection close forced taskpool to prevent execution:" + r);
- }
+ _logger.info("Interrupted while shutting down connection thread pool.");
}
}
- catch (AMQException e)
+
+ // adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
+ _delegate.closeConneciton(timeout);
+
+ //If the taskpool hasn't shutdown by now then give it shutdownNow.
+ // This will interupt any running tasks.
+ if (!_taskPool.isTerminated())
{
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- throw jmse;
+ List<Runnable> tasks = _taskPool.shutdownNow();
+ for (Runnable r : tasks)
+ {
+ _logger.warn("Connection close forced taskpool to prevent execution:" + r);
+ }
}
}
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
}
}
}
@@ -1294,12 +1299,14 @@
}
}
+ boolean closer = false;
+
// in the case of an IOException, MINA has closed the protocol session so we set _closed to true
// so that any generic client code that tries to close the connection will not mess up this error
// handling sequence
if (cause instanceof IOException)
{
- _closed.set(true);
+ closer = !_closed.getAndSet(true);
}
if (_exceptionListener != null)
@@ -1320,8 +1327,11 @@
_logger.info("Closing AMQConnection due to :" +
cause.getMessage());
}
- _closed.set(true);
- closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ closer = (!_closed.getAndSet(true)) || closer;
+ if (closer)
+ {
+ closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ }
}
catch (JMSException e)
{