Author: rhs
Date: Tue Apr 29 15:28:31 2008
New Revision: 652173

URL: http://svn.apache.org/viewvc?rev=652173&view=rev
Log:
QPID-983: fixed deadlock between AMQConnection.close and FailoverHandler

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=652173&r1=652172&r2=652173&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Tue Apr 29 15:28:31 2008
@@ -914,6 +914,14 @@
 
     public void close(List<AMQSession> sessions, long timeout) throws 
JMSException
     {
+        if (!_closed.getAndSet(true))
+        {
+            doClose(sessions, timeout);
+        }
+    }
+
+    private void doClose(List<AMQSession> sessions, long timeout) throws 
JMSException
+    {
         synchronized(_sessionCreationLock)
         {
             if(!sessions.isEmpty())
@@ -921,18 +929,16 @@
                 AMQSession session = sessions.remove(0);
                 synchronized(session.getMessageDeliveryLock())
                 {
-                    close(sessions, timeout);
+                    doClose(sessions, timeout);
                 }
             }
             else
             {
-                if (!_closed.getAndSet(true))
+                synchronized (getFailoverMutex())
                 {
-                    synchronized (getFailoverMutex())
+                    try
                     {
-                        try
-                        {
-                            long startCloseTime = System.currentTimeMillis();
+                        long startCloseTime = System.currentTimeMillis();
 
                         closeAllSessions(null, timeout, startCloseTime);
 
@@ -941,41 +947,40 @@
 
                         if (!_taskPool.isTerminated())
                         {
-                                try
-                                {
-                                    // adjust timeout
-                                    long taskPoolTimeout = 
adjustTimeout(timeout, startCloseTime);
-
-                                    
_taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
-                                }
-                                catch (InterruptedException e)
-                                {
-                                    _logger.info("Interrupted while shutting 
down connection thread pool.");
-                                }
-                            }
+                            try
+                            {
+                                // adjust timeout
+                                long taskPoolTimeout = adjustTimeout(timeout, 
startCloseTime);
 
-                            // adjust timeout
-                            timeout = adjustTimeout(timeout, startCloseTime);
-                            _delegate.closeConneciton(timeout);
-
-                            //If the taskpool hasn't shutdown by now then give 
it shutdownNow.
-                            // This will interupt any running tasks.
-                            if (!_taskPool.isTerminated())
+                                _taskPool.awaitTermination(taskPoolTimeout, 
TimeUnit.MILLISECONDS);
+                            }
+                            catch (InterruptedException e)
                             {
-                                List<Runnable> tasks = _taskPool.shutdownNow();
-                                for (Runnable r : tasks)
-                                {
-                                    _logger.warn("Connection close forced 
taskpool to prevent execution:" + r);
-                                }
+                                _logger.info("Interrupted while shutting down 
connection thread pool.");
                             }
                         }
-                        catch (AMQException e)
+
+                        // adjust timeout
+                        timeout = adjustTimeout(timeout, startCloseTime);
+                        _delegate.closeConneciton(timeout);
+
+                        //If the taskpool hasn't shutdown by now then give it 
shutdownNow.
+                        // This will interupt any running tasks.
+                        if (!_taskPool.isTerminated())
                         {
-                            JMSException jmse = new JMSException("Error 
closing connection: " + e);
-                            jmse.setLinkedException(e);
-                            throw jmse;
+                            List<Runnable> tasks = _taskPool.shutdownNow();
+                            for (Runnable r : tasks)
+                            {
+                                _logger.warn("Connection close forced taskpool 
to prevent execution:" + r);
+                            }
                         }
                     }
+                    catch (AMQException e)
+                    {
+                        JMSException jmse = new JMSException("Error closing 
connection: " + e);
+                        jmse.setLinkedException(e);
+                        throw jmse;
+                    }
                 }
             }
         }
@@ -1294,12 +1299,14 @@
             }
         }
 
+        boolean closer = false;
+
         // in the case of an IOException, MINA has closed the protocol session 
so we set _closed to true
         // so that any generic client code that tries to close the connection 
will not mess up this error
         // handling sequence
         if (cause instanceof IOException)
         {
-            _closed.set(true);
+            closer = !_closed.getAndSet(true);
         }
 
         if (_exceptionListener != null)
@@ -1320,8 +1327,11 @@
                     _logger.info("Closing AMQConnection due to :" + 
cause.getMessage());
                 }
 
-                _closed.set(true);
-                closeAllSessions(cause, -1, -1); // FIXME: when doing this end 
up with RejectedExecutionException from executor.
+                closer = (!_closed.getAndSet(true)) || closer;
+                if (closer)
+                {
+                    closeAllSessions(cause, -1, -1); // FIXME: when doing this 
end up with RejectedExecutionException from executor.
+                }
             }
             catch (JMSException e)
             {


Reply via email to