Author: aconway
Date: Fri Oct 17 09:45:24 2008
New Revision: 705668

URL: http://svn.apache.org/viewvc?rev=705668&view=rev
Log:

QPID-1367 Mick Goulish: improvements to client-side failover.

Modified:
    incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp
    incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
    incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/tests/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test

Modified: incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp Fri Oct 17 09:45:24 2008
@@ -64,6 +64,7 @@
 int main(int argc, char** argv) {
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
+    int count = argc>3 ? atoi(argv[3]) : 10;
     Connection connection;
     Message message;
     try {
@@ -81,14 +82,15 @@
 
        // Now send some messages ...
 
-       for (int i=0; i<10; i++) {
+       for (int i=0; i<count; i++) {
          stringstream message_data;
          message_data << "Message " << i;
 
          message.setData(message_data.str());
           // Asynchronous transfer sends messages as quickly as
           // possible without waiting for confirmation.
-          async(session).messageTransfer(arg::content=message,  arg::destination="amq.direct");
+          // async(session).messageTransfer(arg::content=message,  arg::destination="amq.direct");
+          session.messageTransfer(arg::content=message,  arg::destination="amq.direct");
        }
        
        // And send a final message to indicate termination.

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Fri Oct 17 09:45:24 2008
@@ -36,12 +36,13 @@
 int 
 main ( int argc, char ** argv) 
 {
+
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
     int count  = argc>3 ? atoi(argv[3]) : 30;
-    int delayMs  = argc>4 ? atoi(argv[4]) : 1000;
     string program_name = "PRODUCER";
 
+
     try {
         FailoverConnection connection;
         FailoverSession    * session;
@@ -49,14 +50,23 @@
 
         connection.open ( host, port );
         session = connection.newSession();
+        bool report = true;
         int sent  = 0;
         while ( sent < count ) {
+
             message.getDeliveryProperties().setRoutingKey("routing_key"); 
-            std::cout << "sending message " 
-                      << sent
-                      << " of " 
-                      << count 
-                      << ".\n";
+
+
+            if ( count > 1000 )
+              report = !(sent % 1000);
+
+            if ( report )
+            {
+              std::cout << "sending message " 
+                        << sent
+                        << ".\n";
+            }
+
             stringstream message_data;
             message_data << sent;
             message.setData(message_data.str());
@@ -70,12 +80,12 @@
                                        0,
                                        message
             );
-            usleep ( 1000*delayMs );
+
             ++ sent;
         }
         message.setData ( "That's all, folks!" );
 
-        /* MICK FIXME
+        /* FIXME mgoulish 16 Oct 08
            session.messageTransfer ( arg::content=message,  
            arg::destination="amq.direct"
            ); 
@@ -88,10 +98,17 @@
 
         session->sync();
         connection.close();
+        std::cout << program_name 
+                  << " sent " 
+                  << sent
+                  << " messages.\n";
+
         std::cout << program_name << ": " << " completed without error." << std::endl;
         return 0;  
     } catch(const std::exception& error) {
         std::cout << program_name << ": " << error.what() << std::endl;
+        std::cout << program_name << "Exiting.\n";
+        return 1;
     }
     return 1;
 }

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Fri Oct 17 09:45:24 2008
@@ -34,150 +34,26 @@
 using namespace std;
 
 
-struct Recorder 
-{
-    unsigned int max_messages;
-    unsigned int * messages_received;
-
-    Recorder ( )
-    {
-        max_messages = 1000;
-        messages_received = new unsigned int [ max_messages ];
-        memset ( messages_received, 0, max_messages * sizeof(int) );
-    }
-
-
-    void
-    received ( int i )
-    {
-        messages_received[i] ++;
-    }
-
-
-
-    void
-    report ( )
-    {
-        int i;
-
-        int last_received_message = 0;
-
-        vector<unsigned int> missed_messages,
-            multiple_messages;
-
-        /*----------------------------------------------------
-          Collect indices of missed and multiple messages.
-          ----------------------------------------------------*/
-        bool seen_first_message = false;
-        for ( i = max_messages - 1; i >= 0; -- i )
-        {
-            if ( ! seen_first_message )
-            {
-                if ( messages_received [i] > 0 )
-                {
-                    seen_first_message = true;
-                    last_received_message = i;
-                }
-            }
-            else
-            {
-                if ( messages_received [i] == 0 )
-                    missed_messages.push_back ( i );
-                else
-                    if ( messages_received [i] > 1 )
-                    {
-                        multiple_messages.push_back ( i );
-                    }
-            }
-        }
-
-        /*--------------------------------------------
-          Report missed messages.
-          --------------------------------------------*/
-        char const * verb = ( missed_messages.size() == 1 ) 
-            ? " was "
-            : " were ";
-
-        char const * plural = ( missed_messages.size() == 1 ) 
-            ? "."
-            : "s.";
-
-        std::cerr << "Listener::shutdown: There"
-                  << verb
-                  << missed_messages.size()
-                  << " missed message"
-                  << plural
-                  << endl;
-
-        for ( i = 0; i < int(missed_messages.size()); ++ i )
-        {
-            std::cerr << "    " << i << " was missed.\n";
-        }
-
-
-        /*--------------------------------------------
-          Report multiple messages.
-          --------------------------------------------*/
-        verb = ( multiple_messages.size() == 1 ) 
-            ? " was "
-            : " were ";
-
-        plural = ( multiple_messages.size() == 1 ) 
-            ? "."
-            : "s.";
-
-        std::cerr << "Listener::shutdown: There"
-                  << verb
-                  << multiple_messages.size()
-                  << " multiple message"
-                  << plural
-                  << endl;
-
-        for ( i = 0; i < int(multiple_messages.size()); ++ i )
-        {
-            std::cerr << "    " 
-                      << multiple_messages[i] 
-                      << " was received " 
-                      << messages_received [ multiple_messages[i] ]
-                      << " times.\n";
-        }
-    
-        /*
-          for ( i = 0; i < last_received_message; ++ i )
-          {
-          std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
-          }
-        */
-    }
-
-};
-
-
-      
-
 struct Listener : public MessageListener
 {
     FailoverSubscriptionManager & subscriptionManager;
-    Recorder & recorder;
-
 
-    Listener ( FailoverSubscriptionManager& subs, 
-               Recorder & recorder
-    );
+    Listener ( FailoverSubscriptionManager& subs );
 
-    void shutdown() { recorder.report(); }
-    void parse_message ( std::string const & msg );
+    void shutdown() { subscriptionManager.stop(); }
   
     virtual void received ( Message & message );
+
+    int count;
 };
 
 
 
 
 
-Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) : 
+Listener::Listener ( FailoverSubscriptionManager & s ) : 
     subscriptionManager(s),
-    recorder(r)
+    count(0)
 {
 }
 
@@ -188,18 +64,19 @@
 void 
 Listener::received ( Message & message ) 
 {
-    std::cerr << "Listener received: " << message.getData() << std::endl;
+    if(! (count%1000))
+      std::cerr << "\t\tListener received: " << message.getData() << std::endl;
+
+    ++ count;
+
     if (message.getData() == "That's all, folks!") 
     {
         std::cout << "Shutting down listener for " << message.getDestination()
                   << std::endl;
-        subscriptionManager.cancel(message.getDestination());
 
-        shutdown();
-    }
-    else
-    {
-        parse_message ( message.getData() );
+        std::cout << "Listener received " << count << " messages.\n";
+        subscriptionManager.cancel(message.getDestination());
+        shutdown ( );
     }
 }
 
@@ -207,21 +84,6 @@
 
 
 
-void
-Listener::parse_message ( const std::string & msg )
-{
-    int msg_number;
-    if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) ) 
-    {
-        std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
-        return;
-    }
-    recorder.received ( msg_number );
-}
-
-
-
-
 
 
 int 
