Author: ritchiem
Date: Tue Aug 12 02:36:08 2008
New Revision: 685104

URL: http://svn.apache.org/viewvc?rev=685104&view=rev
Log:
QPID-1136 : Fixed Flow Control problem due to this change and added test to 
validate that Flow Control is operating correctly

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
 Tue Aug 12 02:36:08 2008
@@ -116,7 +116,6 @@
         //make persistent changes, i.e. dequeue and decrementReference
         for (QueueEntry msg : _unacked.values())
         {
-            msg.restoreCredit();
             //Message has been ack so discard it. This will dequeue and 
decrement the reference.
             msg.discard(storeContext);
 

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
 Tue Aug 12 02:36:08 2008
@@ -94,7 +94,7 @@
             if(message != null)
             {
                 _unackedSize -= message.getMessage().getSize();
-                message.restoreCredit();
+
             }
 
             return message;
@@ -185,8 +185,6 @@
 
                 _unackedSize -= unacked.getValue().getMessage().getSize();
 
-                unacked.getValue().restoreCredit();
-
 
                 if (unacked.getKey() == deliveryTag)
                 {

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 Tue Aug 12 02:36:08 2008
@@ -175,8 +175,6 @@
 
     void dispose(final StoreContext storeContext) throws 
MessageCleanupException;
 
-    void restoreCredit();
-
     void discard(StoreContext storeContext) throws FailedDequeueException, 
MessageCleanupException;
 
     boolean isQueueDeleted();

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 Tue Aug 12 02:36:08 2008
@@ -256,6 +256,12 @@
 
         if((state.getState() == State.ACQUIRED) 
&&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
         {
+            if (state instanceof SubscriptionAcquiredState)
+            {
+                Subscription s = ((SubscriptionAcquiredState) 
state).getSubscription();
+                s.restoreCredit(this);
+            }
+
             getQueue().dequeue(storeContext, this);
             if(_stateChangeListeners != null)
             {
@@ -282,16 +288,6 @@
         }
     }
 
-    public void restoreCredit()
-    {
-        EntryState state = _state;
-        if(state instanceof SubscriptionAcquiredState)
-        {
-            Subscription s = ((SubscriptionAcquiredState) 
_state).getSubscription();
-            s.restoreCredit(this);
-        }
-    }
-
     public void discard(StoreContext storeContext) throws 
FailedDequeueException, MessageCleanupException
     {
         //if the queue is null then the message is waiting to be acked, but 
has been removed.

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 Tue Aug 12 02:36:08 2008
@@ -134,7 +134,6 @@
                         {
                             beginTranIfNecessary();
                         }
-                        message.restoreCredit();
                         //Message has been ack so discard it. This will 
dequeue and decrement the reference.
                         message.discard(_storeContext);
 

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
 Tue Aug 12 02:36:08 2008
@@ -32,6 +32,7 @@
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
@@ -301,6 +302,31 @@
         }
     }
 
+            /**
+     * A regression fixing QPID-1136 showed this up
+     *
+     * @throws Exception
+     */
+    public void testMessageDequeueRestoresCreditTest() throws Exception
+    {
+        // Send 10 messages
+        Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, 
_protocolSession,
+                                                                            
DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+        final int msgCount = 1;
+        publishMessages(msgCount);
+
+        _queue.deliverAsync(_subscription);
+
+        _channel.acknowledgeMessage(1, false);
+
+        // Check credit available
+        assertTrue("No credit available", creditManager.hasCredit());
+
+    }
+
+
 /*
     public void testPrefetchHighLow() throws AMQException
     {


Reply via email to