Author: aidan
Date: Fri Oct 17 09:09:51 2008
New Revision: 705657

URL: http://svn.apache.org/viewvc?rev=705657&view=rev
Log:
QPID-1315:

   BasicBytesFlowControl doesn't wait long enough to determine whether the 3rd
message is going to be delivered accidentally. It also acknowledged every
message, which was not its intent, so use acknowledgeThis() instead.

   Refactor common code out of processQueue and flushSubscription into 
attemptDelivery.

   Make sure sendLock is held when closing the consumer.

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=705657&r1=705656&r2=705657&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Fri Oct 17 09:09:51 2008
@@ -336,7 +336,14 @@
         Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
         if (sub != null)
         {
-            sub.getQueue().unregisterSubscription(sub);
+            try {
+                sub.getSendLock();
+                sub.getQueue().unregisterSubscription(sub);
+            }
+            finally 
+            {
+                sub.releaseSendLock();
+            }
             return true;
         }
         else
@@ -395,7 +402,16 @@
 
             Subscription sub = me.getValue();
 
-            sub.getQueue().unregisterSubscription(sub);
+            try
+            {
+                sub.getSendLock();
+                sub.getQueue().unregisterSubscription(sub);
+            }
+            finally
+            {
+                sub.releaseSendLock();
+            }
+            
         }
 
         _tag2SubscriptionMap.clear();

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=705657&r1=705656&r2=705657&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Fri Oct 17 09:09:51 2008
@@ -1174,7 +1174,7 @@
             boolean complete = false;
             try
             {
-                complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES);
+                complete = flushSubscription(_sub, new 
Long(MAX_ASYNC_DELIVERIES));
 
             }
             catch (AMQException e)
@@ -1204,79 +1204,28 @@
         flushSubscription(sub, Long.MAX_VALUE);
     }
 
-    public boolean flushSubscription(Subscription sub, long deliveries) throws 
AMQException
+    public boolean flushSubscription(Subscription sub, Long deliveries) throws 
AMQException
     {
         boolean atTail = false;
-        boolean advanced;
 
         while (!sub.isSuspended() && !atTail && deliveries != 0)
         {
-
-            advanced = false;
-            sub.getSendLock();
-            try
+            try 
             {
-                if (sub.isActive())
+                sub.getSendLock();
+                atTail =  attemptDelivery(sub, deliveries);
+                if (atTail && sub.isAutoClose())
                 {
-                    QueueEntry node = moveSubscriptionToNextNode(sub);
-                    if (!(node.isAcquired() || node.isDeleted()))
-                    {
-                        if (!sub.isSuspended())
-                        {
-                            if (sub.hasInterest(node))
-                            {
-                                if (!sub.wouldSuspend(node))
-                                {
-                                    if (!sub.isBrowser() && !node.acquire(sub))
-                                    {
-                                        sub.restoreCredit(node);
-
-                                    }
-                                    else
-                                    {
-                                        deliveries--;
-                                        deliverMessage(sub, node);
-
-                                        if (sub.isBrowser())
-                                        {
-                                            QueueEntry newNode = 
_entries.next(node);
-
-                                            if (newNode != null)
-                                            {
-                                                advanced = true;
-                                                sub.setLastSeenEntry(node, 
newNode);
-                                                node = sub.getLastSeenEntry();
-                                            }
-                                        }
-                                    }
-
-                                }
-                                else
-                                {
-                                    break;
-                                }
-                            }
-                            else
-                            {
-                                // this subscription is not interested in this 
node so we can skip over it
-                                QueueEntry newNode = _entries.next(node);
-                                if (newNode != null)
-                                {
-                                    sub.setLastSeenEntry(node, newNode);
-                                }
-                            }
-                        }
-
-                    }
-                    atTail = (_entries.next(node) == null) && !advanced;
+                    unregisterSubscription(sub);
 
+                    ProtocolOutputConverter converter = 
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+                    
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), 
sub.getConsumerTag());
                 }
             }
             finally
             {
                 sub.releaseSendLock();
             }
-
         }
 
         // if there's (potentially) more than one subscription the others will 
potentially not have been advanced to the
@@ -1287,16 +1236,72 @@
         {
             advanceAllSubscriptions();
         }
+        return atTail;
+    }
 
