Author: rgodfrey
Date: Wed Apr 16 02:40:42 2008
New Revision: 648648

URL: http://svn.apache.org/viewvc?rev=648648&view=rev
Log:
QPID-926 : Perform all store operations associated with an acknowledge in a 
single store transaction

Modified:
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=648648&r1=648647&r2=648648&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 Wed Apr 16 02:40:42 2008
@@ -49,8 +49,6 @@
     /** Where to put undeliverable messages */
     private final List<RequiredDeliveryException> _returnMessages;
 
-    private final Set<Long> _browsedAcks;
-
     private final MessageStore _messageStore;
 
     private final StoreContext _storeContext;
@@ -61,11 +59,17 @@
     public NonTransactionalContext(MessageStore messageStore, StoreContext 
storeContext, AMQChannel channel,
                                    List<RequiredDeliveryException> 
returnMessages, Set<Long> browsedAcks)
     {
+        this(messageStore,storeContext,channel,returnMessages);
+    }
+
+    public NonTransactionalContext(MessageStore messageStore, StoreContext 
storeContext, AMQChannel channel,
+                                   List<RequiredDeliveryException> 
returnMessages)
+    {
         _channel = channel;
         _storeContext = storeContext;
         _returnMessages = returnMessages;
         _messageStore = messageStore;
-        _browsedAcks = browsedAcks;
+
     }
 
 
@@ -112,6 +116,9 @@
                                    boolean multiple, final 
UnacknowledgedMessageMap unacknowledgedMessageMap)
             throws AMQException
     {
+
+        final boolean debug = _log.isDebugEnabled();
+
         if (multiple)
         {
             if (deliveryTag == 0)
@@ -125,20 +132,17 @@
                 {
                     public boolean callback(UnacknowledgedMessage message) 
throws AMQException
                     {
-                        if (!_browsedAcks.contains(deliveryTag))
+                        if (debug)
                         {
-                            if (_log.isDebugEnabled())
-                            {
-                                _log.debug("Discarding message: " + 
message.getMessage().getMessageId());
-                            }
-
-                            //Message has been ack so discard it. This will 
dequeue and decrement the reference.
-                            message.discard(_storeContext);
+                            _log.debug("Discarding message: " + 
message.getMessage().getMessageId());
                         }
-                        else
+                        if(message.getMessage().isPersistent())
                         {
-                            _browsedAcks.remove(deliveryTag);
+                            beginTranIfNecessary();
                         }
+                        //Message has been ack so discard it. This will 
dequeue and decrement the reference.
+                        message.discard(_storeContext);
+
                         return false;
                     }
 
@@ -159,20 +163,17 @@
                 unacknowledgedMessageMap.drainTo(acked, deliveryTag);
                 for (UnacknowledgedMessage msg : acked)
                 {
-                    if (!_browsedAcks.contains(deliveryTag))
+                    if (debug)
                     {
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug("Discarding message: " + 
msg.getMessage().getMessageId());
-                        }
-
-                        //Message has been ack so discard it. This will 
dequeue and decrement the reference.
-                        msg.discard(_storeContext);
+                        _log.debug("Discarding message: " + 
msg.getMessage().getMessageId());
                     }
-                    else
+                    if(msg.getMessage().isPersistent())
                     {
-                        _browsedAcks.remove(deliveryTag);
+                        beginTranIfNecessary();
                     }
+
+                    //Message has been ack so discard it. This will dequeue 
and decrement the reference.
+                    msg.discard(_storeContext);
                 }
             }
         }
@@ -189,27 +190,31 @@
                                        _channel.getChannelId());
             }
 
-            if (!_browsedAcks.contains(deliveryTag))
+            if (debug)
             {
-                if (_log.isDebugEnabled())
-                {
-                    _log.debug("Discarding message: " + 
msg.getMessage().getMessageId());
-                }
-
-                //Message has been ack so discard it. This will dequeue and 
decrement the reference.
-                msg.discard(_storeContext);
+                _log.debug("Discarding message: " + 
msg.getMessage().getMessageId());
             }
-            else
+            if(msg.getMessage().isPersistent())
             {
-                _browsedAcks.remove(deliveryTag);
+                beginTranIfNecessary();
             }
 
-            if (_log.isDebugEnabled())
+            //Message has been ack so discard it. This will dequeue and 
decrement the reference.
+            msg.discard(_storeContext);
+
+            if (debug)
             {
                 _log.debug("Received non-multiple ack for messaging with 
delivery tag " + deliveryTag + " msg id " +
                            msg.getMessage().getMessageId());
             }
         }
+
+        if(_inTran)
+        {
+            _messageStore.commitTran(_storeContext);
+            _inTran = false;
+        }
+
     }
 
     public void messageFullyReceived(boolean persistent) throws AMQException


Reply via email to