Author: ritchiem
Date: Fri Oct  5 03:39:54 2007
New Revision: 582201

URL: http://svn.apache.org/viewvc?rev=582201&view=rev
Log:
QPID-624: Update to ensure all errors are correctly processed in 
BlockingMethodFrameListener.java

Modified:
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=582201&r1=582200&r2=582201&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
 Fri Oct  5 03:39:54 2007
@@ -78,14 +78,22 @@
     /** This flag is used to indicate that the blocked for method has been 
received. */
     private volatile boolean _ready = false;
 
+    /** This flag is used to indicate that the received error has been 
processed. */
+    private volatile boolean _errorAck = false;
+
     /** Used to protect the shared event and ready flag between the producer 
and consumer. */
     private final ReentrantLock _lock = new ReentrantLock();
-        
+
     /**
      * Used to signal that a method has been received
      */
     private final Condition _receivedCondition = _lock.newCondition();
 
+    /**
+      * Used to signal that a error has been processed
+      */
+    private final Condition _errorConditionAck = _lock.newCondition();
+
     /** Used to hold the most recent exception that is passed to the [EMAIL 
PROTECTED] #error(Exception)} method. */
     private volatile Exception _error;
 
@@ -142,7 +150,7 @@
                 _ready = ready;
                 _receivedCondition.signal();
             }
-            finally 
+            finally
             {
                 _lock.unlock();
             }
@@ -174,13 +182,15 @@
     public AMQMethodEvent blockForFrame(long timeout) throws AMQException, 
FailoverException
     {
         long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
-        
+
         _lock.lock();
+
         try
         {
             while (!_ready)
             {
-                try {
+                try
+                {
                     if (timeout == -1)
                     {
                         _receivedCondition.await();
@@ -195,7 +205,7 @@
                             _ready = true;
                         }
                     }
-                } 
+                }
                 catch (InterruptedException e)
                 {
                     // IGNORE    -- //fixme this isn't ideal as being 
interrupted isn't equivellant to sucess
@@ -206,29 +216,34 @@
                     // }
                 }
             }
+
+
+            if (_error != null)
+            {
+                if (_error instanceof AMQException)
+                {
+                    throw (AMQException) _error;
+                }
+                else if (_error instanceof FailoverException)
+                {
+                    // This should ensure that FailoverException is not 
wrapped and can be caught.
+                    throw (FailoverException) _error; // needed to expose 
FailoverException.
+                }
+                else
+                {
+                    throw new AMQException("Woken up due to " + 
_error.getClass(), _error);
+                }
+            }
+
         }
         finally
         {
+            _errorAck = true;
+            _errorConditionAck.signal();
+            _error = null;
             _lock.unlock();
         }
 
-        if (_error != null)
-        {
-            if (_error instanceof AMQException)
-            {
-                throw (AMQException) _error;
-            }
-            else if (_error instanceof FailoverException)
-            {
-                // This should ensure that FailoverException is not wrapped 
and can be caught.
-                throw (FailoverException) _error; // needed to expose 
FailoverException.
-            }
-            else
-            {
-                throw new AMQException("Woken up due to " + _error.getClass(), 
_error);
-            }
-        }
-
         return _doneEvt;
     }
 
@@ -242,13 +257,36 @@
     {
         // set the error so that the thread that is blocking (against 
blockForFrame())
         // can pick up the exception and rethrow to the caller
-        _error = e;
+
 
         _lock.lock();
+
+        if (_error == null)
+        {
+            _error = e;
+        }
+        else
+        {
+            System.err.println("WARNING: new error arrived while old one not 
yet processed");
+        }
+
         try
         {
             _ready = true;
             _receivedCondition.signal();
+
+            while (!_errorAck)
+            {
+                try
+                {
+                    _errorConditionAck.await();
+                }
+                catch (InterruptedException e1)
+                {
+                    //
+                }
+            }
+            _errorAck = false;
         }
         finally
         {


Reply via email to