Author: aidan
Date: Wed May  7 06:25:38 2008
New Revision: 654097

URL: http://svn.apache.org/viewvc?rev=654097&view=rev
Log:
QPID-952, QPID-951, QPID-1032 Fix failover, ensure that it is properly 
detected, that frames are replayed approrpiately and that failover does not 
timeout.

Modified:
    incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs
    incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
    
incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs

Modified: 
incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs?rev=654097&r1=654096&r2=654097&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs 
(original)
+++ incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs Wed 
May  7 06:25:38 2008
@@ -69,7 +69,7 @@
 
         internal bool IsFailoverAllowed
         {
-            get { return _failoverPolicy.FailoverAllowed(); }
+            get { if(!_connected) return false; else return 
_failoverPolicy.FailoverAllowed(); }
         }
 
         /// <summary>
@@ -151,34 +151,22 @@
                     _log.Error("Unable to connect to broker " + 
_failoverPolicy.GetCurrentBrokerInfo(), e);
                     // XXX: Should perhaps break out of the do/while here if 
not a SocketException...
                 }
-            } while (_failoverPolicy.FailoverAllowed());
+            } while (!_connected && _failoverPolicy.FailoverAllowed());
 
             _log.Debug("Are we connected:" + _connected);
-            
-            if (!_failoverPolicy.FailoverAllowed())
-            {
-                if ( lastException is AMQException )
-                   throw lastException;
-                else
-                   throw new AMQConnectionException("Unable to connect", 
lastException);
-            }
 
-            // TODO: this needs to be redone so that we are not spinning.
-            // A suitable object should be set that is then waited on
-            // and only notified when a connection is made or when
-            // the AMQConnection gets closed.
-            while (!_connected && !Closed)
+            if (!_connected)
             {
-                _log.Debug("Sleeping.");
-                Thread.Sleep(100);
-            }
-            if (!_failoverPolicy.FailoverAllowed() || 
_failoverPolicy.GetCurrentBrokerInfo() == null)
-            {
-                if (_lastAMQException != null)
-                {
-                    throw _lastAMQException;
-                }
+               if ( lastException is AMQException )
+               {
+                       throw lastException;
+               }
+               else
+               {
+                       throw new AMQConnectionException("Unable to connect", 
lastException);
+               }
             }
