Author: steshaw
Date: Tue Nov 28 15:37:52 2006
New Revision: 480283

URL: http://svn.apache.org/viewvc?view=rev&rev=480283
Log:
QPID-135 Ported enough transaction support to run FailoverTxTest. Still has 
same problem as the Java client in that on fail-over the "transaction" 
continues but the earlier part of the transaction is forgotten.

Modified:
    
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
    incubator/qpid/trunk/qpid/dotnet/TODO.txt

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs 
(original)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs 
Tue Nov 28 15:37:52 2006
@@ -59,32 +59,34 @@
             _log.Info("connectionInfo = " + connectionInfo);
             _log.Info("connection.asUrl = " + _connection.toURL());
 
-            IChannel channel = _connection.CreateChannel(false, 
AcknowledgeMode.NoAcknowledge);
+            IChannel receivingChannel = _connection.CreateChannel(false, 
AcknowledgeMode.NoAcknowledge);
 
-            string queueName = channel.GenerateUniqueName();
+            string queueName = receivingChannel.GenerateUniqueName();
 
             // Queue.Declare
-            channel.DeclareQueue(queueName, false, true, true);
+            receivingChannel.DeclareQueue(queueName, false, true, true);
 
             // No need to call Queue.Bind as automatically bound to default 
direct exchange.
-            channel.Bind(queueName, "amq.direct", queueName);
+            receivingChannel.Bind(queueName, "amq.direct", queueName);
 
-            channel.CreateConsumerBuilder(queueName).Create().OnMessage = new 
MessageReceivedDelegate(onMessage);
+            
receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new 
MessageReceivedDelegate(onMessage);
 
             _connection.Start();
 
-            sendInTx(queueName);
+            publishInTx(queueName);
+
+            Thread.Sleep(2000); // Wait a while for last messages.
 
             _connection.Close();
             _log.Info("FailoverTxText complete");
         }
 
-        private void sendInTx(string routingKey)
+        private void publishInTx(string routingKey)
         {
             _log.Info("sendInTx");
-            bool transacted = false;
-            IChannel channel = _connection.CreateChannel(transacted, 
AcknowledgeMode.NoAcknowledge);
-            IMessagePublisher publisher = channel.CreatePublisherBuilder()
+            bool transacted = true;
+            IChannel publishingChannel = _connection.CreateChannel(transacted, 
AcknowledgeMode.NoAcknowledge);
+            IMessagePublisher publisher = 
publishingChannel.CreatePublisherBuilder()
                 .withRoutingKey(routingKey)
                 .Create();
 
@@ -92,12 +94,12 @@
             {
                 for (int j = 1; j <= NUM_MESSAGES; ++j)
                 {
-                    ITextMessage msg = channel.CreateTextMessage("Tx=" + i + " 
msg=" + j);
+                    ITextMessage msg = 
publishingChannel.CreateTextMessage("Tx=" + i + " msg=" + j);
                     _log.Info("sending message = " + msg.Text);
                     publisher.Send(msg);
                     Thread.Sleep(SLEEP_MILLIS);
                 }
-                if (transacted) channel.Commit();
+                if (transacted) publishingChannel.Commit();
             }
         }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Nov 
28 15:37:52 2006
@@ -276,20 +276,23 @@
             CheckNotClosed();
             CheckTransacted(); // throws IllegalOperationException if not a 
transacted session
 
-            /*Channel.Commit frame = new Channel.Commit();
-            frame.channelId = _channelId;
-            frame.confirmTag = 1;*/
+            try
+            {
+                // Acknowledge up to message last delivered (if any) for each 
consumer.
+                // Need to send ack for messages delivered to consumers so far.
+                foreach (BasicMessageConsumer consumer  in _consumers.Values)
+                {
+                    // Sends acknowledgement to server.
+                    consumer.AcknowledgeLastDelivered();
+                }
 
-            //        try
-            //        {
-            //            
_connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new 
ChannelReplyListener(_channelId));
-            //        }
-            //        catch (AMQException e)
-            //        {
-            //            throw new JMSException("Error creating session: " + 
e);
-            //        }
-            throw new NotImplementedException();
-            //_logger.Info("Transaction commited on channel " + _channelId);
+                // Commits outstanding messages sent and outstanding 
acknowledgements.
+                
_connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId),
 typeof(TxCommitOkBody));
+            }
+            catch (AMQException e)
+            {
+                throw new QpidException("Failed to commit", e);
+            }
         }
 
         public void Rollback()
@@ -977,6 +980,27 @@
                 throw new NotImplementedException("Don't use nowait=false with 
DeclareExchange");
 //                
_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof 
(ExchangeDeclareOkBody));
             }
