Author: rgodfrey
Date: Sun May 11 08:42:45 2008
New Revision: 655326

URL: http://svn.apache.org/viewvc?rev=655326&view=rev
Log:
Copy over QPID-925

Modified:
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=655326&r1=655325&r2=655326&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
 Sun May 11 08:42:45 2008
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.ArrayList;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.store.StoreContext;
@@ -37,7 +38,7 @@
 {
     private final UnacknowledgedMessageMap _map;
     private final Map<Long, QueueEntry> _unacked = new 
HashMap<Long,QueueEntry>();
-    private final List<Long> _individual = new LinkedList<Long>();
+    private List<Long> _individual;
     private long _deliveryTag;
     private boolean _multiple;
 
@@ -50,7 +51,10 @@
     {
         if (!multiple)
         {
-
+            if(_individual == null)
+            {
+                _individual = new ArrayList<Long>();
+            }
             //have acked a single message that is not part of
             //the previously acked region so record
             //individually
@@ -67,26 +71,33 @@
 
     public void consolidate()
     {
-        //lookup all the unacked messages that have been acked in this 
transaction
-        if (_multiple)
+        if(_unacked.isEmpty())
         {
-            //get all the unacked messages for the accumulated
-            //multiple acks
-            _map.collect(_deliveryTag, true, _unacked);
-        }
-        //get any unacked messages for individual acks outside the
-        //range covered by multiple acks
-        for (long tag : _individual)
-        {
-            if(_deliveryTag < tag)
+            //lookup all the unacked messages that have been acked in this 
transaction
+            if (_multiple)
+            {
+                //get all the unacked messages for the accumulated
+                //multiple acks
+                _map.collect(_deliveryTag, true, _unacked);
+            }
+            if(_individual != null)
             {
-                _map.collect(tag, false, _unacked);
+                //get any unacked messages for individual acks outside the
+                //range covered by multiple acks
+                for (long tag : _individual)
+                {
+                    if(_deliveryTag < tag)
+                    {
+                        _map.collect(tag, false, _unacked);
+                    }
+                }
             }
         }
     }
 
     public boolean checkPersistent() throws AMQException
     {
+        consolidate();
         //if any of the messages in unacked are persistent the txn
         //buffer must be marked as persistent:
         for (QueueEntry msg : _unacked.values())

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=655326&r1=655325&r2=655326&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
 Sun May 11 08:42:45 2008
@@ -176,8 +176,7 @@
         // as new acks come in. If this is the first ack in the txn
         // we will need to create and enlist the op.
         if (_ackOp == null)
-        {
-            beginTranIfNecessary();
+        {            
             _ackOp = new TxAck(unacknowledgedMessageMap);
             _txnBuffer.enlist(_ackOp);
         }
@@ -192,6 +191,10 @@
         {
             _ackOp.update(deliveryTag, multiple);
         }
+        if(!_inTran && _ackOp.checkPersistent())
+        {
+            beginTranIfNecessary();
+        }
     }
 
     public void messageFullyReceived(boolean persistent) throws AMQException


Reply via email to