Author: aconway
Date: Wed Nov 19 12:22:38 2008
New Revision: 719055

URL: http://svn.apache.org/viewvc?rev=719055&view=rev
Log:
tests/failover_soak: run a cluster with clients, kill and restart cluster 
members, verify client fail-over.

Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/run_failover_soak   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Nov 19 12:22:38 2008
@@ -31,3 +31,7 @@
 txshift
 txjob
 
+declare_queues
+failover_soak
+replaying_sender
+resuming_receiver

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=719055&r1=719054&r2=719055&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Wed Nov 19 12:22:38 
2008
@@ -59,17 +59,23 @@
     }
 
     void kill(int sig=SIGINT) {
-        using qpid::ErrnoException;
         if (pid == 0) return;
-        if (::kill(pid, sig) < 0) throw ErrnoException("kill failed");
+        int savePid = pid;      
+        pid = 0;                // Always reset pid, even in case of an 
exception below. 
+        ::close(pipeFds[1]);
+
+        using qpid::ErrnoException;
+        if (::kill(savePid, sig) < 0) 
+            throw ErrnoException("kill failed");
         int status;
-        if (::waitpid(pid, &status, 0) < 0) throw ErrnoException("wait for 
forked process failed");
-        if (WEXITSTATUS(status) != 0)
-            throw qpid::Exception(QPID_MSG("forked broker exited with: " << 
WEXITSTATUS(status)));
-        pid = 0;
+        if (::waitpid(savePid, &status, 0) < 0) 
+            throw ErrnoException("wait for forked process failed");
+        if (WEXITSTATUS(status) != 0) 
+            throw qpid::Exception(QPID_MSG("Forked broker exited with: " << 
WEXITSTATUS(status)));
     }
 
     uint16_t getPort() { return port; }
+    pid_t getPID() { return pid; }
 
   private:
 
@@ -77,7 +83,6 @@
         using qpid::ErrnoException;
         pid = 0;
         port = 0;
-        int pipeFds[2];
         if(::pipe(pipeFds) < 0) throw ErrnoException("Can't create pipe");
         pid = ::fork();
         if (pid < 0) throw ErrnoException("Fork failed");
@@ -104,6 +109,7 @@
         }
     }
 
+    int pipeFds[2];
     pid_t pid;
     int port;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=719055&r1=719054&r2=719055&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Nov 19 12:22:38 2008
@@ -134,6 +134,22 @@
 header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
 header_test_LDADD=$(lib_client) 
 
+check_PROGRAMS+=failover_soak
+failover_soak_SOURCES=failover_soak.cpp  ForkedBroker.h
+failover_soak_LDADD=$(lib_client) 
+
+check_PROGRAMS+=declare_queues
+declare_queues_SOURCES=declare_queues.cpp  
+declare_queues_LDADD=$(lib_client) 
+
+check_PROGRAMS+=replaying_sender
+replaying_sender_SOURCES=replaying_sender.cpp  
+replaying_sender_LDADD=$(lib_client) 
+
+check_PROGRAMS+=resuming_receiver
+resuming_receiver_SOURCES=resuming_receiver.cpp  
+resuming_receiver_LDADD=$(lib_client) 
+
 check_PROGRAMS+=txshift
 txshift_SOURCES=txshift.cpp  TestOptions.h ConnectionOptions.h
 txshift_LDADD=$(lib_client) 
@@ -153,7 +169,7 @@
 TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= 
BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test 
 
 system_tests = client_test quick_perftest quick_topictest run_header_test 
quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker 
run_federation_tests run_acl_tests
+TESTS += start_broker $(system_tests) python_tests stop_broker 
run_federation_tests run_acl_tests 
 
 EXTRA_DIST +=                                                          \
   run_test vg_check                                                    \
@@ -201,7 +217,7 @@
 
 # Longer running stability tests, not run by default check: target.
 # Not run under valgrind, too slow
-LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest
+LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest 
run_failover_soak
 EXTRA_DIST+=$(LONG_TESTS) run_perftest
 check-long:
        $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=

