Author: aconway
Date: Tue Mar  3 20:21:01 2009
New Revision: 749730

URL: http://svn.apache.org/viewvc?rev=749730&view=rev
Log:

cluster::UpdateClient: added missing error handling.
Minor improvements to failover_soak tests.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
    qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
    qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp
    qpid/trunk/qpid/cpp/src/tests/run_failover_soak

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=749730&r1=749729&r2=749730&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Mar  3 20:21:01 2009
@@ -106,6 +106,16 @@
 // Reserved exchange/queue name for catch-up, avoid clashes with user 
queues/exchanges.
 const std::string UpdateClient::UPDATE("qpid.qpid-update");
 
+void UpdateClient::run() {
+    try {
+        update();
+        done();
+    } catch (const std::exception& e) {
+        failed(e);
+    }
+    delete this;
+}
+
 void UpdateClient::update() {
     QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " 
<< updateeUrl);
     Broker& b = updaterBroker;
@@ -130,16 +140,6 @@
     QPID_LOG(debug,  updaterId << " updated state to " << updateeId << " at " 
<< updateeUrl);
 }
 
-void UpdateClient::run() {
-    try {
-        update();
-        done();
-    } catch (const std::exception& e) {
-        failed(e);
-    }
-    delete this;
-}
-
 namespace {
 template <class T> std::string encode(const T& t) {
     std::string encoded;
@@ -172,7 +172,13 @@
     }
 
     ~MessageUpdater() {
-        session.exchangeUnbind(queue, UpdateClient::UPDATE);
+        try {
+            session.exchangeUnbind(queue, UpdateClient::UPDATE);
+        }
+        catch (const std::exception& e) {
+            // Don't throw in a destructor.
+            QPID_LOG(error, "Unbinding update queue " << queue << ": " << 
e.what());
+        }
     }
 
 
@@ -204,11 +210,8 @@
     void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
         updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? 
lastPos.getValue()+1 : 1));
     }
-    
-   
 };
 
-
 void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
     QPID_LOG(debug, updaterId << " updating queue " << q->getName());
     ClusterConnectionProxy proxy(session);

Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=749730&r1=749729&r2=749730&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Tue Mar  3 20:21:01 2009
@@ -37,6 +37,8 @@
 #include "qpid/framing/Uuid.h"
 
 #include <ForkedBroker.h>
+#include <qpid/client/Connection.h>
+
 
 
 
@@ -45,6 +47,8 @@
 using namespace std;
 using boost::assign::list_of;
 using namespace qpid::framing;
+using namespace qpid::client;
+
 
 
 
@@ -82,6 +86,7 @@
 
 
 