@@ -235,17 +97,12 @@
 
         FailoverConnection connection;
         FailoverSession    * session;
-        Recorder recorder;
-
-        connection.name = program_name; 
 
         connection.open ( host, port );
         session = connection.newSession();
-        session->name = program_name; 
 
         FailoverSubscriptionManager subscriptions ( session );
-        subscriptions.name = program_name;
-        Listener listener ( subscriptions, recorder );
+        Listener listener ( subscriptions );
         subscriptions.subscribe ( listener, "message_queue" );
         subscriptions.run ( );
 
@@ -256,7 +113,8 @@
     } catch(const std::exception& error) {
         std::cout << program_name  << ": " << error.what() << std::endl;
     }
-    return 1;
+
+    return 0;
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Oct 17 09:45:24 2008
@@ -151,8 +151,11 @@
 static const std::string CONN_CLOSED("Connection closed by broker");
 
 void ConnectionImpl::shutdown() {
-
     Mutex::ScopedLock l(lock);
+
+    if ( failureCallback )
+      failureCallback();
+
     if (handler.isClosed()) return;
 
     // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
@@ -161,9 +164,6 @@
         closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
     setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
     handler.fail(CONN_CLOSED);
-
-    if ( failureCallback )
-      failureCallback();
 }
 
 void ConnectionImpl::erase(uint16_t ch) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Fri Oct 17 09:45:24 2008