-        if (atTail && sub.isAutoClose())
+    private boolean attemptDelivery(Subscription sub, Long deliveries) throws 
AMQException
+    {
+        boolean atTail = false;
+        boolean advanced = false;
+        boolean subActive = sub.isActive();
+        if (subActive)
         {
-            unregisterSubscription(sub);
+            QueueEntry node = moveSubscriptionToNextNode(sub);
+            if (!(node.isAcquired() || node.isDeleted()))
+            {
+                if (!sub.isSuspended())
+                {
+                    if (sub.hasInterest(node))
+                    {
+                        if (!sub.wouldSuspend(node))
+                        {
+                            if (!sub.isBrowser() && !node.acquire(sub))
+                            {
+                                sub.restoreCredit(node);
 
-            ProtocolOutputConverter converter = 
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-            
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), 
sub.getConsumerTag());
-        }
+                            }
+                            else
+                            {
+                                deliveries--;
+                                deliverMessage(sub, node);
 
-        return atTail;
+                                if (sub.isBrowser())
+                                {
+                                    QueueEntry newNode = _entries.next(node);
+
+                                    if (newNode != null)
+                                    {
+                                        advanced = true;
+                                        sub.setLastSeenEntry(node, newNode);
+                                        node = sub.getLastSeenEntry();
+                                    }
+                                }
+                            }
+
+                        }
+                        else // Not enough Credit for message and wouldSuspend
+                        {
+                            //QPID-1187 - Treat the subscription as suspended 
for this message
+                            // and wait for the message to be removed to 
continue delivery.
+                            subActive = false;
+                            node.addStateChangeListener(new 
QueueEntryListener(sub, node));
+                        }
+                    }
+                    else
+                    {
+                        // this subscription is not interested in this node so 
we can skip over it
+                        QueueEntry newNode = _entries.next(node);
+                        if (newNode != null)
+                        {
+                            sub.setLastSeenEntry(node, newNode);
+                        }
+                    }
+                }
+
+            }
+            atTail = (_entries.next(node) == null) && !advanced;
+        }
+        return atTail || !subActive;
     }
 
     protected void advanceAllSubscriptions() throws AMQException
@@ -1347,7 +1352,7 @@
         boolean deliveryIncomplete = true;
 
         int extraLoops = 1;
-        int deliveries = MAX_ASYNC_DELIVERIES;
+        Long deliveries = new Long(MAX_ASYNC_DELIVERIES);
 
         _asynchronousRunner.compareAndSet(runner, null);
 