+
 struct child
 {
     child ( string & name, pid_t pid, childType type ) 
@@ -121,10 +126,11 @@
 
 struct children : public vector<child *>
 { 
+
     void
     add ( string & name, pid_t pid, childType type )
     {
-        push_back(new child ( name, pid, type ));
+        push_back ( new child ( name, pid, type ) );
     }
 
 
@@ -146,7 +152,7 @@
         child * kid = get ( pid );
         if(! kid)
         {
-            if ( verbosity > 0 )
+            if ( verbosity > 1 )
             {
                 cerr << "children::exited warning: Can't find child with pid " 
                      << pid
@@ -176,10 +182,15 @@
     int
     checkChildren ( )
     {
-        vector<child *>::iterator i;
+       vector<child *>::iterator i;
         for ( i = begin(); i != end(); ++ i )
             if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) )
-                return (*i)->retval;
+            {
+              cerr << "checkChildren: error on child of type "
+                   << (*i)->type
+                   << endl;
+              return (*i)->retval;
+            }
       
         return 0;
     }
@@ -314,6 +325,37 @@
 
 
 
+ForkedBroker * newbie = 0;
+int newbie_port       = 0;
+
+
+
+bool
+wait_for_newbie ( )
+{
+  if ( ! newbie )
+    return true;
+
+  try 
+  {
+    Connection connection;
+    connection.open ( "127.0.0.1", newbie_port );
+    sleep ( 2 );
+    connection.close();
+    newbie = 0;  // He's no newbie anymore!
+    return true;
+  }
+  catch ( const std::exception& error )
+  {
+    std::cerr << "wait_for_newbie error: " 
+              << error.what() 
+              << endl;
+    return false;
+  }
+}
+
+
+
 
 void
 startNewBroker ( brokerVector & brokers,
@@ -342,7 +384,9 @@
                         ("--log-to-file")
                         ("/tmp/qpidd.log");
 
-    ForkedBroker * broker = new ForkedBroker ( argv );
+    newbie = new ForkedBroker ( argv );
+    newbie_port = newbie->getPort();
+    ForkedBroker * broker = newbie;
 
     if ( verbosity > 0 )
       std::cerr << "new broker created: pid == " 
@@ -362,6 +406,11 @@
 bool
 killFrontBroker ( brokerVector & brokers, int verbosity )
 {
+    cerr << "killFrontBroker: waiting for newbie sync...\n";
+    if ( ! wait_for_newbie() )
+      return false;
+    cerr << "killFrontBroker: newbie synced.\n";
+
     if ( verbosity > 0 )
         cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " 
<< brokers[0]->getPort() << endl;
     try { brokers[0]->kill(9); }
@@ -400,10 +449,12 @@
 
     for ( uint i = 0; i < brokers.size(); ++ i )
         try { brokers[i]->kill(9); }
-        catch ( ... ) 
+        catch ( const exception& error ) 
         { 
           std::cerr << "killAllBrokers Warning: exception during kill on 
broker "
                     << i
+                    << " "
+                    << error.what()
                     << endl;
         }
 }
@@ -422,7 +473,7 @@
     string name("declareQueues");
     int port = brokers[0]->getPort ( );
 
-    if ( verbosity > 0 )
+    if ( verbosity > 1 )
         cout << "startDeclareQueuesClient: host:  " 
              << host 
              << "  port: " 
@@ -463,12 +514,16 @@
     string name("receiver");
     int port = brokers[0]->getPort ( );
 
-    if ( verbosity > 0 )
+    if ( verbosity > 1 )
         cout << "startReceivingClient: port " << port << endl;
+
+    // verbosity has to be > 1 to let clients talk.
+    int client_verbosity = (verbosity > 1 ) ? 1 : 0;
+
     char portStr[100];
     char verbosityStr[100];
     sprintf(portStr, "%d", port);
-    sprintf(verbosityStr, "%d", verbosity);
+    sprintf(verbosityStr, "%d", client_verbosity);
 
 
     vector<const char*> argv;
@@ -507,13 +562,16 @@
     string name("sender");
     int port = brokers[0]->getPort ( );
 
-    if ( verbosity )
+    if ( verbosity > 1)
         cout << "startSenderClient: port " << port << endl;
     char portStr[100];
     char verbosityStr[100];
+    //
+    // verbosity has to be > 1 to let clients talk.
+    int client_verbosity = (verbosity > 1 ) ? 1 : 0;
 
     sprintf ( portStr,      "%d", port);
-    sprintf ( verbosityStr, "%d", verbosity);
+    sprintf ( verbosityStr, "%d", client_verbosity);
 
     vector<const char*> argv;
     argv.push_back ( "replayingSender" );
@@ -582,7 +640,7 @@
 
     brokerVector brokers;
 
-    if ( verbosity > 0 )
+    if ( verbosity > 1 )
         cout << "Starting initial cluster...\n";
 
     int nBrokers = 3;
@@ -603,14 +661,14 @@
      pid_t dqClientPid = 
      runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity );
      if ( -1 == dqClientPid ) {
-         cerr << "ERROR: START_DECLARE_1 END_OF_TEST\n";
+         cerr << "END_OF_TEST ERROR_START_DECLARE_1\n";
          return CANT_FORK_DQ;
      }
 
      // Don't continue until declareQueues is finished.
      pid_t retval = waitpid ( dqClientPid, & childStatus, 0);
      if ( retval != dqClientPid) {
-         cerr << "ERROR: START_DECLARE_2 END_OF_TEST\n";
+         cerr << "END_OF_TEST ERROR_START_DECLARE_2\n";
          return DQ_FAILED;
      }
      allMyChildren.exited ( dqClientPid, childStatus );
@@ -625,7 +683,7 @@
                               reportFrequency,
                               verbosity );
      if ( -1 == receivingClientPid ) {
-         cerr << "ERROR: START_RECEIVER END_OF_TEST\n";
+         cerr << "END_OF_TEST ERROR_START_RECEIVER\n";
          return CANT_FORK_RECEIVER;
      }
 
@@ -639,7 +697,7 @@
                             reportFrequency,
                             verbosity );
      if ( -1 == sendingClientPid ) {
-         cerr << "ERROR: START_SENDER END_OF_TEST\n";
+         cerr << "END_OF_TEST ERROR_START_SENDER\n";
          return CANT_FORK_SENDER;
      }
 
@@ -666,13 +724,13 @@
          if ( ! killFrontBroker ( brokers, verbosity ) )
          {
            allMyChildren.killEverybody();
-           std::cerr << "ERROR: BROKER END_OF_TEST\n";
+           std::cerr << "END_OF_TEST ERROR_BROKER\n";
            return ERROR_KILLING_BROKER;
          }
 
          // Sleep for a while. -------------------------
          sleepyTime = mrand ( minSleep, maxSleep );
-         if ( verbosity > 0 )
+         if ( verbosity > 1 )
              cerr << "Sleeping for " << sleepyTime << " seconds.\n";
          sleep ( sleepyTime );
 