+        }
+
+        /**
+         * Acknowledge a message or several messages. This method can be 
called via AbstractJMSMessage or from
+         * a BasicConsumer. The former where the mode is CLIENT_ACK and the 
latter where the mode is
+         * AUTO_ACK or similar.
+         *
+         * @param deliveryTag the tag of the last message to be acknowledged
+         * @param multiple    if true will acknowledge all messages up to and 
including the one specified by the
+         *                    delivery tag
+         */
+        public void AcknowledgeMessage(long deliveryTag, bool multiple)
+        {
+            // XXX: cast to ulong evil?
+            AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, 
(ulong)deliveryTag, multiple);
+            if (_logger.IsDebugEnabled)
+            {
+                _logger.Debug("Sending ack for delivery tag " + deliveryTag + 
" on channel " + _channelId);
+            }
+            // FIXME: lock FailoverMutex here?
+            _connection.ProtocolWriter.Write(ackFrame);
         }
     }
 }

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs 
Tue Nov 28 15:37:52 2006
@@ -98,6 +98,11 @@
 
         private AmqChannel _channel;
 
+        /// <summary>
+        /// Tag of last message delievered, whoch should be acknowledged on 
commit in transaction mode.
+        /// </summary>
+        private long _lastDeliveryTag;
+
         public BasicMessageConsumer(ushort channelId, string queueName, bool 
noLocal,
                                     MessageFactoryRegistry messageFactory, 
AmqChannel channel)
         {
@@ -167,7 +172,13 @@
                 {
                     o = _synchronousQueue.DequeueBlocking();
                 }
-                return ReturnMessageOrThrow(o);
+
+                IMessage m = ReturnMessageOrThrow(o);
+                if (m != null)
+                {
+                    PostDeliver(m);
+                }
+                return m;
             }
             finally
             {
@@ -222,7 +233,7 @@
         /// <returns> a message only if o is a Message</returns>
         /// <exception>JMSException if the argument is a throwable. If it is a 
QpidMessagingException it is rethrown as is, but if not
         /// a QpidMessagingException is created with the linked exception set 
appropriately</exception>
-        private IMessage ReturnMessageOrThrow(object o)                
+        private IMessage ReturnMessageOrThrow(object o)
         {
             // errors are passed via the queue too since there is no way of 
interrupting the poll() via the API.
             if (o is Exception)
@@ -397,5 +408,49 @@
         {
             get { return _queueName; }
         }
+
+        /// <summary>
+        /// Acknowledge up to last message delivered (if any). Used when 
commiting.
+        /// </summary>
+        internal void AcknowledgeLastDelivered()
+        {
+            if (_lastDeliveryTag > 0)
+            {
+                _channel.AcknowledgeMessage(_lastDeliveryTag, true);
+                _lastDeliveryTag = -1;
+            }
+        }
+
+        private void PostDeliver(IMessage m)
+        {
+            AbstractQmsMessage msg = (AbstractQmsMessage) m;
+            switch (_acknowledgeMode)
+            {
+/* TODO
+                case AcknowledgeMode.DupsOkAcknowledge:
+                    if (++_outstanding >= _prefetchHigh)
+                    {
+                        _dups_ok_acknowledge_send = true;
+                    }
+                    if (_outstanding <= _prefetchLow)
+                    {
+                        _dups_ok_acknowledge_send = false;
+                    }
+
+                    if (_dups_ok_acknowledge_send)
+                    {
+                        _channel.AcknowledgeMessage(msg.getDeliveryTag(), 
true);
+                    }
+                    break;
+ */
+                case AcknowledgeMode.AutoAcknowledge:
+                    _channel.AcknowledgeMessage(msg.DeliveryTag, false);
+                    break;
+                case AcknowledgeMode.SessionTransacted:
+                    _lastDeliveryTag = msg.DeliveryTag;
+                    break;
+            }
+        }
+
     }
 }

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs 
Tue Nov 28 15:37:52 2006
@@ -31,22 +31,27 @@
         /// </summary>
         protected AmqChannel _channel;
 
-        public AMQMessage(IContentHeaderProperties properties)
+        private long _deliveryTag;
+
+        public AMQMessage(IContentHeaderProperties properties, long 
deliveryTag)
         {
             _contentHeaderProperties = properties;
+            _deliveryTag = deliveryTag;
         }
 
-        public AmqChannel Channel
+        public AMQMessage(IContentHeaderProperties properties) : 
this(properties, -1)
+        {
+        }
+
+        public long DeliveryTag
         {
-            get
-            {
-                return _channel;
-            }
+            get { return _deliveryTag; }
+        }
 
-            set
-            {
-                _channel = value;
-            }
+        public AmqChannel Channel
+        {
+            get { return _channel; }
+            set { _channel = value; }
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/TODO.txt
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/TODO.txt?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/TODO.txt (original)
+++ incubator/qpid/trunk/qpid/dotnet/TODO.txt Tue Nov 28 15:37:52 2006
@@ -1,9 +1,4 @@
 
-https://issues.apache.org/jira/browse/QPID-134
-* Failover.
-  * Review new API methods for fail over requirements.
-    i.e. lock on mutex for non-blocking methods, FailoverSupport (for blocking 
methods)
-
 https://issues.apache.org/jira/browse/QPID-135
 * transactions Tx.Select and Tx.Commit
   * Do the TxSelect message after opening a transactional channel


Reply via email to