Author: rgodfrey
Date: Sun May 11 09:01:10 2008
New Revision: 655330

URL: http://svn.apache.org/viewvc?rev=655330&view=rev
Log:
Copy over QPID-926

Modified:
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=655330&r1=655329&r2=655330&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 Sun May 11 09:01:10 2008
@@ -110,6 +110,9 @@
                                    boolean multiple, final 
UnacknowledgedMessageMap unacknowledgedMessageMap)
             throws AMQException
     {
+
+        final boolean debug = _log.isDebugEnabled();
+        ;
         if (multiple)
         {
             if (deliveryTag == 0)
@@ -123,11 +126,14 @@
                 {
                     public boolean callback(final long deliveryTag, QueueEntry 
message) throws AMQException
                     {
-                        if (_log.isDebugEnabled())
+                        if (debug)
                         {
                             _log.debug("Discarding message: " + 
message.getMessage().getMessageId());
                         }
-
+                        if(message.getMessage().isPersistent())
+                        {
+                            beginTranIfNecessary();
+                        }
                         message.restoreCredit();
                         //Message has been ack so discard it. This will 
dequeue and decrement the reference.
                         message.discard(_storeContext);
@@ -152,11 +158,14 @@
                 unacknowledgedMessageMap.drainTo(acked, deliveryTag);
                 for (QueueEntry msg : acked)
                 {
-                        if (_log.isDebugEnabled())
+                        if (debug)
                         {
                             _log.debug("Discarding message: " + 
msg.getMessage().getMessageId());
                         }
-
+                        if(msg.getMessage().isPersistent())
+                        {
+                            beginTranIfNecessary();
+                        }
 
                         //Message has been ack so discard it. This will 
dequeue and decrement the reference.
                         msg.discard(_storeContext);
@@ -176,20 +185,29 @@
                                        _channel.getChannelId());
             }
 
-            if (_log.isDebugEnabled())
+            if (debug)
             {
                 _log.debug("Discarding message: " + 
msg.getMessage().getMessageId());
             }
+            if(msg.getMessage().isPersistent())
+            {
+                beginTranIfNecessary();
+            }
 
             //Message has been ack so discard it. This will dequeue and 
decrement the reference.
             msg.discard(_storeContext);
 
-            if (_log.isDebugEnabled())
+            if (debug)
             {
                 _log.debug("Received non-multiple ack for messaging with 
delivery tag " + deliveryTag + " msg id " +
                            msg.getMessage().getMessageId());
             }
         }
+        if(_inTran)
+        {
+            _messageStore.commitTran(_storeContext);
+            _inTran = false;
+        }
     }
 
     public void messageFullyReceived(boolean persistent) throws AMQException


Reply via email to