Author: rgodfrey
Date: Wed Apr 16 02:56:48 2008
New Revision: 648652

URL: http://svn.apache.org/viewvc?rev=648652&view=rev
Log:
QPID-927 : Multiple acknowledgements should be coalesced into single multiple 
ack

Modified:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=648652&r1=648651&r2=648652&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Apr 16 02:56:48 2008
@@ -39,6 +39,10 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.TreeSet;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -111,6 +115,14 @@
     /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
     private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
+    /** The last tag that was "multiple" acknowledged on this session (if transacted) */
+    private long _lastAcked;
+
+    /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
+    private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+
+    private final Object _commitLock = new Object();
+
     /**
      * The thread that was used to call receive(). This is important for being able to interrupt that thread if a
      * receive() is in progress.
@@ -126,6 +138,8 @@
     private final boolean _noConsume;
     private List<StackTraceElement> _closedStack = null;
 
+
+
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
                                    String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
                                    AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
@@ -809,9 +823,54 @@
     /** Acknowledge up to last message delivered (if any). Used when commiting. */
     void acknowledgeDelivered()
     {
-       while (!_receivedDeliveryTags.isEmpty())
+        synchronized(_commitLock)
         {
-               _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
+            ArrayList<Long> tagsToAck = new ArrayList<Long>();
+
+            while (!_receivedDeliveryTags.isEmpty())
+            {
+                tagsToAck.add(_receivedDeliveryTags.poll());
+            }
+
+            Collections.sort(tagsToAck);
+
+            long prevAcked = _lastAcked;
+            long oldAckPoint = -1;
+
+            while(oldAckPoint != prevAcked)
+            {
+                oldAckPoint = prevAcked;
+
+                Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+                while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
+                {
+                    tagsToAckIterator.remove();
+                    prevAcked++;
+                }
+
+                Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
+                while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
+                {
+                    previousAckIterator.remove();
+                    prevAcked++;
+                }
+
+            }
+            if(prevAcked != _lastAcked)
+            {
+                _session.acknowledgeMessage(prevAcked, true);
+                _lastAcked = prevAcked;
+            }
+
+            Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+            while(tagsToAckIterator.hasNext())
+            {
+                Long tag = tagsToAckIterator.next();
+                _session.acknowledgeMessage(tag, false);
+                _previouslyAcked.add(tag);
+            }
         }
     }
 


Reply via email to