Author: rgodfrey
Date: Thu Dec 20 12:08:01 2007
New Revision: 606015

URL: http://svn.apache.org/viewvc?rev=606015&view=rev
Log:
QPID-714 : (Patch from Aidan Skinner) Issue with competing, 
transactional/client-ack consumers
Ack each message individually on commit, rather than sending a single multiple ack

Modified:
    
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java

Modified: 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Dec 20 12:08:01 2007
@@ -644,15 +644,7 @@
 
                     for (Iterator<BasicMessageConsumer> i = 
_consumers.values().iterator(); i.hasNext();)
                     {
-//                        i.next().acknowledgeLastDelivered();
-//                    }
-
-                        // get next acknowledgement to server
-                        Long next = i.next().getLastDelivered();
-                        if (next != null && next > lastTag)
-                        {
-                            lastTag = next;
-                        }
+                        i.next().acknowledgeDelivered();
                     }
 
                     if (_transacted)
@@ -662,18 +654,9 @@
                         for (int i = 0; i < _removedConsumers.size(); i++)
                         {
                             // Sends acknowledgement to server
-                            Long next = 
_removedConsumers.get(i).getLastDelivered();
-                            if (next != null && next > lastTag)
-                            {
-                                lastTag = next;
-                            }
+                            _removedConsumers.get(i).acknowledgeDelivered();
                             _removedConsumers.remove(i);
                         }
-                    }
-
-                    if (lastTag != -1)
-                    {
-                        acknowledgeMessage(lastTag, true);
                     }
 
                     // Commits outstanding messages sent and outstanding 
acknowledgements.

Modified: 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Thu Dec 20 12:08:01 2007
@@ -833,20 +833,11 @@
     }
 
     /** Acknowledge up to last message delivered (if any). Used when 
commiting. */
-    void acknowledgeLastDelivered()
+    void acknowledgeDelivered()
     {
-        if (!_receivedDeliveryTags.isEmpty())
+       while (!_receivedDeliveryTags.isEmpty())
         {
-            long lastDeliveryTag = _receivedDeliveryTags.poll();
-
-            while (!_receivedDeliveryTags.isEmpty())
-            {
-                lastDeliveryTag = _receivedDeliveryTags.poll();
-            }
-
-            assert _receivedDeliveryTags.isEmpty();
-
-            _session.acknowledgeMessage(lastDeliveryTag, true);
+               _session.acknowledgeMessage(_receivedDeliveryTags.poll(), 
false);
         }
     }
 

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Dec 20 12:08:01 2007
@@ -617,7 +617,7 @@
             for (Iterator<BasicMessageConsumer> i = 
_consumers.values().iterator(); i.hasNext();)
             {
                 // Sends acknowledgement to server
-                i.next().acknowledgeLastDelivered();
+               i.next().acknowledgeDelivered();
             }
 
             // Commits outstanding messages sent and outstanding 
acknowledgements.

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Thu Dec 20 12:08:01 2007
@@ -755,20 +755,11 @@
     }
 
     /** Acknowledge up to last message delivered (if any). Used when 
commiting. */
-    void acknowledgeLastDelivered()
+    void acknowledgeDelivered()
     {
-        if (!_receivedDeliveryTags.isEmpty())
+       while (!_receivedDeliveryTags.isEmpty())
         {
-            long lastDeliveryTag = _receivedDeliveryTags.poll();
-
-            while (!_receivedDeliveryTags.isEmpty())
-            {
-                lastDeliveryTag = _receivedDeliveryTags.poll();
-            }
-
-            assert _receivedDeliveryTags.isEmpty();
-
-            _session.acknowledgeMessage(lastDeliveryTag, true);
+               _session.acknowledgeMessage(_receivedDeliveryTags.poll(), 
false);
         }
     }
 

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
 Thu Dec 20 12:08:01 2007
@@ -26,6 +26,7 @@
 import junit.framework.TestCase;
 
 import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 
@@ -120,6 +121,13 @@
 
         super.tearDown();
     }
+
+    public int getMessageCount(String queueName)
+    {
+        return 
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(_virtualhost.substring(1))
+                .getQueueRegistry().getQueue(new 
AMQShortString(queueName)).getMessageCount();
+    }
+
 
     public void testDummyinVMTestCase()
     {


Reply via email to