Author: rupertlssmith
Date: Mon Feb 11 06:50:18 2008
New Revision: 620496

URL: http://svn.apache.org/viewvc?rev=620496&view=rev
Log:
QPID-729 : Added explicit list of unacked messages, acked on commit, rejected 
on roll-back.

Modified:
    incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs
    
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs

Modified: incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=620496&r1=620495&r2=620496&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs 
(original)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs Mon 
Feb 11 06:50:18 2008
@@ -457,7 +457,7 @@
                 foreach (BasicMessageConsumer consumer  in _consumers.Values)
                 {
                     // Sends acknowledgement to server.
-                    consumer.AcknowledgeLastDelivered();
+                    consumer.AcknowledgeDelivered();
                 }
 
                 // Commits outstanding messages sent and outstanding 
acknowledgements.
@@ -485,13 +485,16 @@
                     {
                         Suspend(true);
                     }
-                 
-                    // todo: rollback dispatcher when TX support is added
-                    //if ( _dispatcher != null )
-                    //   _dispatcher.Rollback();
 
-                    _connection.ConvenientProtocolWriter.SyncWrite(
-                                                                   
TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+                    // Reject up to message last delivered (if any) for each 
consumer.
+                    // Need to send reject for messages delivered to consumers 
so far.
+                    foreach (BasicMessageConsumer consumer  in 
_consumers.Values)
+                    {
+                        // Sends acknowledgement to server.
+                        consumer.RejectUnacked();
+                    }
+
+                    
_connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId),
 typeof(TxRollbackOkBody));
 
                     if ( !suspended )
                     {
@@ -1012,6 +1015,15 @@
             _connection.ProtocolWriter.Write(ackFrame);
         }
 
+        public void RejectMessage(ulong deliveryTag, bool requeue)
+        {
+            if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || 
(_acknowledgeMode == AcknowledgeMode.SessionTransacted))
+            {
+                AMQFrame rejectFrame = 
BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue);
+                _connection.ProtocolWriter.Write(rejectFrame);
+            }
+        }
+        
         /// <summary>
         /// Handle a message that bounced from the server, creating
         /// the corresponding exception and notifying the connection about it
@@ -1104,8 +1116,8 @@
         /// Placing stop check after consume may also be wrong as it may cause 
a message to be thrown away. Seems more correct to use interupt on 
         /// the block thread to cause it to prematurely return from its wait, 
whereupon it can be made to re-check the stop flag.</remarks>
         ///
-        /// <remarks>Exception swalled, if there is an exception whilst 
notifying the connection on bounced messages. Unhandled excetpion should
-        /// fall through and termiante the loop, as it is a bug if it 
occurrs.</remarks>
+        /// <remarks>Exception swallowed, if there is an exception whilst 
notifying the connection on bounced messages. Unhandled excetpion should
+        /// fall through and terminate the loop, as it is a bug if it 
occurrs.</remarks>
         private class Dispatcher
         {            
             /// <summary> Flag used to indicate when this dispatcher is to be 
stopped (0=go, 1=stop). </summary>

Modified: 
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=620496&r1=620495&r2=620496&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs 
(original)
+++ 
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs 
Mon Feb 11 06:50:18 2008
@@ -20,6 +20,8 @@
  */
 using System;
 using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
 using log4net;
 using Apache.Qpid.Client.Message;
 using Apache.Qpid.Collections;
@@ -106,10 +108,15 @@
 
         private AmqChannel _channel;
 
+        // <summary>
+        // Tag of last message delievered, whoch should be acknowledged on 
commit in transaction mode.
+        // </summary>
+        //private long _lastDeliveryTag;
+
         /// <summary>
-        /// Tag of last message delievered, whoch should be acknowledged on 
commit in transaction mode.
+        /// Explicit list of all received but un-acked messages in a 
transaction. Used to ensure acking is completed when transaction is committed.
         /// </summary>
-        private long _lastDeliveryTag;
+        private LinkedList<long> _receivedDeliveryTags;
 
         /// <summary>
         /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
@@ -135,6 +142,11 @@
             _prefetchHigh = prefetchHigh;
             _prefetchLow = prefetchLow;
             _exclusive = exclusive;
+
+            if (_acknowledgeMode == AcknowledgeMode.SessionTransacted)
+            {
+                _receivedDeliveryTags = new LinkedList<long>();
+            }
         }
 
         #region IMessageConsumer Members
@@ -391,13 +403,24 @@
         /// <summary>
         /// Acknowledge up to last message delivered (if any). Used when 
commiting.
         /// </summary>
-        internal void AcknowledgeLastDelivered()
+        internal void AcknowledgeDelivered()
         {
-            if (_lastDeliveryTag > 0)
+            foreach (long tag in _receivedDeliveryTags)
             {
-                _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // 
XXX evil cast
-                _lastDeliveryTag = -1;
+                _channel.AcknowledgeMessage((ulong)tag, false);
             }
+
+            _receivedDeliveryTags.Clear();
+        }
+
+        internal void RejectUnacked()
+        {
+            foreach (long tag in _receivedDeliveryTags)
+            {
+                _channel.RejectMessage((ulong)tag, true);
+            }
+
+            _receivedDeliveryTags.Clear();
         }
 
         private void PreDeliver(AbstractQmsMessage msg)
@@ -442,7 +465,7 @@
                     break;
                 
                 case AcknowledgeMode.SessionTransacted:
-                    _lastDeliveryTag = msg.DeliveryTag;
+                    _receivedDeliveryTags.AddLast(msg.DeliveryTag);
                     break;
             }
         }


Reply via email to