Author: rgodfrey
Date: Tue Jul 1 04:19:20 2008
New Revision: 673058
URL: http://svn.apache.org/viewvc?rev=673058&view=rev
Log:
QPID-1084 : Applying patch previously applied to M2.x
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=673058&r1=673057&r2=673058&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jul 1 04:19:20 2008
@@ -445,21 +445,25 @@
new FlowControllingBlockingQueue(_defaultPrefetchHighMark,
_defaultPrefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
+ private final AtomicBoolean _suspendState = new AtomicBoolean();
+
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ _suspendState.set(true);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
public void underThreshold(int currentValue)
{
- _logger.debug(
+ _logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
+ _suspendState.set(false);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
});
@@ -2915,9 +2919,9 @@
private class SuspenderRunner implements Runnable
{
- private boolean _suspend;
+ private AtomicBoolean _suspend;
- public SuspenderRunner(boolean suspend)
+ public SuspenderRunner(AtomicBoolean suspend)
{
_suspend = suspend;
}
@@ -2926,7 +2930,10 @@
{
try
{
- suspendChannel(_suspend);
+ synchronized(_suspensionLock)
+ {
+ suspendChannel(_suspend.get());
+ }
}
catch (AMQException e)
{