Author: aidan
Date: Fri Oct 24 09:20:16 2008
New Revision: 707672

URL: http://svn.apache.org/viewvc?rev=707672&view=rev
Log:
QPID-1315: Fix brace style and loop iteration-count handling, as per review 
comments from rgodfrey.

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=707672&r1=707671&r2=707672&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Fri Oct 24 09:20:16 2008
@@ -333,7 +333,8 @@
         Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
         if (sub != null)
         {
-            try {
+            try 
+            {
                 sub.getSendLock();
                 sub.getQueue().unregisterSubscription(sub);
             }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=707672&r1=707671&r2=707672&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Fri Oct 24 09:20:16 2008
@@ -1204,16 +1204,16 @@
         flushSubscription(sub, Long.MAX_VALUE);
     }
 
-    public boolean flushSubscription(Subscription sub, Long deliveries) throws 
AMQException
+    public boolean flushSubscription(Subscription sub, Long iterations) throws 
AMQException
     {
         boolean atTail = false;
 
-        while (!sub.isSuspended() && !atTail && deliveries != 0)
+        while (!sub.isSuspended() && !atTail && iterations != 0)
         {
             try 
             {
                 sub.getSendLock();
-                atTail =  attemptDelivery(sub, deliveries);
+                atTail =  attemptDelivery(sub);
                 if (atTail && sub.isAutoClose())
                 {
                     unregisterSubscription(sub);
@@ -1221,6 +1221,10 @@
                     ProtocolOutputConverter converter = 
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
                     
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), 
sub.getConsumerTag());
                 }
+                else if (!atTail)
+                {
+                    iterations--;
+                }
             }
             finally
             {
@@ -1239,7 +1243,7 @@
         return atTail;
     }
 
-    private boolean attemptDelivery(Subscription sub, Long deliveries) throws 
AMQException
+    private boolean attemptDelivery(Subscription sub) throws AMQException
     {
         boolean atTail = false;
         boolean advanced = false;
@@ -1258,11 +1262,9 @@
                             if (!sub.isBrowser() && !node.acquire(sub))
                             {
                                 sub.restoreCredit(node);
-
                             }
                             else
                             {
-                                deliveries--;
                                 deliverMessage(sub, node);
 
                                 if (sub.isBrowser())
@@ -1352,11 +1354,11 @@
         boolean deliveryIncomplete = true;
 
         int extraLoops = 1;
-        Long deliveries = new Long(MAX_ASYNC_DELIVERIES);
+        Long iterations = new Long(MAX_ASYNC_DELIVERIES);
 
         _asynchronousRunner.compareAndSet(runner, null);
 
-        while (deliveries != 0 && ((previousStateChangeCount != 
(stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && 
_asynchronousRunner.compareAndSet(null, runner))
+        while (iterations != 0 && ((previousStateChangeCount != 
(stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && 
_asynchronousRunner.compareAndSet(null, runner))
         {
             // we want to have one extra loop after every subscription has 
reached the point where it cannot move
             // further, just in case the advance of one subscription in the 
last loop allows a different subscription to
@@ -1386,7 +1388,7 @@
                         QueueEntry node = moveSubscriptionToNextNode(sub);
                         if (node != null)
                         {
-                            done = attemptDelivery(sub, deliveries);
+                            done = attemptDelivery(sub);
                         }
                     }
                     if (done)
@@ -1409,6 +1411,7 @@
                     }
                     else
                     {
+                        iterations--;
                         extraLoops = 1;
                     }
                 }
@@ -1422,7 +1425,7 @@
 
         // If deliveries == 0 then the limitting factor was the time-slicing 
rather than available messages or credit
         // therefore we should schedule this runner again (unless someone 
beats us to it :-) ).
-        if (deliveries == 0 && _asynchronousRunner.compareAndSet(null, runner))
+        if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
         {
             _asyncDelivery.execute(runner);
         }


Reply via email to