Author: aconway Date: Tue Mar 3 20:21:01 2009 New Revision: 749730 URL: http://svn.apache.org/viewvc?rev=749730&view=rev Log:
cluster::UpdateClient added missing error handling. Minor improvements to failover_soak tests. Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp qpid/trunk/qpid/cpp/src/tests/run_failover_soak Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=749730&r1=749729&r2=749730&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Mar 3 20:21:01 2009 @@ -106,6 +106,16 @@ // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. const std::string UpdateClient::UPDATE("qpid.qpid-update"); +void UpdateClient::run() { + try { + update(); + done(); + } catch (const std::exception& e) { + failed(e); + } + delete this; +} + void UpdateClient::update() { QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; @@ -130,16 +140,6 @@ QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); } -void UpdateClient::run() { - try { - update(); - done(); - } catch (const std::exception& e) { - failed(e); - } - delete this; -} - namespace { template <class T> std::string encode(const T& t) { std::string encoded; @@ -172,7 +172,13 @@ } ~MessageUpdater() { - session.exchangeUnbind(queue, UpdateClient::UPDATE); + try { + session.exchangeUnbind(queue, UpdateClient::UPDATE); + } + catch (const std::exception& e) { + // Don't throw in a destructor. + QPID_LOG(error, "Unbinding update queue " << queue << ": " << e.what()); + } } @@ -204,11 +210,8 @@ void updateMessage(const boost::intrusive_ptr<broker::Message>& message) { updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); } - - }; - void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) { QPID_LOG(debug, updaterId << " updating queue " << q->getName()); ClusterConnectionProxy proxy(session); Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=749730&r1=749729&r2=749730&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Tue Mar 3 20:21:01 2009 @@ -37,6 +37,8 @@ #include "qpid/framing/Uuid.h" #include <ForkedBroker.h> +#include <qpid/client/Connection.h> + @@ -45,6 +47,8 @@ using namespace std; using boost::assign::list_of; using namespace qpid::framing; +using namespace qpid::client; + @@ -82,6 +86,7 @@ + struct child { child ( string & name, pid_t pid, childType type ) @@ -121,10 +126,11 @@ struct children : public vector<child *> { + void add ( string & name, pid_t pid, childType type ) { - push_back(new child ( name, pid, type )); + push_back ( new child ( name, pid, type ) ); } @@ -146,7 +152,7 @@ child * kid = get ( pid ); if(! kid) { - if ( verbosity > 0 ) + if ( verbosity > 1 ) { cerr << "children::exited warning: Can't find child with pid " << pid @@ -176,10 +182,15 @@ int checkChildren ( ) { - vector<child *>::iterator i; + vector<child *>::iterator i; for ( i = begin(); i != end(); ++ i ) if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) ) - return (*i)->retval; + { + cerr << "checkChildren: error on child of type " + << (*i)->type + << endl; + return (*i)->retval; + } return 0; } @@ -314,6 +325,37 @@ +ForkedBroker * newbie = 0; +int newbie_port = 0; + + + +bool +wait_for_newbie ( ) +{ + if ( ! newbie ) + return true; + + try + { + Connection connection; + connection.open ( "127.0.0.1", newbie_port ); + sleep ( 2 ); + connection.close(); + newbie = 0; // He's no newbie anymore! + return true; + } + catch ( const std::exception& error ) + { + std::cerr << "wait_for_newbie error: " + << error.what() + << endl; + return false; + } +} + + + void startNewBroker ( brokerVector & brokers, @@ -342,7 +384,9 @@ ("--log-to-file") ("/tmp/qpidd.log"); - ForkedBroker * broker = new ForkedBroker ( argv ); + newbie = new ForkedBroker ( argv ); + newbie_port = newbie->getPort(); + ForkedBroker * broker = newbie; if ( verbosity > 0 ) std::cerr << "new broker created: pid == " @@ -362,6 +406,11 @@ bool killFrontBroker ( brokerVector & brokers, int verbosity ) { + cerr << "killFrontBroker: waiting for newbie sync...\n"; + if ( ! wait_for_newbie() ) + return false; + cerr << "killFrontBroker: newbie synced.\n"; + if ( verbosity > 0 ) cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " << brokers[0]->getPort() << endl; try { brokers[0]->kill(9); } @@ -400,10 +449,12 @@ for ( uint i = 0; i < brokers.size(); ++ i ) try { brokers[i]->kill(9); } - catch ( ... ) + catch ( const exception& error ) { std::cerr << "killAllBrokers Warning: exception during kill on broker " << i + << " " + << error.what() << endl; } } @@ -422,7 +473,7 @@ string name("declareQueues"); int port = brokers[0]->getPort ( ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "startDeclareQueuesClient: host: " << host << " port: " @@ -463,12 +514,16 @@ string name("receiver"); int port = brokers[0]->getPort ( ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "startReceivingClient: port " << port << endl; + + // verbosity has to be > 1 to let clients talk. + int client_verbosity = (verbosity > 1 ) ? 1 : 0; + char portStr[100]; char verbosityStr[100]; sprintf(portStr, "%d", port); - sprintf(verbosityStr, "%d", verbosity); + sprintf(verbosityStr, "%d", client_verbosity); vector<const char*> argv; @@ -507,13 +562,16 @@ string name("sender"); int port = brokers[0]->getPort ( ); - if ( verbosity ) + if ( verbosity > 1) cout << "startSenderClient: port " << port << endl; char portStr[100]; char verbosityStr[100]; + // + // verbosity has to be > 1 to let clients talk. + int client_verbosity = (verbosity > 1 ) ? 1 : 0; sprintf ( portStr, "%d", port); - sprintf ( verbosityStr, "%d", verbosity); + sprintf ( verbosityStr, "%d", client_verbosity); vector<const char*> argv; argv.push_back ( "replayingSender" ); @@ -582,7 +640,7 @@ brokerVector brokers; - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "Starting initial cluster...\n"; int nBrokers = 3; @@ -603,14 +661,14 @@ pid_t dqClientPid = runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity ); if ( -1 == dqClientPid ) { - cerr << "ERROR: START_DECLARE_1 END_OF_TEST\n"; + cerr << "END_OF_TEST ERROR_START_DECLARE_1\n"; return CANT_FORK_DQ; } // Don't continue until declareQueues is finished. pid_t retval = waitpid ( dqClientPid, & childStatus, 0); if ( retval != dqClientPid) { - cerr << "ERROR: START_DECLARE_2 END_OF_TEST\n"; + cerr << "END_OF_TEST ERROR_START_DECLARE_2\n"; return DQ_FAILED; } allMyChildren.exited ( dqClientPid, childStatus ); @@ -625,7 +683,7 @@ reportFrequency, verbosity ); if ( -1 == receivingClientPid ) { - cerr << "ERROR: START_RECEIVER END_OF_TEST\n"; + cerr << "END_OF_TEST ERROR_START_RECEIVER\n"; return CANT_FORK_RECEIVER; } @@ -639,7 +697,7 @@ reportFrequency, verbosity ); if ( -1 == sendingClientPid ) { - cerr << "ERROR: START_SENDER END_OF_TEST\n"; + cerr << "END_OF_TEST ERROR_START_SENDER\n"; return CANT_FORK_SENDER; } @@ -666,13 +724,13 @@ if ( ! killFrontBroker ( brokers, verbosity ) ) { allMyChildren.killEverybody(); - std::cerr << "ERROR: BROKER END_OF_TEST\n"; + std::cerr << "END_OF_TEST ERROR_BROKER\n"; return ERROR_KILLING_BROKER; } // Sleep for a while. ------------------------- sleepyTime = mrand ( minSleep, maxSleep ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) cerr << "Sleeping for " << sleepyTime << " seconds.\n"; sleep ( sleepyTime ); @@ -686,7 +744,7 @@ clusterName, verbosity ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) printBrokers ( brokers ); // If all children have exited, quit. @@ -694,20 +752,20 @@ if ( ! unfinished ) { killAllBrokers ( brokers, 5 ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "failoverSoak: all children have exited.\n"; int retval = allMyChildren.checkChildren(); - if ( verbosity > 0 ) + if ( verbosity > 1 ) std::cerr << "failoverSoak: checkChildren: " << retval << endl; if ( retval ) { - std::cerr << "ERROR: CLIENT END_OF_TEST\n"; + std::cerr << "END_OF_TEST ERROR_CLIENT\n"; return ERROR_ON_CHILD; } else { - std::cerr << "SUCCESSFUL END_OF_TEST\n"; + std::cerr << "END_OF_TEST SUCCESSFUL\n"; return HUNKY_DORY; } } @@ -719,7 +777,7 @@ cout << "failoverSoak: error on child.\n"; allMyChildren.killEverybody(); killAllBrokers ( brokers, 5 ); - std::cerr << "ERROR: CLIENT END_OF_TEST\n"; + std::cerr << "END_OF_TEST ERROR_CLIENT\n"; return ERROR_ON_CHILD; } @@ -729,27 +787,27 @@ /* * Don't kill any processes. Leave alive for questioning. * */ - std::cerr << "ERROR: HANGING END_OF_TEST\n"; + std::cerr << "END_OF_TEST ERROR_HANGING\n"; return HANGING; } - if ( verbosity > 0 ) { + if ( verbosity > 1 ) { std::cerr << "------- next kill-broker loop --------\n"; allMyChildren.print(); } } retval = allMyChildren.checkChildren(); - if ( verbosity > 0 ) + if ( verbosity > 1 ) std::cerr << "failoverSoak: checkChildren: " << retval << endl; - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "failoverSoak: maxBrokers reached.\n"; allMyChildren.killEverybody(); killAllBrokers ( brokers, 5 ); - std::cerr << "SUCCESSFUL END_OF_TEST\n"; + std::cerr << "END_OF_TEST SUCCESSFUL\n"; return retval ? ERROR_ON_CHILD : HUNKY_DORY; } Modified: qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp?rev=749730&r1=749729&r2=749730&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp Tue Mar 3 20:21:01 2009 @@ -60,6 +60,8 @@ void Sender::execute(AsyncSession& session, bool isRetry) { + if (verbosity > 0) + std::cout << "replaying_sender " << (isRetry ? "first " : "re-") << "connect." << endl; if (isRetry) sender.replay(session); else sender.init(session); while (sent < count) { Modified: qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp?rev=749730&r1=749729&r2=749730&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp Tue Mar 3 20:21:01 2009 @@ -114,11 +114,9 @@ if (gaps) throw Exception("Detected gaps in sequence; messages appear to have been lost."); } -void Listener::execute(AsyncSession& session, bool isRetry) -{ - if (isRetry) { - // std::cout << "Resuming from " << count << std::endl; - } +void Listener::execute(AsyncSession& session, bool isRetry) { + if (verbosity > 0) + std::cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl; if (!done) { SubscriptionManager subs(session); subscription = subs.subscribe(*this, "message_queue"); Modified: qpid/trunk/qpid/cpp/src/tests/run_failover_soak URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_failover_soak?rev=749730&r1=749729&r2=749730&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/run_failover_soak (original) +++ qpid/trunk/qpid/cpp/src/tests/run_failover_soak Tue Mar 3 20:21:01 2009 @@ -47,10 +47,10 @@ src_root=.. module_dir=$src_root/.libs -n_messages=300000 -report_frequency=10000 -verbosity=1 +MESSAGES=${MESSAGES:-300000} +REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`} +VERBOSITY=${VERBOSITY:-1} -exec ./failover_soak $src_root $module_dir $host ./declare_queues ./replaying_sender ./resuming_receiver $n_messages $report_frequency $verbosity +exec ./failover_soak $src_root $module_dir $host ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org