@@ -1372,110 +1377,46 @@
             {
                 boolean closeConsumer = false;
                 Subscription sub = 
subscriptionIter.getNode().getSubscription();
-                if (sub != null)
+                sub.getSendLock();
+                try
                 {
-                    sub.getSendLock();
-                    try
+                    if (sub != null)
                     {
-                        QueueEntry node = moveSubscriptionToNextNode(sub);
 
-                        if (node != null && sub.isActive())
+                        QueueEntry node = moveSubscriptionToNextNode(sub);
+                        if (node != null)
                         {
-                            boolean advanced = false;
-                            boolean subActive = false;
-
-                            if (!(node.isAcquired() || node.isDeleted()))
-                            {
-                                if (!sub.isSuspended())
-                                {
-                                    subActive = true;
-                                    if (sub.hasInterest(node))
-                                    {
-                                        if (!sub.wouldSuspend(node))
-                                        {
-                                            if (!sub.isBrowser() && 
!node.acquire(sub))
-                                            {
-                                                sub.restoreCredit(node);
-
-                                            }
-                                            else
-                                            {
-                                                deliverMessage(sub, node);
-                                                deliveries--;
-
-                                                if (sub.isBrowser())
-                                                {
-                                                    QueueEntry newNode = 
_entries.next(node);
-
-                                                    if (newNode != null)
-                                                    {
-                                                        
sub.setLastSeenEntry(node, newNode);
-                                                        node = 
sub.getLastSeenEntry();
-                                                        advanced = true;
-                                                    }
-
-                                                }
-                                            }
-                                            done = false;
-                                        }
-                                        else // Not enough Credit for message 
and wouldSuspend
-                                        {
-                                            //QPID-1187 - Treat the 
subscription as suspended for this message
-                                            // and wait for the message to be 
removed to continue delivery.
-                                            subActive = false;
-
-                                            node.addStateChangeListener(new 
QueueEntryListener(sub, node));
-                                        }
-                                    }
-                                    else
-                                    {
-                                        // this subscription is not interested 
in this node so we can skip over it
-                                        QueueEntry newNode = 
_entries.next(node);
-                                        if (newNode != null)
-                                        {
-                                            sub.setLastSeenEntry(node, 
newNode);
-                                        }
-                                    }
-                                }
-                            }
-                            final boolean atTail = (_entries.next(node) == 
null);
-
-                            done = done && (!subActive || atTail);
-
-                            closeConsumer = (atTail && !advanced && 
sub.isAutoClose());
+                            done = attemptDelivery(sub, deliveries);
                         }
                     }
-                    finally
+                    if (done)
                     {
-                        sub.releaseSendLock();
-                    }
-
-                    if (closeConsumer)
-                    {
-                        unregisterSubscription(sub);
-
-                        ProtocolOutputConverter converter = 
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-                        
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), 
sub.getConsumerTag());
-                    }
+                        if (extraLoops == 0)
+                        {
+                            deliveryIncomplete = false;
+                            if (sub.isAutoClose())
+                            {
+                                unregisterSubscription(sub);
 
-                }
-                if (done)
-                {
-                    if (extraLoops == 0)
-                    {
-                        deliveryIncomplete = false;
+                                ProtocolOutputConverter converter = 
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+                                
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), 
sub.getConsumerTag());
+                            }
+                        }
+                        else
+                        {
+                            extraLoops--;
+                        }
                     }
                     else
                     {
-                        extraLoops--;
+                        extraLoops = 1;
                     }
                 }
-                else
+                finally
                 {
-                    extraLoops = 1;
+                    sub.releaseSendLock();
                 }
             }
-
             _asynchronousRunner.set(null);
         }
 

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java?rev=705657&r1=705656&r2=705657&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
 Fri Oct 17 09:09:51 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.test.client;
 
 import org.apache.qpid.client.AMQSession_0_8;
+import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.log4j.Logger;
 
@@ -91,25 +92,22 @@
         assertNotNull("Second message not received", r2);
         assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
 
-        Message r3 = recv.receiveNoWait();
+        Message r3 = recv.receive(RECEIVE_TIMEOUT);
         assertNull("Third message incorrectly delivered", r3);
 
-        r1.acknowledge();
+        ((AbstractJMSMessage)r1).acknowledgeThis();
 
-        r3 = recv.receiveNoWait();
+        r3 = recv.receive(RECEIVE_TIMEOUT);
         assertNull("Third message incorrectly delivered", r3);
 
-        r2.acknowledge();
+        ((AbstractJMSMessage)r2).acknowledgeThis();
 
         r3 = recv.receive(RECEIVE_TIMEOUT);
         assertNotNull("Third message not received", r3);
         assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
 
-        r3.acknowledge();
-        recv.close();
-        consumerSession.close();
+        ((AbstractJMSMessage)r3).acknowledgeThis();
         consumerConnection.close();
-
     }
 
     public void testTwoConsumersBytesFlowControl() throws Exception
@@ -161,21 +159,21 @@
         assertNotNull("First message not received", r1);
         assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
 
-        Message r2 = recv1.receiveNoWait();
+        Message r2 = recv1.receive(RECEIVE_TIMEOUT);
         assertNull("Second message incorrectly delivered", r2);
 
         Session consumerSession2 = consumerConnection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
         ((AMQSession_0_8) consumerSession2).setPrefetchLimits(0, 256);
         MessageConsumer recv2 = consumerSession2.createConsumer(_queue);
 
-        r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT);
+        r2 = recv2.receive(RECEIVE_TIMEOUT);
         assertNotNull("Second message not received", r2);
         assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
 
-        Message r3 = recv2.receiveNoWait();
+        Message r3 = recv2.receive(RECEIVE_TIMEOUT);
         assertNull("Third message incorrectly delivered", r3);
 
-        r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT);
+        r3 = recv1.receive(RECEIVE_TIMEOUT);
         assertNotNull("Third message not received", r3);
         assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
 


Reply via email to