Author: ritchiem
Date: Fri Oct 5 03:39:54 2007
New Revision: 582201
URL: http://svn.apache.org/viewvc?rev=582201&view=rev
Log:
QPID-624: Update to ensure all errors are correctly processed in
BlockingMethodFrameListener.java
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=582201&r1=582200&r2=582201&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
Fri Oct 5 03:39:54 2007
@@ -78,14 +78,22 @@
/** This flag is used to indicate that the blocked for method has been
received. */
private volatile boolean _ready = false;
+ /** This flag is used to indicate that the received error has been
processed. */
+ private volatile boolean _errorAck = false;
+
/** Used to protect the shared event and ready flag between the producer
and consumer. */
private final ReentrantLock _lock = new ReentrantLock();
-
+
/**
* Used to signal that a method has been received
*/
private final Condition _receivedCondition = _lock.newCondition();
+ /**
+ * Used to signal that an error has been processed
+ */
+ private final Condition _errorConditionAck = _lock.newCondition();
+
/** Used to hold the most recent exception that is passed to the {@link
#error(Exception)} method. */
private volatile Exception _error;
@@ -142,7 +150,7 @@
_ready = ready;
_receivedCondition.signal();
}
- finally
+ finally
{
_lock.unlock();
}
@@ -174,13 +182,15 @@
public AMQMethodEvent blockForFrame(long timeout) throws AMQException,
FailoverException
{
long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
-
+
_lock.lock();
+
try
{
while (!_ready)
{
- try {
+ try
+ {
if (timeout == -1)
{
_receivedCondition.await();
@@ -195,7 +205,7 @@
_ready = true;
}
}
- }
+ }
catch (InterruptedException e)
{
// IGNORE -- //fixme this isn't ideal as being
interrupted isn't equivalent to success
@@ -206,29 +216,34 @@
// }
}
}
+
+
+ if (_error != null)
+ {
+ if (_error instanceof AMQException)
+ {
+ throw (AMQException) _error;
+ }
+ else if (_error instanceof FailoverException)
+ {
+ // This should ensure that FailoverException is not
wrapped and can be caught.
+ throw (FailoverException) _error; // needed to expose
FailoverException.
+ }
+ else
+ {
+ throw new AMQException("Woken up due to " +
_error.getClass(), _error);
+ }
+ }
+
}
finally
{
+ _errorAck = true;
+ _errorConditionAck.signal();
+ _error = null;
_lock.unlock();
}
- if (_error != null)
- {
- if (_error instanceof AMQException)
- {
- throw (AMQException) _error;
- }
- else if (_error instanceof FailoverException)
- {
- // This should ensure that FailoverException is not wrapped
and can be caught.
- throw (FailoverException) _error; // needed to expose
FailoverException.
- }
- else
- {
- throw new AMQException("Woken up due to " + _error.getClass(),
_error);
- }
- }
-
return _doneEvt;
}
@@ -242,13 +257,36 @@
{
// set the error so that the thread that is blocking (against
blockForFrame())
// can pick up the exception and rethrow to the caller
- _error = e;
+
_lock.lock();
+
+ if (_error == null)
+ {
+ _error = e;
+ }
+ else
+ {
+ System.err.println("WARNING: new error arrived while old one not
yet processed");
+ }
+
try
{
_ready = true;
_receivedCondition.signal();
+
+ while (!_errorAck)
+ {
+ try
+ {
+ _errorConditionAck.await();
+ }
+ catch (InterruptedException e1)
+ {
+ //
+ }
+ }
+ _errorAck = false;
}
finally
{