Author: gsim
Date: Fri Nov  7 06:08:29 2008
New Revision: 712127

URL: http://svn.apache.org/viewvc?rev=712127&view=rev
Log:
* Added some doxygen comments for FailoverManager
* Added means for application to alter the order in which urls are tried (or 
indeed the list of urls to try)


Modified:
    incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp
    incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp?rev=712127&r1=712126&r2=712127&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp Fri Nov  
7 06:08:29 2008
@@ -19,67 +19,40 @@
  *
  */
 
-#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/FailoverManager.h>
 #include <qpid/client/Session.h>
+#include <qpid/Exception.h>
 
-#include <unistd.h>
 #include <cstdlib>
 #include <iostream>
-#include <fstream>
 
 using namespace qpid::client;
-using namespace qpid::framing;
-
 
 using namespace std;
 
-
-
-
-int 
-main ( int argc, char ** argv) 
+int main(int argc, char ** argv) 
 {
-  if ( argc < 3 )
-  {
-    std::cerr << "Usage: ./declare_queues host cluster_port_file_name\n";
-    std::cerr << "i.e. for host: 127.0.0.1\n";
-    exit(1);
-  }
-
-  const char * host = argv[1];
-  int port = atoi(argv[2]);
-
-
-  try 
-  {
-    FailoverConnection connection;
-    FailoverSession    * session;
-
-    connection.open ( host, port );
-    session = connection.newSession();
-
-    session->queueDeclare ( "message_queue");
+    ConnectionSettings settings;
+    if (argc > 1) settings.host = argv[1];
+    if (argc > 2) settings.port = atoi(argv[2]);
+    
+    FailoverManager connection(settings);
+    try {
+        bool complete = false;
+        while (!complete) {
+            Session session = connection.connect().newSession();
+            try {
+                session.queueDeclare(arg::queue="message_queue");
+                complete = true;
+            } catch (const qpid::TransportFailure&) {}
+        }
+        connection.close();
+        return 0;
+    } catch (const std::exception& error) {
+        std::cout << "Failed:" << error.what() << std::endl;
+        return 1;
+    }
     
-    /*
-    session->exchangeBind 
-      ( arg::exchange="amq.direct", 
-        arg::queue="message_queue", 
-        arg::bindingKey="routing_key"
-      );
-     * */
-    session->exchangeBind ( "message_queue",
-                           "amq.direct", 
-                           "routing_key"
-                         );
-    connection.close();
-    return 0;
-  }
-  catch ( const std::exception& error ) 
-  {
-    std::cout << error.what() << std::endl;
-  }
-
-  return 1;
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp?rev=712127&r1=712126&r2=712127&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp Fri 
Nov  7 06:08:29 2008
@@ -35,13 +35,16 @@
 using namespace std;
 
 
-class Listener : public MessageListener, public FailoverManager::Command
+class Listener : public MessageListener, 
+                 public FailoverManager::Command, 
+                 public FailoverManager::ReconnectionStrategy
 {
   public:
     Listener();
     void received(Message& message);
     void execute(AsyncSession& session, bool isRetry);
     void check();
+    void editUrlList(std::vector<Url>& urls);
   private:
     Subscription subscription;
     uint count;
@@ -90,14 +93,23 @@
     subs.run();
 }
 
+void Listener::editUrlList(std::vector<Url>& urls)
+{
+    /**
+     * A more realistic algorithm would be to search through the list
+     * for preferred hosts and ensure they come first in the list.
+     */
+    if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, 
urls.end());
+}
+
 int main(int argc, char ** argv)
 {
     ConnectionSettings settings;
     if (argc > 1) settings.host = argv[1];
     if (argc > 2) settings.port = atoi(argv[2]);
     
-    FailoverManager connection(settings);
     Listener listener;
+    FailoverManager connection(settings, &listener);
 
     try {
         connection.execute(listener);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp?rev=712127&r1=712126&r2=712127&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp Fri Nov  
7 06:08:29 2008
@@ -28,7 +28,8 @@
 
 using qpid::sys::Monitor;
 
-FailoverManager::FailoverManager(const ConnectionSettings& s) : settings(s), 
state(IDLE) {}
+FailoverManager::FailoverManager(const ConnectionSettings& s, 
+                                 ReconnectionStrategy* rs) : settings(s), 
strategy(rs), state(IDLE) {}
 
 void FailoverManager::execute(Command& c)
 {
@@ -38,11 +39,11 @@
         try {
             AsyncSession session = connect().newSession();
             c.execute(session, retry);
-            session.sync();//TODO: shouldn't be required, but seems there is a 
bug in session
+            session.sync();//TODO: shouldn't be required
             session.close();
             completed = true;
         } catch(const TransportFailure&) {
-            retry = true;
+            retry = true;            
         }            
     }
 }
@@ -86,6 +87,7 @@
 void FailoverManager::attempt(Connection& c, ConnectionSettings s, 
std::vector<Url> urls)
 {
     Monitor::ScopedUnlock u(lock);
+    if (strategy) strategy->editUrlList(urls);
     if (urls.empty()) {
         attempt(c, s);
     } else {
@@ -105,7 +107,9 @@
 void FailoverManager::attempt(Connection& c, ConnectionSettings s)
 {
     try {
+        QPID_LOG(info, "Attempting to connect to " << s.host << " on " << 
s.port << "..."); 
         c.open(s);
+        QPID_LOG(info, "Connected to " << s.host << " on " << s.port); 
     } catch (const Exception& e) {
         QPID_LOG(info, "Could not connect to " << s.host << " on " << s.port 
<< ": " << e.what()); 
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h?rev=712127&r1=712126&r2=712127&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h Fri Nov  7 
06:08:29 2008
@@ -38,21 +38,75 @@
 };
 
 /**
- * Utility to handle reconnection.
+ * Utility to manage failover.
  */
 class FailoverManager
 {
   public:
+    /**
+     * Interface to implement for doing work that can be resumed on
+     * failover
+     */
     struct Command
     {
+        /**
+         * This method will be called with isRetry=false when the
+         * command is first executed. The session to use for the work
+         * will be passed to the implementing class. If the connection
+         * fails while the execute call is in progress, the
+         * FailoverManager controlling the execution will re-establish
+         * a connection, open a new session and call back to the
+         * Command implementation's execute method with the new session
+         * and isRetry=true.
+         */
         virtual void execute(AsyncSession& session, bool isRetry) = 0;
         virtual ~Command() {}
     };
 
-    FailoverManager(const ConnectionSettings& settings);
+    struct ReconnectionStrategy
+    {
+        /**
+         * This method is called by the FailoverManager prior to
+         * establishing a connection (or re-connection) and can be
+         * used if the application wishes to edit or re-order the list
+         * which will default to the list of known brokers for the
+         * last connection.
+         */
+        virtual void editUrlList(std::vector<Url>&  urls) = 0;
+        virtual ~ReconnectionStrategy() {}
+    };
+
+    /**
+     * Create a manager to control failover for a logical connection.
+     * 
+     * @param settings the initial connection settings
+     * @param strategy optional strategy callback allowing application
+     * to edit or reorder the list of urls to which reconnection is
+     * attempted
+     */
+    FailoverManager(const ConnectionSettings& settings, ReconnectionStrategy* 
strategy = 0);
+    /**
+     * Return the current connection if open or attempt to reconnect to
+     * the specified list of urls. If no list is specified the list of
+     * known brokers from the last connection will be used. If no list
+     * is specified and this is the first connect attempt, the host
+     * and port from the initial settings will be used.
+     */
     Connection& connect(std::vector<Url> brokers = std::vector<Url>());
+    /**
+     * Return the current connection whether open or not
+     */
     Connection& getConnection();
+    /**
+     * Close the current connection
+     */
     void close();
+    /**
+     * Reliably execute the specified command. This involves creating
+     * a session on which to carry out the work of the command,
+     * handling failover occurring while executing that command and
+     * re-starting the work.
+     */
     void execute(Command&);
   private:
     enum State {IDLE, CONNECTING, CANT_CONNECT};
@@ -60,6 +114,7 @@
     qpid::sys::Monitor lock;
     Connection connection;
     ConnectionSettings settings;
+    ReconnectionStrategy* strategy;
     State state;
 
     void attempt(Connection&, ConnectionSettings settings, std::vector<Url> 
urls);


Reply via email to