Author: steshaw
Date: Tue Nov 28 11:12:37 2006
New Revision: 480157

URL: http://svn.apache.org/viewvc?view=rev&rev=480157
Log:
Initial changes to record and replay frames on fail over.

Modified:
    
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
    
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs?view=diff&rev=480157&r1=480156&r2=480157
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs 
(original)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs 
Tue Nov 28 11:12:37 2006
@@ -33,7 +33,7 @@
     {
         private static readonly ILog _log = 
LogManager.GetLogger(typeof(FailoverTxTest));
 
-        const int NUM_ITERATIONS = 3;
+        const int NUM_ITERATIONS = 10;
         const int NUM_MESSAGES = 10;
         const int SLEEP_MILLIS = 500;
 
@@ -59,17 +59,17 @@
             _log.Info("connectionInfo = " + connectionInfo);
             _log.Info("connection.asUrl = " + _connection.toURL());
 
-            IChannel session = _connection.CreateChannel(false, 
AcknowledgeMode.NoAcknowledge);
+            IChannel channel = _connection.CreateChannel(false, 
AcknowledgeMode.NoAcknowledge);
 
-            string queueName = session.GenerateUniqueName();
+            string queueName = channel.GenerateUniqueName();
 
             // Queue.Declare
-            session.DeclareQueue(queueName, false, true, true);
+            channel.DeclareQueue(queueName, false, true, true);
 
             // No need to call Queue.Bind as automatically bound to default 
direct exchange.
-//            channel.Bind(queueName, exchangeName, routingKey);
+            channel.Bind(queueName, "amq.direct", queueName);
 
-            session.CreateConsumerBuilder(queueName).Create().OnMessage = new 
MessageReceivedDelegate(onMessage);
+            channel.CreateConsumerBuilder(queueName).Create().OnMessage = new 
MessageReceivedDelegate(onMessage);
 
             _connection.Start();
 
@@ -83,8 +83,8 @@
         {
             _log.Info("sendInTx");
             bool transacted = false;
-            IChannel session = _connection.CreateChannel(transacted, 
AcknowledgeMode.NoAcknowledge);
-            IMessagePublisher publisher = session.CreatePublisherBuilder()
+            IChannel channel = _connection.CreateChannel(transacted, 
AcknowledgeMode.NoAcknowledge);
+            IMessagePublisher publisher = channel.CreatePublisherBuilder()
                 .withRoutingKey(routingKey)
                 .Create();
 
@@ -92,12 +92,12 @@
             {
                 for (int j = 1; j <= NUM_MESSAGES; ++j)
                 {
-                    ITextMessage msg = session.CreateTextMessage("Tx=" + i + " 
msg=" + j);
+                    ITextMessage msg = channel.CreateTextMessage("Tx=" + i + " 
msg=" + j);
                     _log.Info("sending message = " + msg.Text);
                     publisher.Send(msg);
                     Thread.Sleep(SLEEP_MILLIS);
                 }
-                if (transacted) session.Commit();
+                if (transacted) channel.Commit();
             }
         }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs?view=diff&rev=480157&r1=480156&r2=480157
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs Tue 
Nov 28 11:12:37 2006
@@ -738,23 +738,25 @@
         }
 
         /**
-         * For all sessions, and for all consumers in those sessions, 
resubscribe. This is called during failover handling.
+         * For all channels, and for all consumers in those sessions, 
resubscribe. This is called during failover handling.
          * The caller must hold the failover mutex before calling this method.
          */
-        public void ResubscribeSessions()
+        public void ResubscribeChannels()
         {
-            ArrayList sessions = new ArrayList(_sessions.Values);
-            _log.Info(String.Format("Resubscribing sessions = {0} 
sessions.size={1}", sessions, sessions.Count));
-            foreach (AmqChannel s in sessions)
+            ArrayList channels = new ArrayList(_sessions.Values);
+            _log.Info(String.Format("Resubscribing sessions = {0} 
sessions.size={1}", channels, channels.Count));
+            foreach (AmqChannel channel in channels)
             {
-                _protocolSession.AddSessionByChannel(s.ChannelId, s);
-                ReopenChannel(s.ChannelId, (ushort)s.DefaultPrefetch, 
s.Transacted);
-                s.Resubscribe();
+                _protocolSession.AddSessionByChannel(channel.ChannelId, 
channel);
+                ReopenChannel(channel.ChannelId, 
(ushort)channel.DefaultPrefetch, channel.Transacted);
+                channel.ReplayOnFailOver();
             }
         }
 
         private void ReopenChannel(ushort channelId, ushort prefetch, bool 
transacted)
         {
+            _log.Info(string.Format("Reopening channel id={0} prefetch={1} 
transacted={2}",
+                channelId, prefetch, transacted));
             try
             {
                 createChannelOverWire(channelId, prefetch, transacted);
@@ -781,7 +783,6 @@
                     typeof (BasicQosOkBody));                
             }
             
-
             if (transacted)
             {
                 if (_log.IsDebugEnabled)

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=480157&r1=480156&r2=480157
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Nov 
28 11:12:37 2006
@@ -71,6 +71,8 @@
         /// </summary>
         private Hashtable _consumers = Hashtable.Synchronized(new Hashtable());
 
+        private ArrayList _replayFrames = new ArrayList();
+
         /// <summary>
         /// The counter of the _next producer id. This id is generated by the 
session and used only to allow the
         /// producer to identify itself to the session when deregistering 
itself.
@@ -709,32 +711,13 @@
          * Resubscribes all producers and consumers. This is called when 
performing failover.
          * @throws AMQException
          */
-        internal void Resubscribe()
-        {
-            ResubscribeProducers();
-            ResubscribeConsumers();
-        }
-
-        private void ResubscribeProducers()
-        {
-            // FIXME: This needs to Replay DeclareExchange method calls.
-            
-//            ArrayList producers = new ArrayList(_producers.Values);
-//            _logger.Debug(String.Format("Resubscribing producers = {0} 
producers.size={1}", producers, producers.Count));
-//            foreach (BasicMessageProducer producer in producers)
-//            {
-//                producer.Resubscribe();
-//            }
-        }
-
-        private void ResubscribeConsumers()
+        internal void ReplayOnFailOver()
         {
-            ArrayList consumers = new ArrayList(_consumers.Values);
-            _consumers.Clear();
-
-            foreach (BasicMessageConsumer consumer in consumers)
+            _logger.Debug(string.Format("Replaying frames for channel {0}", 
_channelId));
+            foreach (AMQFrame frame in _replayFrames)
             {
-                RegisterConsumer(consumer);                
+                _logger.Debug(string.Format("Replaying frame=[{0}]", frame));
+                _connection.ProtocolWriter.Write(frame);
             }
         }
 
@@ -769,6 +752,8 @@
             AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
                                                               queueName, 
exchangeName,
                                                               routingKey, 
true, args);
+            _replayFrames.Add(queueBind);
+
             _connection.ProtocolWriter.Write(queueBind);
         }
 
@@ -945,6 +930,8 @@
                                                                     false, 
isDurable, isExclusive,
                                                                     
isAutoDelete, true, null);
 
+            _replayFrames.Add(queueDeclare);
+
             _connection.ProtocolWriter.Write(queueDeclare);
         }
 
@@ -963,19 +950,18 @@
             _logger.Debug(String.Format("DeclareExchange channelId={0} 
exchangeName={1} exchangeClass={2}",
                                             _channelId, exchangeName, 
exchangeClass));
 
-            AMQFrame exchangeDeclareFrame = ExchangeDeclareBody.CreateAMQFrame(
+            AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(
                 channelId, ticket, exchangeName, exchangeClass, passive, 
durable, autoDelete, xinternal, noWait, args);
+
+            _replayFrames.Add(declareExchange);
             
-            // FIXME: Probably need to record the exchangeDeclareBody for 
later replay.
-            ExchangeDeclareBody exchangeDeclareBody = 
(ExchangeDeclareBody)exchangeDeclareFrame.BodyFrame;
-//            Console.WriteLine(string.Format("XXX 
AMQP:DeclareExchangeBody=[{0}]", exchangeDeclareBody));
-            if (exchangeDeclareBody.Nowait)
+            if (noWait)
             {
-                _connection.ProtocolWriter.Write(exchangeDeclareFrame);
+                _connection.ProtocolWriter.Write(declareExchange);
             }
             else
             {
-                
_connection.ConvenientProtocolWriter.SyncWrite(exchangeDeclareFrame, typeof 
(ExchangeDeclareOkBody));
+                
_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof 
(ExchangeDeclareOkBody));
             }
         }
     }

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs?view=diff&rev=480157&r1=480156&r2=480157
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs 
(original)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs 
Tue Nov 28 11:12:37 2006
@@ -121,7 +121,7 @@
                         if (_connection.FirePreResubscribe())
                         {
                             _log.Info("Resubscribing on new connection");
-                            _connection.ResubscribeSessions();
+                            _connection.ResubscribeChannels();
                         }
                         else
                         {


Reply via email to