Author: ritchiem
Date: Fri Mar 21 04:12:29 2008
New Revision: 639598

URL: http://svn.apache.org/viewvc?rev=639598&view=rev
Log:
QPID-866 : Based on Patch from ASkinner. Only the FailoverException makes sence 
to process this way so remove list and synchronized so we either do an add or 
throw the set FailoverException.

Modified:
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=639598&r1=639597&r2=639598&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
 Fri Mar 21 04:12:29 2008
@@ -120,13 +120,17 @@
         // We wake up listeners. If they can handle failover, they will extend 
the
         // FailoverRetrySupport class and will in turn block on the latch 
until failover
         // has completed before retrying the operation.
-        _amqProtocolHandler.propagateExceptionToWaiters(new 
FailoverException("Failing over about to start"));
+        _amqProtocolHandler.notifyFailoverStarting();
 
         // Since failover impacts several structures we protect them all with 
a single mutex. These structures
         // are also in child objects of the connection. This allows us to 
manipulate them without affecting
         // client code which runs in a separate thread.
         synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
         {
+            //Clear the exception now that we have the failover mutex there 
can be no one else waiting for a frame so
+            // we can clear the exception.
+            _amqProtocolHandler.failoverInProgress();
+
             // We switch in a new state manager temporarily so that the 
interaction to get to the "connection open"
             // state works, without us having to terminate any existing "state 
waiters". We could theoretically
             // have a state waiter waiting until the connection is closed for 
some reason. Or in future we may have

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=639598&r1=639597&r2=639598&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 Fri Mar 21 04:12:29 2008
@@ -153,6 +153,10 @@
     /** Used to provide a condition to wait upon for operations that are 
required to wait for failover to complete. */
     private CountDownLatch _failoverLatch;
 
+
+    /** The last failover exception that occured */
+    private FailoverException _lastFailoverException;
+
     /** Defines the default timeout to use for synchronous protocol commands. 
*/
     private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
 
@@ -419,6 +423,24 @@
         }
     }
 
+    public void notifyFailoverStarting()
+    {
+        // Set the last exception in the sync block to ensure the ordering 
with add.
+        // either this gets done and the add does the ml.error
+        // or the add completes first and the iterator below will do ml.error
+        synchronized (_frameListeners)
+        {
+            _lastFailoverException = new FailoverException("Failing over about 
to start");
+        }
+
+        propagateExceptionToWaiters(_lastFailoverException);
+    }
+
+    public void failoverInProgress()
+    {
+        _lastFailoverException = null;
+    }
+
     private static int _messageReceivedCount;
 
     public void messageReceived(IoSession session, Object message) throws 
Exception
@@ -471,11 +493,13 @@
                 new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) 
bodyFrame);
 
         try
-                    {
+        {
 
-                        boolean wasAnyoneInterested = 
getStateManager().methodReceived(evt);
+            boolean wasAnyoneInterested = 
getStateManager().methodReceived(evt);
             if (!_frameListeners.isEmpty())
             {
+                //This iterator is safe from the error state as the frame 
listeners always add before they send so their
+                // will be ready and waiting for this response.
                 Iterator it = _frameListeners.iterator();
                 while (it.hasNext())
                 {
@@ -592,7 +616,15 @@
     {
         try
         {
-            _frameListeners.add(listener);
+            synchronized (_frameListeners)
+            {
+                if (_lastFailoverException != null)
+                {
+                    throw _lastFailoverException;
+                }
+                
+                _frameListeners.add(listener);
+            }
             _protocolSession.writeFrame(frame);
 
             AMQMethodEvent e = listener.blockForFrame(timeout);
@@ -600,10 +632,6 @@
             return e;
             // When control resumes before this line, a reply will have been 
received
             // that matches the criteria defined in the blocking listener
-        }
-        catch (AMQException e)
-        {
-            throw e;
         }
         finally
         {


Reply via email to