Author: rgodfrey
Date: Tue Jul  1 04:19:20 2008
New Revision: 673058

URL: http://svn.apache.org/viewvc?rev=673058&view=rev
Log:
QPID-1084 : Applying patch previously applied to M2.x

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=673058&r1=673057&r2=673058&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Jul  1 04:19:20 2008
@@ -445,21 +445,25 @@
                     new FlowControllingBlockingQueue(_defaultPrefetchHighMark, 
_defaultPrefetchLowMark,
                                                      new 
FlowControllingBlockingQueue.ThresholdListener()
                                                      {
+                                                         private final 
AtomicBoolean _suspendState = new AtomicBoolean();
+
                                                          public void 
aboveThreshold(int currentValue)
                                                          {
-                                                                 _logger.debug(
-                                                                         
"Above threshold(" + _defaultPrefetchHighMark
-                                                                         + ") 
so suspending channel. Current value is " + currentValue);
-                                                                 new 
Thread(new SuspenderRunner(true)).start();
+                                                             _logger.debug(
+                                                                 "Above 
threshold(" + _defaultPrefetchHighMark
+                                                                 + ") so 
suspending channel. Current value is " + currentValue);
+                                                             
_suspendState.set(true);
+                                                             new Thread(new 
SuspenderRunner(_suspendState)).start();
 
                                                          }
 
                                                          public void 
underThreshold(int currentValue)
                                                          {
-                                                                 _logger.debug(
+                                                             _logger.debug(
                                                                          
"Below threshold(" + _defaultPrefetchLowMark
                                                                          + ") 
so unsuspending channel. Current value is " + currentValue);
-                                                                 new 
Thread(new SuspenderRunner(false)).start();
+                                                             
_suspendState.set(false);
+                                                             new Thread(new 
SuspenderRunner(_suspendState)).start();
 
                                                          }
                                                      });
@@ -2915,9 +2919,9 @@
 
     private class SuspenderRunner implements Runnable
     {
-        private boolean _suspend;
+        private AtomicBoolean _suspend;
 
-        public SuspenderRunner(boolean suspend)
+        public SuspenderRunner(AtomicBoolean suspend)
         {
             _suspend = suspend;
         }
@@ -2926,7 +2930,10 @@
         {
             try
             {
-                suspendChannel(_suspend);
+                synchronized(_suspensionLock)
+                {
+                    suspendChannel(_suspend.get());
+                }
             }
             catch (AMQException e)
             {


Reply via email to