Author: rgodfrey
Date: Thu Dec 20 12:08:01 2007
New Revision: 606015
URL: http://svn.apache.org/viewvc?rev=606015&view=rev
Log:
QPID-714 : (Patch from Aidan Skinner) Issue with competing,
transactional/client-ack consumers
Ack each individual message on commit instead of sending a single ack with the "multiple" flag set
Modified:
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
Modified:
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu Dec 20 12:08:01 2007
@@ -644,15 +644,7 @@
for (Iterator<BasicMessageConsumer> i =
_consumers.values().iterator(); i.hasNext();)
{
-// i.next().acknowledgeLastDelivered();
-// }
-
- // get next acknowledgement to server
- Long next = i.next().getLastDelivered();
- if (next != null && next > lastTag)
- {
- lastTag = next;
- }
+ i.next().acknowledgeDelivered();
}
if (_transacted)
@@ -662,18 +654,9 @@
for (int i = 0; i < _removedConsumers.size(); i++)
{
// Sends acknowledgement to server
- Long next =
_removedConsumers.get(i).getLastDelivered();
- if (next != null && next > lastTag)
- {
- lastTag = next;
- }
+ _removedConsumers.get(i).acknowledgeDelivered();
_removedConsumers.remove(i);
}
- }
-
- if (lastTag != -1)
- {
- acknowledgeMessage(lastTag, true);
}
// Commits outstanding messages sent and outstanding
acknowledgements.
Modified:
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Thu Dec 20 12:08:01 2007
@@ -833,20 +833,11 @@
}
/** Acknowledge up to last message delivered (if any). Used when
commiting. */
- void acknowledgeLastDelivered()
+ void acknowledgeDelivered()
{
- if (!_receivedDeliveryTags.isEmpty())
+ while (!_receivedDeliveryTags.isEmpty())
{
- long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- _session.acknowledgeMessage(lastDeliveryTag, true);
+ _session.acknowledgeMessage(_receivedDeliveryTags.poll(),
false);
}
}
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu Dec 20 12:08:01 2007
@@ -617,7 +617,7 @@
for (Iterator<BasicMessageConsumer> i =
_consumers.values().iterator(); i.hasNext();)
{
// Sends acknowledgement to server
- i.next().acknowledgeLastDelivered();
+ i.next().acknowledgeDelivered();
}
// Commits outstanding messages sent and outstanding
acknowledgements.
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Thu Dec 20 12:08:01 2007
@@ -755,20 +755,11 @@
}
/** Acknowledge up to last message delivered (if any). Used when
commiting. */
- void acknowledgeLastDelivered()
+ void acknowledgeDelivered()
{
- if (!_receivedDeliveryTags.isEmpty())
+ while (!_receivedDeliveryTags.isEmpty())
{
- long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- _session.acknowledgeMessage(lastDeliveryTag, true);
+ _session.acknowledgeMessage(_receivedDeliveryTags.poll(),
false);
}
}
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java?rev=606015&r1=606014&r2=606015&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
Thu Dec 20 12:08:01 2007
@@ -26,6 +26,7 @@
import junit.framework.TestCase;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -120,6 +121,13 @@
super.tearDown();
}
+
+ public int getMessageCount(String queueName)
+ {
+ return
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(_virtualhost.substring(1))
+ .getQueueRegistry().getQueue(new
AMQShortString(queueName)).getMessageCount();
+ }
+
public void testDummyinVMTestCase()
{