@@ -686,7 +744,7 @@
                           clusterName,
                           verbosity ); 
        
-         if ( verbosity > 0 )
+         if ( verbosity > 1 )
              printBrokers ( brokers );
        
          // If all children have exited, quit.
@@ -694,20 +752,20 @@
          if ( ! unfinished ) {
              killAllBrokers ( brokers, 5 );
 
-             if ( verbosity > 0 )
+             if ( verbosity > 1 )
                  cout << "failoverSoak: all children have exited.\n";
            int retval = allMyChildren.checkChildren();
-           if ( verbosity > 0 )
+           if ( verbosity > 1 )
              std::cerr << "failoverSoak: checkChildren: " << retval << endl;
 
            if ( retval )
            {
-               std::cerr << "ERROR: CLIENT END_OF_TEST\n";
+               std::cerr << "END_OF_TEST ERROR_CLIENT\n";
                return ERROR_ON_CHILD;
            }
            else
            {
-               std::cerr << "SUCCESSFUL END_OF_TEST\n";
+               std::cerr << "END_OF_TEST SUCCESSFUL\n";
                return HUNKY_DORY;
            }
          }
@@ -719,7 +777,7 @@
                  cout << "failoverSoak: error on child.\n";
              allMyChildren.killEverybody();
              killAllBrokers ( brokers, 5 );
-             std::cerr << "ERROR: CLIENT END_OF_TEST\n";
+             std::cerr << "END_OF_TEST ERROR_CLIENT\n";
              return ERROR_ON_CHILD;
          }
 
@@ -729,27 +787,27 @@
              /*
               * Don't kill any processes.  Leave alive for questioning.
               * */
-             std::cerr << "ERROR: HANGING END_OF_TEST\n";
+             std::cerr << "END_OF_TEST ERROR_HANGING\n";
              return HANGING;
          }
 
-         if ( verbosity > 0 ) {
+         if ( verbosity > 1 ) {
            std::cerr << "------- next kill-broker loop --------\n";
            allMyChildren.print();
          }
      }
 
      retval = allMyChildren.checkChildren();
-     if ( verbosity > 0 )
+     if ( verbosity > 1 )
        std::cerr << "failoverSoak: checkChildren: " << retval << endl;
 
-     if ( verbosity > 0 )
+     if ( verbosity > 1 )
          cout << "failoverSoak: maxBrokers reached.\n";
 
      allMyChildren.killEverybody();
      killAllBrokers ( brokers, 5 );
 
-     std::cerr << "SUCCESSFUL END_OF_TEST\n";
+     std::cerr << "END_OF_TEST SUCCESSFUL\n";
 
      return retval ? ERROR_ON_CHILD : HUNKY_DORY;
 }

Modified: qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp?rev=749730&r1=749729&r2=749730&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp Tue Mar  3 20:21:01 2009
@@ -60,6 +60,8 @@
 
 void Sender::execute(AsyncSession& session, bool isRetry)
 {
+    if (verbosity > 0)
+        std::cout << "replaying_sender " << (isRetry ? "first " : "re-") << 
"connect." << endl;
     if (isRetry) sender.replay(session);
     else sender.init(session);
     while (sent < count) {

Modified: qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp?rev=749730&r1=749729&r2=749730&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp Tue Mar  3 20:21:01 2009
@@ -114,11 +114,9 @@
     if (gaps) throw Exception("Detected gaps in sequence; messages appear to 
have been lost.");
 }
 
-void Listener::execute(AsyncSession& session, bool isRetry)
-{
-    if (isRetry) {
-        // std::cout << "Resuming from " << count << std::endl;
-    }
+void Listener::execute(AsyncSession& session, bool isRetry) {
+    if (verbosity > 0)
+        std::cout << "resuming_receiver " << (isRetry ? "first " : "re-") << 
"connect." << endl;
     if (!done) {
         SubscriptionManager subs(session);
         subscription = subs.subscribe(*this, "message_queue");

Modified: qpid/trunk/qpid/cpp/src/tests/run_failover_soak
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_failover_soak?rev=749730&r1=749729&r2=749730&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_failover_soak (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_failover_soak Tue Mar  3 20:21:01 2009
@@ -47,10 +47,10 @@
 
 src_root=..
 module_dir=$src_root/.libs
-n_messages=300000
-report_frequency=10000
-verbosity=1
 
+MESSAGES=${MESSAGES:-300000}
+REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`}
+VERBOSITY=${VERBOSITY:-1}
 
-exec ./failover_soak $src_root $module_dir $host ./declare_queues 
./replaying_sender ./resuming_receiver $n_messages $report_frequency $verbosity
+exec ./failover_soak $src_root $module_dir $host ./declare_queues 
./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to