Author: rgodfrey
Date: Wed May 21 03:08:41 2008
New Revision: 658614
URL: http://svn.apache.org/viewvc?rev=658614&view=rev
Log:
QPID-1084 : Fix AMQSession race condition on no-ack flow control
Modified:
incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified:
incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=658614&r1=658613&r2=658614&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed May 21 03:08:41 2008
@@ -468,12 +468,15 @@
new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
+ private final AtomicBoolean _suspendState = new AtomicBoolean();
+
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ _suspendState.set(true);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
@@ -482,7 +485,8 @@
_logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
    + ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
+ _suspendState.set(false);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
});
@@ -3106,9 +3110,9 @@
private class SuspenderRunner implements Runnable
{
- private boolean _suspend;
+ private AtomicBoolean _suspend;
- public SuspenderRunner(boolean suspend)
+ public SuspenderRunner(AtomicBoolean suspend)
{
_suspend = suspend;
}
@@ -3117,7 +3121,10 @@
{
try
{
- suspendChannel(_suspend);
+ synchronized(_suspensionLock)
+ {
+ suspendChannel(_suspend.get());
+ }
}
catch (AMQException e)
{