Author: rgodfrey
Date: Wed Apr 16 02:56:48 2008
New Revision: 648652
URL: http://svn.apache.org/viewvc?rev=648652&view=rev
Log:
QPID-927 : Multiple acknowledgements should be coalesced into single multiple
ack
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=648652&r1=648651&r2=648652&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Wed Apr 16 02:56:48 2008
@@ -39,6 +39,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.SortedSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -111,6 +115,14 @@
/** List of tags delievered, The last of which which should be
acknowledged on commit in transaction mode. */
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new
ConcurrentLinkedQueue<Long>();
+ /** The last tag that was "multiple" acknowledged on this session (if
transacted) */
+ private long _lastAcked;
+
+ /** set of tags which have previously been acked; but not part of the
multiple ack (transacted mode only) */
+ private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+
+ private final Object _commitLock = new Object();
+
/**
* The thread that was used to call receive(). This is important for being
able to interrupt that thread if a
* receive() is in progress.
@@ -126,6 +138,8 @@
private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
+
+
protected BasicMessageConsumer(int channelId, AMQConnection connection,
AMQDestination destination,
String messageSelector, boolean noLocal,
MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler,
FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
@@ -809,9 +823,54 @@
/** Acknowledge up to last message delivered (if any). Used when
commiting. */
void acknowledgeDelivered()
{
- while (!_receivedDeliveryTags.isEmpty())
+ synchronized(_commitLock)
{
- _session.acknowledgeMessage(_receivedDeliveryTags.poll(),
false);
+ ArrayList<Long> tagsToAck = new ArrayList<Long>();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ tagsToAck.add(_receivedDeliveryTags.poll());
+ }
+
+ Collections.sort(tagsToAck);
+
+ long prevAcked = _lastAcked;
+ long oldAckPoint = -1;
+
+ while(oldAckPoint != prevAcked)
+ {
+ oldAckPoint = prevAcked;
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext() && tagsToAckIterator.next()
== prevAcked+1)
+ {
+ tagsToAckIterator.remove();
+ prevAcked++;
+ }
+
+ Iterator<Long> previousAckIterator =
_previouslyAcked.iterator();
+ while(previousAckIterator.hasNext() &&
previousAckIterator.next() == prevAcked+1)
+ {
+ previousAckIterator.remove();
+ prevAcked++;
+ }
+
+ }
+ if(prevAcked != _lastAcked)
+ {
+ _session.acknowledgeMessage(prevAcked, true);
+ _lastAcked = prevAcked;
+ }
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext())
+ {
+ Long tag = tagsToAckIterator.next();
+ _session.acknowledgeMessage(tag, false);
+ _previouslyAcked.add(tag);
+ }
}
}