@@ -42,6 +42,7 @@
 
 void Subscriber::received(Message& msg)
 {
+
     if (listener) {
         listener->received(msg);
         autoAck.ack(msg, session);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp Fri Oct 17 09:45:24 2008
@@ -36,7 +36,6 @@
 
 
 FailoverConnection::FailoverConnection ( ) :
-    name(),
     failoverCompleteTime(0)
 {
     connection.registerFailureCallback
@@ -59,7 +58,6 @@
     settings.host         = host;
     settings.port         = port;
     settings.username     = uid;
-    settings.username     = uid;
     settings.password     = pwd;
     settings.virtualhost  = virtualhost;
     settings.maxFrameSize = maxFrameSize;
@@ -124,9 +122,19 @@
 void 
 FailoverConnection::failover ( )
 {
+  std::vector<FailoverSession *>::iterator sessions_iterator;
+
+  for ( sessions_iterator = sessions.begin(); 
+        sessions_iterator != sessions.end(); 
+        ++ sessions_iterator )
+  {
+    FailoverSession * fs = * sessions_iterator;
+    fs->failover_in_progress = true;
+  }
+
     std::vector<Url> knownBrokers = connection.getKnownBrokers();
     if (knownBrokers.empty())
-        throw Exception(QPID_MSG("FailoverConnection::failover " << name << " no known brokers."));
+        throw Exception(QPID_MSG("FailoverConnection::failover no known brokers."));
 
     Connection newConnection;
     for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) {
@@ -148,7 +156,6 @@
      */
 
     // FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession
-    std::vector<FailoverSession *>::iterator sessions_iterator;
     for ( sessions_iterator = sessions.begin();
           sessions_iterator < sessions.end();
           ++ sessions_iterator
@@ -173,6 +180,15 @@
         FailoverSession * fs = * sessions_iterator;
         fs->failover ( );
     }
+
+    for ( sessions_iterator = sessions.begin();
+          sessions_iterator < sessions.end();
+          ++ sessions_iterator
+    )
+    {
+        FailoverSession * fs = * sessions_iterator;
+        fs->failover_in_progress = false;
+    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h Fri Oct 17 09:45:24 2008
@@ -71,10 +71,6 @@
 
     void registerFailureCallback ( boost::function<void ()> fn );
 
-    // If you have more than 1 connection and you want to give them 
-    // separate names for debugging...
-    std::string name; 
-
     void failover ( );
 
     struct timeval * failoverCompleteTime;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp Fri Oct 17 09:45:24 2008
@@ -38,7 +38,7 @@
 namespace client {
 
 FailoverSession::FailoverSession ( ) :
-    name("no_name")
+    failover_in_progress(false)
 {
     // The session is created by FailoverConnection::newSession
     failoverSubscriptionManager = 0;
@@ -170,11 +170,26 @@
 )
 {
 
-    session.messageTransfer ( destination,
-                              acceptMode,
-                              acquireMode,
-                              content
-    );
+    while ( 1 )
+    {
+      try
+      {
+        session.messageTransfer ( destination,
+                                  acceptMode,
+                                  acquireMode,
+                                  content
+        );
+        break;
+      }
+      catch ( ... )
+      {
+        // Take special action only if there is a failover in progress.
+        if ( ! failover_in_progress )
+          break;
+
+        usleep ( 1000 );
+      }
+    }
 }
 
 
@@ -583,7 +598,6 @@
 
     if ( failoverSubscriptionManager )
     {
-        // 
         failoverSubscriptionManager->prepareForFailover ( newSession );
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h Fri Oct 17 09:45:24 2008
@@ -59,8 +59,6 @@
     FailoverSession ( );
     ~FailoverSession ( );
 
-    std::string name;
-
     framing::FrameSet::shared_ptr get();
 
     SessionId getId() const;
@@ -82,6 +80,8 @@
     
     void sendCompletion ( );
 
+    bool failover_in_progress; 
+
 
 
     // Wrapped functions from Session ----------------------------

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp Fri Oct 17 09:45:24 2008
@@ -33,7 +33,6 @@
 
 
 FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
-    name("no_name"),
     newSessionIsValid(false),
     no_failover(false)
 {

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h Fri Oct 17 09:45:24 2008
@@ -114,9 +114,6 @@
     void prepareForFailover ( Session newSession );
     void failover ( );
 
-    std::string name;
-
-
 
   private:
     sys::Monitor lock;

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct 17 09:45:24 2008
@@ -23,3 +23,5 @@
 .valgrindrc
 cluster_test
 echotest
+cert.password
+test_cert_db

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test Fri Oct 17 09:45:24 2008
@@ -1,8 +1,8 @@
 #!/bin/sh
 # Run a simple test over SSL
-MY_DIR=$(dirname $(which $0))
-CERT_DIR=${MY_DIR}/test_cert_db
-CERT_PW_FILE=${MY_DIR}/cert.password
+
+CERT_DIR=`pwd`/test_cert_db
+CERT_PW_FILE=`pwd`/cert.password
 HOSTNAME=`hostname`
 COUNT=10000
 
@@ -18,13 +18,13 @@
 }
 
 start_broker() {
-    ${MY_DIR}/../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\
-        --load-module ${MY_DIR}/../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
+    ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\
+        --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
     PORT=`cat qpidd.port`
 }
 
 stop_broker() {
-    ${MY_DIR}/../qpidd -q --port $PORT
+    ../qpidd -q --port $PORT
 }
 
 if [[ !(-e ${CERT_PW_FILE}) ]] ;  then
@@ -36,8 +36,8 @@
 
 start_broker || error "Could not start broker"
 echo "Running SSL test on port $PORT"
-export QPID_LOAD_MODULE=${MY_DIR}/../.libs/sslconnector.so
+export QPID_LOAD_MODULE=../.libs/sslconnector.so
 export QPID_SSL_CERT_DB=${CERT_DIR}
 export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE}
-${MY_DIR}/perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary
+./perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary
 


Reply via email to