Added: incubator/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp?rev=719055&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp Wed Nov 19 
12:22:38 2008
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/Exception.h>
+
+#include <cstdlib>
+#include <iostream>
+
+using namespace qpid::client;
+
+using namespace std;
+
+int main(int argc, char ** argv) 
+{
+    ConnectionSettings settings;
+    if ( argc != 3 )
+    {
+      cerr << "Usage: declare_queues host port\n";
+      return 1;
+    }
+
+    settings.host = argv[1];
+    settings.port = atoi(argv[2]);
+    
+    FailoverManager connection(settings);
+    try {
+        bool complete = false;
+        while (!complete) {
+            Session session = connection.connect().newSession();
+            try {
+                session.queueDeclare(arg::queue="message_queue");
+                complete = true;
+            } catch (const qpid::TransportFailure&) {}
+        }
+        connection.close();
+        return 0;
+    } catch (const exception& error) {
+        cerr << "declare_queues failed:" << error.what() << endl;
+        cerr << "  host: " << settings.host 
+             << "  port: " << settings.port << endl;
+        return 1;
+    }
+    
+}
+
+
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=719055&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Wed Nov 19 
12:22:38 2008
@@ -0,0 +1,651 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <sys/time.h>
+#include <string.h>
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+#include <ForkedBroker.h>
+
+
+
+
+using namespace std;
+
+
+typedef vector<ForkedBroker *> brokerVector;
+
+typedef enum
+{
+    NO_STATUS,
+    RUNNING,
+    COMPLETED
+}
+childStatus;
+
+
+
+struct child
+{
+    child ( string & name, pid_t pid ) 
+        : name(name), pid(pid), retval(-999), status(RUNNING)
+    { 
+        gettimeofday ( & startTime, 0 );
+    }
+
+
+    void
+    done ( int _retval )
+    {
+        retval = _retval;
+        status = COMPLETED;
+        gettimeofday ( & stopTime, 0 );
+    }
+
+
+    string name;
+    pid_t pid;
+    int   retval;
+    childStatus status;
+    struct timeval startTime,
+                   stopTime;
+};
+
+
+
+
+struct children : public vector<child *>
+{ 
+    void
+    add ( string & name, pid_t pid )
+    {
+        push_back(new child ( name, pid ));
+    }
+
+
+    child * 
+    get ( pid_t pid )
+    {
+        vector<child *>::iterator i;
+        for ( i = begin(); i != end(); ++ i )
+            if ( pid == (*i)->pid )
+                return *i;
+
+        return 0;
+    }
+
+
+    void
+    exited ( pid_t pid, int retval  )
+    {
+        child * kid = get ( pid );
+        if(! kid)
+        {
+            if ( verbosity > 0 )
+            {
+                cerr << "children::exited warning: Can't find child with pid " 
+                     << pid
+                     << endl;
+            }
+            return;
+        }
+
+        kid->done ( retval );
+    }
+
+
+    int
+    unfinished ( )
+    {
+        int count = 0;
+
+        vector<child *>::iterator i;
+        for ( i = begin(); i != end(); ++ i )
+            if ( COMPLETED != (*i)->status )
+                ++ count;
+
+        return count;
+    }
+
+
+    int
+    checkChildren ( )
+    {
+        vector<child *>::iterator i;
+        for ( i = begin(); i != end(); ++ i )
+            if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) )
+                return (*i)->retval;
+      
+        return 0;
+    }
+
+
+    void
+    killEverybody ( )
+    {
+        vector<child *>::iterator i;
+        for ( i = begin(); i != end(); ++ i )
+            kill ( (*i)->pid, 9 );
+    }
+
+
+
+    void
+    print ( )
+    {
+        cout << "--- status of all children --------------\n";
+        vector<child *>::iterator i;
+        for ( i = begin(); i != end(); ++ i )
+            cout << "child: " << (*i)->name
+                 << "  status: " << (*i)->status
+                 << endl;
+        cout << "\n\n\n\n";
+    }
+
+
+    /* 
+       Only call this if you already know there is at least 
+       one child still running.  Supply a time in seconds.
+       If it has been at least that long since a shild stopped
+       running, we judge the system to have hung.
+    */
+    bool
+    hanging ( int hangTime )
+    {
+        struct timeval now,
+                       duration;
+        gettimeofday ( &now, 0 );
+
+        vector<child *>::iterator i;
+        for ( i = begin(); i != end(); ++ i )
+        {
+            timersub ( & now, &((*i)->startTime), & duration );
+            if ( duration.tv_sec >= hangTime )
+                return true;
+        }
+        
+        return false;
+    }
+    
+
+    int verbosity;
+};
+
+
+
+children allMyChildren;
+
+
+
+
+void 
+childExit ( int signalNumber ) 
+{
+    signalNumber ++;  // Now maybe the compiler willleave me alone?
+    int  childReturnCode; 
+    pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG);  
+
+    if ( pid > 0 )
+        allMyChildren.exited ( pid, childReturnCode );
+}
+
+
+
+int
+mrand ( int maxDesiredVal ) {
+    double zeroToOne = (double) rand() / (double) RAND_MAX;
+    return (int) (zeroToOne * (double) maxDesiredVal);
+}
+
+
+
+int
+mrand ( int minDesiredVal, int maxDesiredVal ) {
+    int interval = maxDesiredVal - minDesiredVal;
+    return minDesiredVal + mrand ( interval );
+}
+
+
+
+void
+makeClusterName ( string & s, int & num ) {
+    num = mrand(1000);
+    stringstream ss;
+    ss << "soakTestCluster_" << num;
+    s = ss.str();
+}
+
+
+
+
+
+void
+printBrokers ( brokerVector & brokers )
+{
+    cout << "Broker List ------------ size: " << brokers.size() << "\n";
+    for ( brokerVector::iterator i = brokers.begin(); i != brokers.end(); ++ 
i) {
+        cout << "pid: " 
+             << (*i)->getPID() 
+             << "   port: " 
+             << (*i)->getPort() 
+             << endl;
+    }
+    cout << "end Broker List ------------\n";
+}
+
+
+
+
+
+void
+startNewBroker ( brokerVector & brokers,
+                 char const * srcRoot,
+                 char const * moduleDir,
+                 string const clusterName ) 
+{
+    stringstream path;
+    path << srcRoot << "/qpidd";
+  
+    const char * const argv[] = 
+    {
+        "qpidd",
+        "-p0",
+        "--module-dir",
+        moduleDir,
+        "--load-module=cluster.so",
+        "--cluster-name",
+        clusterName.c_str(),
+        "--auth=no", 
+        "--no-data-dir",
+        "--mgmt-enable=no",
+        0
+    };
+
+    size_t argc = sizeof(argv)/sizeof(argv[0]);
+    brokers.push_back ( new ForkedBroker ( argc, argv ) );
+}
+
+
+
+
+
+void
+killFrontBroker ( brokerVector & brokers, int verbosity )
+{
+    if ( verbosity > 0 )
+        cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " 
<< brokers[0]->getPort() << endl;
+    try { brokers[0]->kill(9); }
+    catch ( const exception& error ) {
+        if ( verbosity > 0 )
+            cout << "error killing broker: " << error.what() << endl;
+    }
+    delete brokers[0];
+    brokers.erase ( brokers.begin() );
+}
+
+
+
+
+
+void
+killAllBrokers ( brokerVector & brokers )
+{
+    for ( uint i = 0; i < brokers.size(); ++ i )
+        try { brokers[i]->kill(9); }
+        catch ( ... ) { }
+}
+
+
+
+
+
+pid_t
+runDeclareQueuesClient ( brokerVector brokers, 
+                            char const *  host,
+                            char const *  path,
+                            int verbosity
+                          ) 
+{
+    string name("declareQueues");
+    int port = brokers[0]->getPort ( );
+
+    if ( verbosity > 0 )
+        cout << "startDeclareQueuesClient: host:  " 
+             << host 
+             << "  port: " 
+             << port 
+             << endl;
+    stringstream portSs;
+    portSs << port;
+
+    vector<const char*> argv;
+    argv.push_back ( "declareQueues" );
+    argv.push_back ( host );
+    argv.push_back ( portSs.str().c_str() );
+    argv.push_back ( 0 );
+    pid_t pid = fork();
+
+    if ( ! pid ) {
+        execv ( path, const_cast<char * const *>(&argv[0]) );
+        perror ( "error executing dq: " );
+        return 0;
+    }
+
+    allMyChildren.add ( name, pid );
+    return pid;
+}
+
+
+
+
+
+pid_t
+startReceivingClient ( brokerVector brokers, 
+                         char const *  host,
+                         char const *  receiverPath,
+                         char const *  reportFrequency,
+                         int verbosity
+                       ) 
+{
+    string name("receiver");
+    int port = brokers[0]->getPort ( );
+
+    if ( verbosity > 0 )
+        cout << "startReceivingClient: port " << port << endl;
+    char portStr[100];
+    char verbosityStr[100];
+    sprintf(portStr, "%d", port);
+    sprintf(verbosityStr, "%d", verbosity);
+
+
+    vector<const char*> argv;
+    argv.push_back ( "resumingReceiver" );
+    argv.push_back ( host );
+    argv.push_back ( portStr );
+    argv.push_back ( reportFrequency );
+    argv.push_back ( verbosityStr );
+    argv.push_back ( 0 );
+
+    pid_t pid = fork();
+
+    if ( ! pid ) {
+        execv ( receiverPath, const_cast<char * const *>(&argv[0]) );
+        perror ( "error executing receiver: " );
+        return 0;
+    }
+
+    allMyChildren.add ( name, pid );
+    return pid;
+}
+
+
+
+
+
+pid_t
+startSendingClient ( brokerVector brokers, 
+                       char const *  host,
+                       char const *  senderPath,
+                       char const *  nMessages,
+                       char const *  reportFrequency,
+                       int verbosity
+                     ) 
+{
+    string name("sender");
+    int port = brokers[0]->getPort ( );
+
+    if ( verbosity )
+        cout << "startSenderClient: port " << port << endl;
+    char portStr[100];
+    char verbosityStr[100];
+
+    sprintf ( portStr,      "%d", port);
+    sprintf ( verbosityStr, "%d", verbosity);
+
+    vector<const char*> argv;
+    argv.push_back ( "replayingSender" );
+    argv.push_back ( host );
+    argv.push_back ( portStr );
+    argv.push_back ( nMessages );
+    argv.push_back ( reportFrequency );
+    argv.push_back ( verbosityStr );
+    argv.push_back ( 0 );
+
+    pid_t pid = fork();
+
+    if ( ! pid ) {
+        execv ( senderPath, const_cast<char * const *>(&argv[0]) );
+        perror ( "error executing sender: " );
+        return 0;
+    }
+
+    allMyChildren.add ( name, pid );
+    return pid;
+}
+
+
+
+#define HUNKY_DORY          0
+#define BAD_ARGS            1
+#define CANT_FORK_DQ        2
+#define CANT_FORK_RECEIVER  3
+#define DQ_FAILED           4
+#define ERROR_ON_CHILD      5
+#define HANGING             6
+
+
+int
+main ( int argc, char const ** argv ) 
+{    
+    if ( argc < 9 ) {
+        cerr << "Usage: failoverSoak srcRoot moduleDir host senderPath 
receiverPath nMessages verbosity\n";
+        cerr << "    ( argc was " << argc << " )\n";
+        return BAD_ARGS;
+    }
+
+    signal ( SIGCHLD, childExit );
+
+    char const * srcRoot            = argv[1];
+    char const * moduleDir          = argv[2];
+    char const * host               = argv[3];
+    char const * declareQueuesPath  = argv[4];
+    char const * senderPath         = argv[5];
+    char const * receiverPath       = argv[6];
+    char const * nMessages          = argv[7];
+    char const * reportFrequency    = argv[8];
+    int          verbosity          = atoi(argv[9]);
+
+    int maxBrokers = 50;
+
+    allMyChildren.verbosity = verbosity;
+
+    int clusterNum;
+    string clusterName;
+
+    srand ( getpid() );
+
+    makeClusterName ( clusterName, clusterNum );
+
+    brokerVector brokers;
+
+    if ( verbosity > 0 )
+        cout << "Starting initial cluster...\n";
+
+    int nBrokers = 3;
+    for ( int i = 0; i < nBrokers; ++ i ) {
+        startNewBroker ( brokers,
+                         srcRoot,
+                         moduleDir, 
+                         clusterName ); 
+    }
+
+
+    if ( verbosity > 0 )
+        printBrokers ( brokers );
+
+     // Run the declareQueues child.
+     int childStatus;
+     pid_t dqClientPid = 
+     runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity );
+     if ( -1 == dqClientPid ) {
+         cerr << "failoverSoak error: Couldn't fork declareQueues.\n";
+         return CANT_FORK_DQ;
+     }
+
+     // Don't continue until declareQueues is finished.
+     pid_t retval = waitpid ( dqClientPid, & childStatus, 0);
+     if ( retval != dqClientPid) {
+         cerr << "failoverSoak error:  waitpid on declareQueues returned value 
" <<  retval << endl;
+         return DQ_FAILED;
+     }
+     allMyChildren.exited ( dqClientPid, childStatus );
+
+
+
+     // Start the receiving client.
+     pid_t receivingClientPid =
+     startReceivingClient ( brokers, 
+                              host, 
+                              receiverPath,
+                              reportFrequency,
+                              verbosity );
+     if ( -1 == receivingClientPid ) {
+         cerr << "failoverSoak error: Couldn't fork receiver.\n";
+         return CANT_FORK_RECEIVER;
+     }
+
+
+     // Start the sending client.
+     pid_t sendingClientPid = 
+     startSendingClient ( brokers, 
+                            host, 
+                            senderPath, 
+                            nMessages,
+                            reportFrequency,
+                            verbosity );
+     if ( -1 == sendingClientPid ) {
+         cerr << "failoverSoak error: Couldn't fork sender.\n";
+         return CANT_FORK_RECEIVER;
+     }
+
+
+     int minSleep = 3,
+         maxSleep = 6;
+
+
+     for ( int totalBrokers = 3; 
+           totalBrokers < maxBrokers; 
+           ++ totalBrokers 
+         ) 
+     {
+         if ( verbosity > 0 )
+             cout << totalBrokers << " brokers have been added to the 
cluster.\n\n\n";
+
+         // Sleep for a while. -------------------------
+         int sleepyTime = mrand ( minSleep, maxSleep );
+         if ( verbosity > 0 )
+             cout << "Sleeping for " << sleepyTime << " seconds.\n";
+         sleep ( sleepyTime );
+
+         // Kill the oldest broker. --------------------------
+         killFrontBroker ( brokers, verbosity );
+
+         // Sleep for a while. -------------------------
+         sleepyTime = mrand ( minSleep, maxSleep );
+         if ( verbosity > 0 )
+             cerr << "Sleeping for " << sleepyTime << " seconds.\n";
+         sleep ( sleepyTime );
+
+         // Start a new broker. --------------------------
+         if ( verbosity > 0 )
+             cout << "Starting new broker.\n\n";
+
+         startNewBroker ( brokers,
+                          srcRoot,
+                          moduleDir, 
+                          clusterName ); 
+       
+         if ( verbosity > 0 )
+             printBrokers ( brokers );
+       
+         // If all children have exited, quit.
+         int unfinished = allMyChildren.unfinished();
+         if ( ! unfinished ) {
+             killAllBrokers ( brokers );
+
+             if ( verbosity > 0 )
+                 cout << "failoverSoak: all children have exited.\n";
+           int retval = allMyChildren.checkChildren();
+           if ( verbosity > 0 )
+             std::cerr << "failoverSoak: checkChildren: " << retval << endl;
+           return retval ? ERROR_ON_CHILD : HUNKY_DORY;
+         }
+
+         // Even if some are still running, if there's an error, quit.
+         if ( allMyChildren.checkChildren() )
+         {
+             if ( verbosity > 0 )
+                 cout << "failoverSoak: error on child.\n";
+             allMyChildren.killEverybody();
+             killAllBrokers ( brokers );
+             return ERROR_ON_CHILD;
+         }
+
+         // If one is hanging, quit.
+         if ( allMyChildren.hanging ( 120 ) )
+         {
+             if ( verbosity > 0 )
+                 cout << "failoverSoak: child hanging.\n";
+             allMyChildren.killEverybody();
+             killAllBrokers ( brokers );
+             return HANGING;
+         }
+
+         if ( verbosity > 0 ) {
+           std::cerr << "------- next kill-broker loop --------\n";
+           allMyChildren.print();
+         }
+     }
+
+     retval = allMyChildren.checkChildren();
+     if ( verbosity > 0 )
+       std::cerr << "failoverSoak: checkChildren: " << retval << endl;
+
+     if ( verbosity > 0 )
+         cout << "failoverSoak: maxBrokers reached.\n";
+
+     allMyChildren.killEverybody();
+     killAllBrokers ( brokers );
+
+     return retval ? ERROR_ON_CHILD : HUNKY_DORY;
+}
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp?rev=719055&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp Wed Nov 19 
12:22:38 2008
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/MessageReplayTracker.h>
+#include <qpid/Exception.h>
+
+#include <iostream>
+#include <sstream>
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+class Sender : public FailoverManager::Command
+{
+  public:
+    Sender(const std::string& queue, uint count, uint reportFreq);
+    void execute(AsyncSession& session, bool isRetry);
+    uint getSent();
+
+    int verbosity;
+
+  private:
+    MessageReplayTracker sender;
+    const uint count;
+    uint sent;
+    const uint reportFrequency;
+    Message message;
+    
+};
+
+Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : 
sender(10), count(count_), sent(0), reportFrequency(reportFreq)
+{
+    message.getDeliveryProperties().setRoutingKey(queue);
+}
+
+void Sender::execute(AsyncSession& session, bool isRetry)
+{
+    if (isRetry) sender.replay(session);
+    else sender.init(session);
+    while (sent < count) {
+        stringstream message_data;
+        message_data << ++sent;
+        message.setData(message_data.str());
+        message.getHeaders().setInt("sn", sent);
+        sender.send(message);
+        if (count > reportFrequency && !(sent % reportFrequency)) {
+            if ( verbosity > 0 )
+                std::cout << "sent " << sent << " of " << count << std::endl;
+        }
+    }
+    message.setData("That's all, folks!");
+    sender.send(message);
+
+    if ( verbosity > 0 )
+      std::cout << "SENDER COMPLETED\n";
+}
+
+uint Sender::getSent()
+{
+    return sent;
+}
+
+int main(int argc, char ** argv) 
+{
+    ConnectionSettings settings;
+
+    if ( argc != 6 )
+    {
+      std::cerr << "Usage: replaying_sender host port n_messages 
report_frequency verbosity\n";
+      return 1;
+    }
+
+    settings.host       = argv[1];
+    settings.port       = atoi(argv[2]);
+    int n_messages      = atoi(argv[3]);
+    int reportFrequency = atoi(argv[4]);
+    int verbosity       = atoi(argv[5]);
+
+    FailoverManager connection(settings);
+    Sender sender("message_queue", n_messages, reportFrequency );
+    sender.verbosity = verbosity;
+    try {
+        connection.execute ( sender );
+        if ( verbosity > 0 )
+        {
+            std::cout << "Sender finished.  Sent " 
+                      << sender.getSent() 
+                      << " messages." 
+                      << endl;
+        }
+        connection.close();
+        return 0;  
+    } 
+    catch(const std::exception& error) 
+    {
+        cerr << "Sender (host: " 
+             << settings.host 
+             << " port: " 
+             << settings.port
+             << " )  "
+             << " Failed: " 
+             << error.what() 
+             << std::endl;
+    }
+    return 1;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp?rev=719055&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp Wed Nov 19 
12:22:38 2008
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include <iostream>
+#include <fstream>
+
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+class Listener : public MessageListener, 
+                 public FailoverManager::Command, 
+                 public FailoverManager::ReconnectionStrategy
+{
+  public:
+    Listener ( int report_frequency = 1000, int verbosity = 0 );
+    void received(Message& message);
+    void execute(AsyncSession& session, bool isRetry);
+    void check();
+    void editUrlList(std::vector<Url>& urls);
+  private:
+    Subscription subscription;
+    uint count;
+    uint received_twice;
+    uint lastSn;
+    bool gaps;
+    uint  reportFrequency;
+    int  verbosity;
+};
+
+
+Listener::Listener(int freq, int verbosity) 
+  : count(0), 
+    received_twice(0), 
+    lastSn(0), 
+    gaps(false), 
+    reportFrequency(freq),
+    verbosity(verbosity)
+{}
+
+
+void Listener::received(Message & message) 
+{
+    if (message.getData() == "That's all, folks!") 
+    {
+        if(verbosity > 0 )
+        {
+            std::cout << "Shutting down listener for " 
+                      << message.getDestination() << std::endl;
+
+            std::cout << "Listener received " 
+                      << count 
+                      << " messages (" 
+                      << received_twice 
+                      << " received_twice)" 
+                      << endl;
+        }
+        subscription.cancel();
+        if ( verbosity > 0 )
+          std::cout << "LISTENER COMPLETED\n";
+    } else {
+        uint sn = message.getHeaders().getAsInt("sn");
+        if (lastSn < sn) {
+            if (sn - lastSn > 1) {
+                std::cerr << "Error: gap in sequence between " << lastSn << " 
and " << sn << std::endl;
+                gaps = true;
+            }
+            lastSn = sn;
+            ++count;
+            if ( ! ( count % reportFrequency ) ) {
+                if ( verbosity > 0 )
+                    std::cout << "Listener has received " 
+                              << count 
+                              << " messages.\n";
+            }
+        } else {
+            ++received_twice;
+        }
+    }
+}
+
+void Listener::check()
+{
+    if (gaps) throw Exception("Detected gaps in sequence; messages appear to 
have been lost.");
+}
+
+void Listener::execute(AsyncSession& session, bool isRetry)
+{
+    if (isRetry) {
+        // std::cout << "Resuming from " << count << std::endl;
+    }
+    SubscriptionManager subs(session);
+    subscription = subs.subscribe(*this, "message_queue");
+    subs.run();
+}
+
+void Listener::editUrlList(std::vector<Url>& urls)
+{
+    /**
+     * A more realistic algorithm would be to search through the list
+     * for prefered hosts and ensure they come first in the list.
+     */
+    if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, 
urls.end());
+}
+
+int main(int argc, char ** argv)
+{
+    ConnectionSettings settings;
+
+    if ( argc != 5 )
+    {
+      std::cerr << "Usage: resuming_receiver host port report_frequency 
verbosity\n";
+      return 1;
+    }
+
+    settings.host       = argv[1];
+    settings.port       = atoi(argv[2]);
+    int reportFrequency = atoi(argv[3]);
+    int verbosity       = atoi(argv[4]);
+
+    Listener listener(reportFrequency, verbosity);
+    FailoverManager connection(settings, &listener);
+
+    try {
+        connection.execute(listener);
+        connection.close();
+        listener.check();
+        return 0;
+    } catch(const std::exception& error) {
+        std::cerr << "Receiver failed: " << error.what() << std::endl;
+    }
+    return 1;
+}
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/run_failover_soak
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/run_failover_soak?rev=719055&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/run_failover_soak (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/run_failover_soak Wed Nov 19 
12:22:38 2008
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+host=127.0.0.1
+
+src_root=..
+module_dir=$src_root/.libs
+n_messages=300000
+report_frequency=10000
+verbosity=1
+
+
+exec `dirname $0`/failover_soak $src_root $module_dir $host ./declare_queues 
./replaying_sender ./resuming_receiver $n_messages $report_frequency $verbosity
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/run_failover_soak
------------------------------------------------------------------------------
    svn:executable = *


Reply via email to