+            
         }
 
         /*private ITransport LoadTransportFromAssembly(string host, int port, 
String assemblyName, String transportType)

Modified: incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=654097&r1=654096&r2=654097&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs 
(original)
+++ incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs Wed 
May  7 06:25:38 2008
@@ -907,12 +907,16 @@
             AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
                                                               queueName, 
exchangeName,
                                                               routingKey, 
false, args);
-            _replayFrames.Add(queueBind);
+            
 
             lock (_connection.FailoverMutex)
             {
                 _connection.ConvenientProtocolWriter.SyncWrite(queueBind, 
typeof(QueueBindOkBody));
             }
+            // AS FIXME: wasnae me
+            _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0,
+                                                           queueName, 
exchangeName,
+                                                           routingKey, true, 
args));
         }
 
         private String ConsumeFromQueue(String queueName, bool noLocal, bool 
exclusive, AcknowledgeMode acknowledgeMode, String tag)
@@ -936,9 +940,7 @@
                 _logger.Debug(string.Format("DeleteQueue name={0}", 
queueName));
                 
                 AMQFrame queueDelete = 
QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, 
noWait);
-
-                _replayFrames.Add(queueDelete);
-
+                
                 if (noWait)
                 {
                     _connection.ProtocolWriter.Write(queueDelete);
@@ -947,6 +949,8 @@
                 {
                     
_connection.ConvenientProtocolWriter.SyncWrite(queueDelete, 
typeof(QueueDeleteOkBody));
                 }
+                // AS FIXME: wasnae me
+                _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 
0, queueName, ifUnused, ifEmpty, true));
             }
             catch (AMQException)
             {
@@ -962,12 +966,14 @@
             AMQFrame queueDeclare = 
QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, 
isExclusive,
                                                                     
isAutoDelete, false, null);
 
-            _replayFrames.Add(queueDeclare);
 
             lock (_connection.FailoverMutex)
             {
                 _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, 
typeof(QueueDeclareOkBody));
             }
+            // AS FIXME: wasnae me
+            _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, 
queueName, false, isDurable, isExclusive,
+                                                                    
isAutoDelete, true, null));
         }
 
         // AMQP-level method.
@@ -980,8 +986,6 @@
 
             AMQFrame declareExchange = 
ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, 
exchangeClass, passive, 
                                                                           
durable, autoDelete, xinternal, noWait, args);
-
-            _replayFrames.Add(declareExchange);
             
             if (noWait)
             {
@@ -989,6 +993,8 @@
                 {
                     _connection.ProtocolWriter.Write(declareExchange);
                 }
+                // AS FIXME: wasnae me
+               _replayFrames.Add(declareExchange);
             }
             else
             {

Modified: 
incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs?rev=654097&r1=654096&r2=654097&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
 (original)
+++ 
incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
 Wed May  7 06:25:38 2008
@@ -1,320 +1,397 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.InteropServices;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Integration.Tests.interactive
-{
-    [TestFixture, Category("Interactive")]
-    public class FailoverTest : IConnectionListener
-    {
-        private static readonly ILog _log = 
LogManager.GetLogger(typeof(FailoverTest));
-
-        /// <summary>Specifies the number of times to run the test 
cycle.</summary>
-        const int NUM_MESSAGES = 10;
-
-        /// <summary>Determines how many messages to send within each 
commit.</summary>
-        const int COMMIT_BATCH_SIZE = 1;
-
-        /// <summary>Specifies the duration of the pause to place between each 
message sent in the test.</summary>
-        //const int SLEEP_MILLIS = 1;
-
-        /// <summary>Specified the maximum time in milliseconds to wait for 
the test to complete.</summary>
-        const int TIMEOUT = 10000;
-
-        /// <summary>Defines the number of test messages to send, before 
prompting the user to fail a broker.</summary>
-        const int FAIL_POINT = 5;
-
-        /// <summary>Specified the ack mode to use for the test.</summary>
-        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
-
-        /// <summary>Determines whether this test runs transactionally or not. 
</summary>
-        bool transacted = false;
-
-        /// <summary>Holds the connection to run the test over.</summary>
-        AMQConnection _connection;
-
-        /// <summary>Holds the channel for the test message publisher. 
</summary>
-        IChannel publishingChannel;
-
-        /// <summary>Holds the test message publisher. </summary>
-        IMessagePublisher publisher;
-
-        /// <summary>Used to keep count of the number of messages sent. 
</summary>
-        int messagesSent;
-
-        /// <summary>Used to keep count of the number of messages received. 
</summary>
-        int messagesReceived;
-
-        /// <summary>Used to wait for test completion on. </summary>
-        private static object testComplete = new Object();
-
-        /// <summary>
-        /// Creates the test connection with a fail-over set up, and a 
producer/consumer pair on that connection.
-        /// </summary>
-        /// [SetUp]
-        public void Init(IConnectionInfo connectionInfo)
-        {
-            // Reset all counts.
-            messagesSent = 0;
-            messagesReceived = 0;
-
-            // Create a connection for the test.
-            _connection = new AMQConnection(connectionInfo);
-            _connection.ConnectionListener = this;
-
-            // Create a consumer to receive the test messages.
-            IChannel receivingChannel = _connection.CreateChannel(false, 
_acknowledgeMode);
-
-            string queueName = receivingChannel.GenerateUniqueName();
-            receivingChannel.DeclareQueue(queueName, false, true, true);
-            receivingChannel.Bind(queueName, "amq.direct", queueName);
-
-            IMessageConsumer consumer = 
receivingChannel.CreateConsumerBuilder(queueName)
-                .WithPrefetchLow(30)
-                .WithPrefetchHigh(60).Create();
-
-            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
-            _connection.Start();
-
-            // Create a publisher to send the test messages.
-            publishingChannel = _connection.CreateChannel(transacted, 
AcknowledgeMode.NoAcknowledge);
-            publisher = publishingChannel.CreatePublisherBuilder()
-                .WithRoutingKey(queueName)
-                .Create();
-
-            _log.Debug("connection = " + _connection);
-            _log.Debug("connectionInfo = " + connectionInfo);
-            _log.Debug("connection.AsUrl = " + _connection.toURL());
-            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
-        }
-
-        /// <summary>
-        /// Clean up the test connection.
-        /// </summary>
-        [TearDown]
-        public virtual void Shutdown()
-        {
-            Thread.Sleep(2000);
-            _connection.Close();
-        }
-
-        /// <summary>
-        /// Runs a failover test, building up the connection information from 
its component parts. In particular the brokers
-        /// to fail between are seperately added into the connection info.
-        /// </summary>
-        /*[Test]
-        public void TestWithBasicInfo()
-        {
-            _log.Debug("public void TestWithBasicInfo(): called");
-
-            // Manually create the connection parameters.
-            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
-            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", 
"localhost", 5672, false));
-            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", 
"localhost", 5673, false));
-
-            Init(connectionInfo);
-            DoFailoverTest();
-        }*/
-
-        /// <summary>
-        /// Runs a failover test, with the failover configuration specified in 
the Qpid connection URL format.
-        /// </summary>
-        [Test]
-        public void TestWithUrl()
-        {
-            _log.Debug("public void runTestWithUrl(): called");
-
-            // Parse the connection parameters from a URL.
-            String clientId = "failover" + DateTime.Now.Ticks;
-            string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
-                
"?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
            
-            IConnectionInfo connectionInfo = 
QpidConnectionInfo.FromUrl(defaultUrl);
-            
-            Init(connectionInfo);
-            DoFailoverTest();
-        }
-
-        /// <summary>
-        /// Send the test messages, prompting at the fail point for the user 
to cause a broker failure. The test checks that all messages sent
-        /// are received within the test time limit.
-        /// </summary>
-        ///
-        /// <param name="connectionInfo">The connection parameters, specifying 
the brokers to fail between.</param>
-        void DoFailoverTest()
-        {
-            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): 
called");
-
-            for (int i = 1; i <= NUM_MESSAGES; ++i)
-            {
-                ITextMessage msg = 
publishingChannel.CreateTextMessage("message=" + messagesSent);
-                //_log.Debug("sending message = " + msg.Text);
-                publisher.Send(msg);
-                messagesSent++;
-
-                _log.Debug("messagesSent = " + messagesSent);
-
-                if (transacted)
-                {
-                    publishingChannel.Commit();
-                }
-
-                // Prompt the user to cause a failure if at the fail point.
-                if (i == FAIL_POINT)
-                {
-                    PromptAndWait("Cause a broker failure now, then press 
return...");
-                }
-
-                //Thread.Sleep(SLEEP_MILLIS);               
-            }
-
-            // Wait for all of the test messages to be received, checking that 
this occurs within the test time limit.
-            bool withinTimeout;
-
-            lock(testComplete)
-            {
-                withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
-            }            
-
-            if (!withinTimeout)
-            {
-                Assert.Fail("Test timed out, before all messages received.");
-            }
-
-            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): 
exiting");
-        }
-
-        /// <summary>
-        /// Receives all of the test messages.
-        /// </summary>
-        ///
-        /// <param name="message">The newly arrived test message.</param>
-        public void OnMessage(IMessage message)
-        {
-            try
-            {
-                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
-                {
-                    message.Acknowledge();
-                }
-
-                messagesReceived++;
-
-                _log.Debug("messagesReceived = " + messagesReceived);
-
-                // Check if all of the messages in the test have been 
received, in which case notify the message producer that the test has 
-                // succesfully completed.
-                if (messagesReceived == NUM_MESSAGES)
-                {
-                    lock (testComplete)
-                    {
-                        Monitor.Pulse(testComplete);
-                    }
-                }
-            }
-            catch (QpidException e)
-            {
-                _log.Fatal("Exception received. About to stop.", e);
-                Stop();
-            }
-        }
-
-        /// <summary>Prompts the user on stdout and waits for a reply on 
stdin, using the specified prompt message.</summary>
-        ///
-        /// <param name="message">The message to prompt the user with.</param>
-        private void PromptAndWait(string message)
-        {
-            Console.WriteLine("\n" + message);
-            Console.ReadLine();
-        }
-
-        // <summary>Closes the test connection.</summary>
-        private void Stop()
-        {
-            _log.Debug("Stopping...");
-            try
-            {
-                _connection.Close();
-            }
-            catch (QpidException e)
-            {
-                _log.Debug("Failed to shutdown: ", e);
-            }
-        }
-
-        /// <summary>
-        /// Called when bytes have been transmitted to the server
-        /// </summary>
-        ///
-        /// <param>count the number of bytes sent in total since the 
connection was opened</param>     
-        public void BytesSent(long count) {}
-
-        /// <summary>
-        /// Called when some bytes have been received on a connection
-        /// </summary>
-        ///
-        /// <param>count the number of bytes received in total since the 
connection was opened</param>         
-        public void BytesReceived(long count) {}
-
-        /// <summary>
-        /// Called after the infrastructure has detected that failover is 
required but before attempting failover.
-        /// </summary>
-        ///
-        /// <param>redirect true if the broker requested redirect. false if 
failover is occurring due to a connection error.</param>
-        ///
-        /// <return>true to continue failing over, false to veto failover and 
raise a connection exception</return>         
-        public bool PreFailover(bool redirect) 
-        {
-            _log.Debug("public bool PreFailover(bool redirect): called");
-            return true; 
-        }
-
-        /// <summary>
-        /// Called after connection has been made to another broker after 
failover has been started but before
-        /// any resubscription has been done.
-        /// </summary>
-        ///
-        /// <return> true to continue with resubscription, false to prevent 
automatic resubscription. This is useful in
-        /// cases where the application wants to handle resubscription. Note 
that in the latter case all sessions, producers
-        /// and consumers are invalidated.
-        /// </return>
-        public bool PreResubscribe() 
-        {
-            _log.Debug("public bool PreResubscribe(): called");
-            return true; 
-        }
-
-        /// <summary>
-        /// Called once failover has completed successfully. This is called 
irrespective of whether the client has
-        /// vetoed automatic resubscription.
-        /// </summary>
-        public void FailoverComplete() 
-        {
-            _log.Debug("public void FailoverComplete(): called");
-        }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interactive
+{
+    [TestFixture, Category("Interactive")]
+    public class FailoverTest : IConnectionListener
+    {
+        private static readonly ILog _log = 
LogManager.GetLogger(typeof(FailoverTest));
+
+        /// <summary>Specifies the number of times to run the test 
cycle.</summary>
+        const int NUM_MESSAGES = 10;
+
+        /// <summary>Determines how many messages to send within each 
commit.</summary>
+        const int COMMIT_BATCH_SIZE = 1;
+
+        /// <summary>Specifies the duration of the pause to place between each 
message sent in the test.</summary>
+        //const int SLEEP_MILLIS = 1;
+
+        /// <summary>Specified the maximum time in milliseconds to wait for 
the test to complete.</summary>
+        const int TIMEOUT = 10000;
+
+        /// <summary>Defines the number of test messages to send, before 
prompting the user to fail a broker.</summary>
+        const int FAIL_POINT = 5;
+
+        /// <summary>Specified the ack mode to use for the test.</summary>
+        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
+
+        /// <summary>Determines whether this test runs transactionally or not. 
</summary>
+        bool transacted = false;
+
+        /// <summary>Holds the connection to run the test over.</summary>
+        AMQConnection _connection;
+
+        /// <summary>Holds the channel for the test message publisher. 
</summary>
+        IChannel publishingChannel;
+
+        /// <summary>Holds the test message publisher. </summary>
+        IMessagePublisher publisher;
+
+        /// <summary>Used to keep count of the number of messages sent. 
</summary>
+        int messagesSent;
+
+        /// <summary>Used to keep count of the number of messages received. 
</summary>
+        int messagesReceived;
+
+        /// <summary>Used to wait for test completion on. </summary>
+        private static object testComplete = new Object();
+
+        /// <summary>Used to wait for failover completion on. </summary>
+       private static object failoverComplete = new Object();
+       
+        bool failedOver=false;
+
+        /// <summary>Used to record the extra message count (1) if the message 
sent right after failover actually made it to the new broker.</summary>
+        int _extraMessage = 0;
+       
+        /// <summary>
+        /// Creates the test connection with a fail-over set up, and a 
producer/consumer pair on that connection.
+        /// </summary>
+        /// [SetUp]
+        public void Init(IConnectionInfo connectionInfo)
+        {
+           //log4net.Config.BasicConfigurator.Configure();
+            // Reset all counts.
+            messagesSent = 0;
+            messagesReceived = 0;
+            failedOver=false;
+            _extraMessage = 0;
+
+           PromptAndWait("Ensure both brokers are running, then press Enter"); 
    
+           
+            // Create a connection for the test.
+            _connection = new AMQConnection(connectionInfo);
+            _connection.ConnectionListener = this;
+
+            // Create a consumer to receive the test messages.
+            IChannel receivingChannel = _connection.CreateChannel(false, 
_acknowledgeMode);
+
+            string queueName = receivingChannel.GenerateUniqueName();
+            receivingChannel.DeclareQueue(queueName, false, true, true);
+            receivingChannel.Bind(queueName, "amq.direct", queueName);
+
+            IMessageConsumer consumer = 
receivingChannel.CreateConsumerBuilder(queueName)
+                .WithPrefetchLow(30)
+                .WithPrefetchHigh(60).Create();
+
+            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+            _connection.Start();
+
+            // Create a publisher to send the test messages.
+            publishingChannel = _connection.CreateChannel(transacted, 
AcknowledgeMode.NoAcknowledge);
+            publisher = publishingChannel.CreatePublisherBuilder()
+                .WithRoutingKey(queueName)
+                .Create();
+
+            _log.Debug("connection = " + _connection);
+            _log.Debug("connectionInfo = " + connectionInfo);
+            _log.Debug("connection.AsUrl = " + _connection.toURL());
+            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
+        }
+
+        /// <summary>
+        /// Clean up the test connection.
+        /// </summary>
+        [TearDown]
+        public virtual void Shutdown()
+        {
+           if (!failedOver)
+           {
+                 Assert.Fail("The failover callback never occured.");
+            }
+
+            Console.WriteLine("Test done shutting down");
+           Thread.Sleep(2000);
+            _connection.Close();
+        }
+
+        /// <summary>
+        /// Runs a failover test, building up the connection information from 
its component parts. In particular the brokers
+        /// to fail between are seperately added into the connection info.
+        /// </summary>
+        /*[Test]
+        public void TestWithBasicInfo()
+        {
+            _log.Debug("public void TestWithBasicInfo(): called");
+
+            // Manually create the connection parameters.
+            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", 
"localhost", 5672, false));
+            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", 
"localhost", 5673, false));
+
+            Init(connectionInfo);
+            DoFailoverTest();
+        }*/
+
+        /// <summary>
+        /// Runs a failover test, with the failover configuration specified in 
the Qpid connection URL format.
+        /// </summary>
+        [Test]
+        public void TestWithUrl()
+        {
+            _log.Debug("public void runTestWithUrl(): called");
+
+            // Parse the connection parameters from a URL.
+            String clientId = "failover" + DateTime.Now.Ticks;
+            string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
+                
"?brokerlist='tcp://localhost:9672;tcp://localhost:9673'&failover='roundrobin'";
            
+            IConnectionInfo connectionInfo = 
QpidConnectionInfo.FromUrl(defaultUrl);
+            
+            Init(connectionInfo);
+            DoFailoverTest(0);
+        }
+
+        /// <summary>
+        /// Send the test messages, prompting at the fail point for the user 
to cause a broker failure. The test checks that all messages sent
+        /// are received within the test time limit.
+        /// </summary>
+        ///
+        /// <param name="connectionInfo">The connection parameters, specifying 
the brokers to fail between.</param>
+        void DoFailoverTest(int delay)
+        {
+            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): 
called");
+
+            // Wait for all of the test messages to be received, checking that 
this occurs within the test time limit.
+           bool withinTimeout = false;
+
+            for (int i = 1; i <= NUM_MESSAGES; ++i)
+            {
+               SendMessage();
+
+               // Prompt the user to cause a failure if at the fail point.
+               if (i == FAIL_POINT)
+               {
+                   for( int min = delay ; min > 0 ; min--)
+                   {
+                      Console.WriteLine("Waiting for "+min+" minutes to test 
connection time bug.");
+                      Thread.Sleep(60*1000);
+                   }
+
+                   PromptAndWait("Cause a broker failure now, then press 
return.");
+                   Console.WriteLine("NOTE: ensure that the delay between 
killing the broker and continuing here is less than 20 second");
+                   
+                   Console.WriteLine("Sending a message to ensure send right 
after works");
+
+                   SendMessage();
+
+                   Console.WriteLine("Waiting for fail-over to complete before 
continuing...");
+
+
+                   lock(failoverComplete)
+                   {
+                       if (!failedOver)
+                       {
+                           withinTimeout = Monitor.Wait(failoverComplete, 
TIMEOUT);
+                       }
+                       else
+                       {
+                           withinTimeout=true;
+                       }
+                   }
+
+                   if (!withinTimeout)
+                   {
+                       PromptAndWait("Failover has not yet occured. Press 
enter to give up waiting.");
+                   }
+               }
+           }
+
+            lock(testComplete)
+            {
+                withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
+            }            
+
+            if (!withinTimeout)
+            {
+                Assert.Fail("Test timed out, before all messages received.");
+            }
+
+            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): 
exiting");
+        }
+
+       [Test]
+        public void Test5MinuteWait()
+       {
+           String clientId = "failover" + DateTime.Now.Ticks;
+
+           QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+           connectionInfo.Username = "guest";
+           connectionInfo.Password = "guest";
+           connectionInfo.ClientName = clientId;
+           connectionInfo.VirtualHost = "/test";
+           connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 
9672, false));
+           connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 
9673, false));
+           
+           Init(connectionInfo);
+           DoFailoverTest(5);
+       }
+
+       void SendMessage()
+       {
+           ITextMessage msg = publishingChannel.CreateTextMessage("message=" + 
messagesSent);
+
+           publisher.Send(msg);
+           messagesSent++;
+
+           if (transacted)
+           {
+               publishingChannel.Commit();
+           }
+           
+           Console.WriteLine("messagesSent = " + messagesSent);
+       }
+       
+        /// <summary>
+        /// Receives all of the test messages.
+        /// </summary>
+        ///
+        /// <param name="message">The newly arrived test message.</param>
+        public void OnMessage(IMessage message)
+        {
+            try
+            {
+                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+                {
+                    message.Acknowledge();
+                }
+
+                messagesReceived++;
+
+                _log.Debug("messagesReceived = " + messagesReceived);
+
+                // Check if all of the messages in the test have been 
received, in which case notify the message producer that the test has 
+                // succesfully completed.
+                if (messagesReceived == NUM_MESSAGES + _extraMessage)
+                {
+                    lock (testComplete)
+                    {
+                       failedOver = true;
+                        Monitor.Pulse(testComplete);
+                    }
+                }
+            }
+            catch (QpidException e)
+            {
+                _log.Fatal("Exception received. About to stop.", e);
+                Stop();
+            }
+        }
+
+        /// <summary>Prompts the user on stdout and waits for a reply on 
stdin, using the specified prompt message.</summary>
+        ///
+        /// <param name="message">The message to prompt the user with.</param>
+        private void PromptAndWait(string message)
+        {
+            Console.WriteLine("\n" + message);
+            Console.ReadLine();
+        }
+
+        // <summary>Closes the test connection.</summary>
+        private void Stop()
+        {
+            _log.Debug("Stopping...");
+            try
+            {
+                _connection.Close();
+            }
+            catch (QpidException e)
+            {
+                _log.Debug("Failed to shutdown: ", e);
+            }
+        }
+
+        /// <summary>
+        /// Called when bytes have been transmitted to the server
+        /// </summary>
+        ///
+        /// <param>count the number of bytes sent in total since the 
connection was opened</param>     
+        public void BytesSent(long count) {}
+
+        /// <summary>
+        /// Called when some bytes have been received on a connection
+        /// </summary>
+        ///
+        /// <param>count the number of bytes received in total since the 
connection was opened</param>         
+        public void BytesReceived(long count) {}
+
+        /// <summary>
+        /// Called after the infrastructure has detected that failover is 
required but before attempting failover.
+        /// </summary>
+        ///
+        /// <param>redirect true if the broker requested redirect. false if 
failover is occurring due to a connection error.</param>
+        ///
+        /// <return>true to continue failing over, false to veto failover and 
raise a connection exception</return>         
+        public bool PreFailover(bool redirect) 
+        {
+            _log.Debug("public bool PreFailover(bool redirect): called");
+            return true; 
+        }
+
+        /// <summary>
+        /// Called after connection has been made to another broker after 
failover has been started but before
+        /// any resubscription has been done.
+        /// </summary>
+        ///
+        /// <return> true to continue with resubscription, false to prevent 
automatic resubscription. This is useful in
+        /// cases where the application wants to handle resubscription. Note 
that in the latter case all sessions, producers
+        /// and consumers are invalidated.
+        /// </return>
+        public bool PreResubscribe() 
+        {
+            _log.Debug("public bool PreResubscribe(): called");
+            return true; 
+        }
+
+        /// <summary>
+        /// Called once failover has completed successfully. This is called 
irrespective of whether the client has
+        /// vetoed automatic resubscription.
+        /// </summary>
+        public void FailoverComplete() 
+        {
+           failedOver = true;
+            _log.Debug("public void FailoverComplete(): called");
+           Console.WriteLine("public void FailoverComplete(): called");
+           lock (failoverComplete) 
+           {
+             Monitor.Pulse(failoverComplete);
+           }
+        }
+    }
+}


Reply via email to