Author: aconway
Date: Thu Jul 19 14:52:24 2007
New Revision: 557788

URL: http://svn.apache.org/viewvc?view=rev&rev=557788
Log:

        * Summary:
         - Connect cluster handlers into broker handler chains.
         - Progress on wiring replication.
        
        * src/tests/cluster.mk: Temporarily disabled Cluster test.

        * src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs.

        * src/qpidd.cpp:
         - Load optional libs (cluster)
         - Include plugin config in options.parse.

        * src/qpid/cluster/SessionManager.h:
         - Create sessions, update handler chains (as HandlerUpdater)
         - Handle frames from cluster.

        * src/qpid/cluster/ClusterPlugin.h, .cpp:
         - renamed from ClusterPluginProvider
         - Create and connect Cluster and SessionManager.
         - Register SessionManager as HandlerUpdater.

        * src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler.

        * src/qpid/broker/Connection.cpp: Apply HandlerUpdaters.

        * src/qpid/broker/Broker.h, .cpp:
         - Initialize plugins
         - Apply HandlerUpdaters

        * src/qpid/Plugin.h, .cpp: Simplified plugin framework.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
      - copied, changed from r557725, 
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h   (with 
props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h
    incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Thu Jul 19 14:52:24 2007
@@ -11,11 +11,13 @@
   qpid/cluster/Cpg.cpp \
   qpid/cluster/Cpg.h \
   qpid/cluster/Dispatchable.h \
-  qpid/cluster/ClusterPluginProvider.cpp \
+  qpid/cluster/ClusterPlugin.cpp \
   qpid/cluster/ClassifierHandler.h \
   qpid/cluster/ClassifierHandler.cpp \
   qpid/cluster/SessionFrame.h \
-  qpid/cluster/SessionFrame.cpp
+  qpid/cluster/SessionFrame.cpp \
+  qpid/cluster/SessionManager.h \
+  qpid/cluster/SessionManager.cpp
 
 libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Thu Jul 19 14:52:24 2007
@@ -22,19 +22,19 @@
 
 namespace qpid {
 
-std::vector<PluginProvider*> PluginProvider::providers;
+Plugin::Plugins Plugin::plugins;
 
-PluginProvider::PluginProvider() {
+Plugin::Plugin() {
     // Register myself.
-    providers.push_back(this);
+    plugins.push_back(this);
 }
 
-PluginProvider::~PluginProvider() {}
+Plugin::~Plugin() {}
 
-Options*  PluginProvider::getOptions() { return 0; }
+Options*  Plugin::getOptions() { return 0; }
 
-const std::vector<PluginProvider*>& PluginProvider::getProviders() {
-    return providers;
+const Plugin::Plugins& Plugin::getPlugins() {
+    return plugins;
 }
 
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h Thu Jul 19 14:52:24 2007
@@ -32,70 +32,56 @@
 namespace qpid {
 class Options;
 
-/** Generic base class to allow dynamic casting of generic Plugin objects
- * to concrete types.
- */
-struct Plugin : private boost::noncopyable {
-    virtual ~Plugin() {}
-};
-
-/** Generic interface for anything that uses plug-ins. */
-struct PluginUser : boost::noncopyable {
-    virtual ~PluginUser() {}
-    /**
-     * Called by a PluginProvider to provide a plugin.
-     *
-     * A concrete PluginUser will dynamic_pointer_cast plugin to a
-     * class it knows how to use. A PluginUser should ignore plugins
-     * it does not recognize.
-     *
-     * The user will release its shared_ptr when it is finished using
-     * plugin.
-     */ 
-    virtual void use(const shared_ptr<Plugin>& plugin) = 0;
-};
-
 /**
- * Base for classes that provide plug-ins.
+ * Plug-in base class.
  */
-class PluginProvider : boost::noncopyable
+class Plugin : boost::noncopyable
 {
   public:
     /**
-     * Register the provider to appear in getProviders()
+     * Base interface for targets that receive plug-ins.
+     *
+     * The Broker is a plug-in target, there might be others
+     * in future.
+     */
+    struct Target { virtual ~Target() {} };
+
+    typedef std::vector<Plugin*> Plugins;
+    
+    /**
+     * Construct registers the plug-in to appear in getPlugins().
      * 
-     * A concrete PluginProvider is instantiated as a global or static
+     * A concrete Plugin is instantiated as a global or static
      * member variable in a library so it is registered during static
      * initialization when the library is loaded.
      */
-    PluginProvider();
+    Plugin();
     
-    virtual ~PluginProvider();
+    virtual ~Plugin();
 
     /**
-     * Returns configuration options for the plugin.
+     * Configuration options for the plugin.
      * Then will be updated during option parsing by the host program.
      * 
      * @return An options group or 0 for no options. Default returns 0.
-     * PluginProvider retains ownership of return value.
+     * Plugin retains ownership of return value.
      */
     virtual Options* getOptions();
 
-    /** Provide plugins to a PluginUser.
+    /**
+     * Initialize Plugin functionality on a Target.
      * 
-     * The provider can dynamic_cast the user if it only provides
-     * plugins to certain types of user. Providers should ignore
-     * users they don't recognize.
+     * Plugins should ignore targets they don't recognize.
      */
-    virtual void provide(PluginUser& user) = 0;
+    virtual void initialize(Target&) = 0;
 
-    /** Get the list of pointers to the registered providers.
-     * Caller must not delete the pointers.
+    /** List of registered Plugin objects.
+     * Caller must not delete plugin pointers.
      */
-    static const std::vector<PluginProvider*>& getProviders();
+    static const Plugins& getPlugins();
 
   private:
-    static std::vector<PluginProvider*> providers;
+    static Plugins plugins;
 };
  
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Jul 19 
14:52:24 2007
@@ -20,7 +20,6 @@
  */
 
 #include "Broker.h"
-
 #include "Connection.h"
 #include "DirectExchange.h"
 #include "FanOutExchange.h"
@@ -40,11 +39,14 @@
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/sys/TimeoutHandler.h"
 
+#include <boost/bind.hpp>
+
 #include <iostream>
 #include <memory>
 
 using qpid::sys::Acceptor;
 using qpid::framing::HandlerUpdater;
+using qpid::framing::FrameHandler;
 
 namespace qpid {
 namespace broker {
@@ -98,6 +100,12 @@
         store->recover(recoverer);
     }
 
+    // Initialize plugins
+    const Plugin::Plugins& plugins=Plugin::getPlugins();
+    for (Plugin::Plugins::const_iterator i = plugins.begin();
+         i != plugins.end();
+         i++)
+        (*i)->initialize(*this);
 }
 
 
@@ -149,13 +157,14 @@
     return *acceptor;
 }
 
-void Broker::use(const shared_ptr<Plugin>& plugin) {
-    shared_ptr<HandlerUpdater> updater=
-        dynamic_pointer_cast<HandlerUpdater>(plugin);
-    if (updater) {
-        QPID_LOG(critical, "HandlerUpdater plugins not implemented");
-        // FIXME aconway 2007-06-28: hook into Connections.
-    }
+void Broker::add(const shared_ptr<HandlerUpdater>& updater) {
+    QPID_LOG(debug, "Broker added HandlerUpdater");
+    handlerUpdaters.push_back(updater);
+}
+
+void Broker::update(FrameHandler::Chains& chains) {
+    for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
+             boost::bind(&HandlerUpdater::update, _1, chains));
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Jul 19 14:52:24 
2007
@@ -23,19 +23,23 @@
  */
 
 #include "ConnectionFactory.h"
-#include "qpid/Url.h"
-#include "qpid/Plugin.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Acceptor.h"
-#include "MessageStore.h"
-#include "ExchangeRegistry.h"
 #include "ConnectionToken.h"
 #include "DirectExchange.h"
 #include "DtxManager.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
+#include "ExchangeRegistry.h"
+#include "MessageStore.h"
 #include "QueueRegistry.h"
 #include "qpid/Options.h"
+#include "qpid/Plugin.h"
+#include "qpid/Url.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/Runnable.h"
+
+#include <vector>
 
 namespace qpid { 
 
@@ -48,7 +52,7 @@
 /**
  * A broker instance. 
  */
-class Broker : public sys::Runnable, public PluginUser
+class Broker : public sys::Runnable, public Plugin::Target
 {
   public:
     struct Options : public qpid::Options {
@@ -88,26 +92,32 @@
     /** Shut down the broker */
     virtual void shutdown();
 
-    /** Use a plugin */
-    void use(const shared_ptr<Plugin>& plugin);
+    /** Register a handler updater. */
+    void add(const shared_ptr<framing::HandlerUpdater>&);
+    
+    /** Apply all handler updaters to a handler chain pair. */
+    void update(framing::FrameHandler::Chains&); 
     
     MessageStore& getStore() { return *store; }
     QueueRegistry& getQueues() { return queues; }
     ExchangeRegistry& getExchanges() { return exchanges; }
     uint64_t getStagingThreshold() { return stagingThreshold; }
     DtxManager& getDtxManager() { return dtxManager; }
-    
+
   private:
     sys::Acceptor& getAcceptor() const;
 
     Options config;
     sys::Acceptor::shared_ptr acceptor;
     const std::auto_ptr<MessageStore> store;
+    typedef std::vector<shared_ptr<framing::HandlerUpdater> > HandlerUpdaters;
+
     QueueRegistry queues;
     ExchangeRegistry exchanges;
     uint64_t stagingThreshold;
     ConnectionFactory factory;
     DtxManager dtxManager;
+    HandlerUpdaters handlerUpdaters;
 
     static MessageStore* createStore(const Options& config);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Jul 19 
14:52:24 2007
@@ -104,7 +104,10 @@
 FrameHandler::Chains& Connection::getChannel(ChannelId id) {
     ChannelMap::iterator i = channels.find(id);
     if (i == channels.end()) {
-        FrameHandler::Chains chains(new SemanticHandler(id, *this), new 
OutputHandlerFrameHandler(*out));
+        FrameHandler::Chains chains(
+            new SemanticHandler(id, *this),
+            new OutputHandlerFrameHandler(*out));
+        broker.update(chains);
         i = channels.insert(ChannelMap::value_type(id, chains)).first;
     }        
     return i->second;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Jul 19 
14:52:24 2007
@@ -32,6 +32,7 @@
 using namespace qpid::sys;
 using namespace std;
 
+
 ostream& operator <<(ostream& out, const Cluster& cluster) {
     return out << "cluster[" << cluster.name.str() << " " << cluster.self << 
"]";
 }
@@ -46,38 +47,20 @@
     return out;
 }
 
-struct Cluster::IncomingHandler : public FrameHandler {
-    IncomingHandler(Cluster& c) : cluster(c) {}
-    void handle(AMQFrame& frame) {
-        SessionFrame sf(Uuid(true), frame, SessionFrame::IN);
-        cluster.mcast(sf);
-    }
-    Cluster& cluster;
-};
-
-struct Cluster::OutgoingHandler : public FrameHandler {
-    OutgoingHandler(Cluster& c) : cluster(c) {}
-    void handle(AMQFrame& frame) {
-        SessionFrame sf(Uuid(true), frame, SessionFrame::OUT);
-        cluster.mcast(sf);
-    }
-    Cluster& cluster;
-};
 
-// TODO aconway 2007-06-28: Right now everything is backed up via
-// multicast.  When we have point-to-point backups the
-// Incoming/Outgoing handlers must determine where each frame should
-// be sent: to multicast or only to specific backup(s) via AMQP.
 
 
-Cluster::Cluster(const std::string& name_, const std::string& url_) :
+Cluster::Cluster(
+    const std::string& name_, const std::string& url_,
+    const SessionFrameHandler::Chain& next
+) :
+    SessionFrameHandler(next), 
     cpg(new Cpg(*this)),
     name(name_),
     url(url_), 
-    self(cpg->getLocalNoideId(), getpid()),
-    toChains(new IncomingHandler(*this), new OutgoingHandler(*this))
+    self(cpg->getLocalNoideId(), getpid())
 {
-    QPID_LOG(trace, *this << " Joining cluster.");
+    QPID_LOG(trace, *this << " Joining cluster: " << name_);
     cpg->join(name);
     notify();
     dispatcher=Thread(*this);
@@ -102,7 +85,7 @@
     }
 }
 
-void Cluster::mcast(SessionFrame& frame) {
+void Cluster::handle(SessionFrame& frame) {
     QPID_LOG(trace, *this << " SEND: " << frame);
     Buffer buf(frame.size());
     frame.encode(buf);
@@ -114,7 +97,7 @@
 void Cluster::notify() {
     SessionFrame sf;
     sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), 
url)));
-    mcast(sf);
+    handle(sf);
 }
 
 size_t Cluster::size() const {
@@ -122,11 +105,6 @@
     return members.size();
 }
 
-void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) {
-    Mutex::ScopedLock l(lock);
-    receivedChain = chain;
-}
-
 Cluster::MemberList Cluster::getMembers() const {
     // TODO aconway 2007-07-04: use read/write lock?
     Mutex::ScopedLock l(lock);
@@ -152,7 +130,7 @@
     if (frame.uuid.isNull())
         handleClusterFrame(from, frame.frame);
     else
-        receivedChain->handle(frame);
+        next->handle(frame);
 }
 
 bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -166,24 +144,22 @@
 }
 
 // Handle cluster control frame from the null session.
-bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
+void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
     // TODO aconway 2007-06-20: use visitor pattern here.
     ClusterNotifyBody* notifyIn=
         dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
-    if (notifyIn) {
+    assert(notifyIn);
         MemberList list;
         {
             Mutex::ScopedLock l(lock);
-            if (!members[from]) 
-                members[from].reset(new Member(url));
+        shared_ptr<Member>& member=members[from];
+        if (!member) 
+            member.reset(new Member(notifyIn->getUrl()));
             else 
-                members[from]->url = notifyIn->getUrl();
-            QPID_LOG(trace, *this << ": member update: " << members);
+            member->url = notifyIn->getUrl();
             lock.notifyAll();
+        QPID_LOG(trace, *this << ": members joined: " << members);
         }
-        return true;
-    }
-    return false;
 }
 
 void Cluster::configChange(
@@ -207,7 +183,7 @@
         // We don't record members joining here, we record them when
         // we get their ClusterNotify message.
     }
-    if (newMembers)
+    if (newMembers)             // Notify new members of my presence.
         notify();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Jul 19 
14:52:24 2007
@@ -37,11 +37,17 @@
 namespace qpid { namespace cluster {
 
 /**
- * Represents a cluster, provides access to data about members.
- * Implements HandlerUpdater to manage handlers that route frames to
- * and from the cluster.
+ * Connection to the cluster. Maintains cluster membership
+ * data.
+ *
+ * As SessionFrameHandler, handles frames by sending them to the
+ * cluster, sends frames received from the cluster to the next
+ * SessionFrameHandler.
+ * 
+ * 
  */
-class Cluster : private sys::Runnable, private Cpg::Handler
+class Cluster : public SessionFrameHandler,
+                private sys::Runnable, private Cpg::Handler
 {
   public:
     /** Details of a cluster member */
@@ -57,8 +63,10 @@
      * Join a cluster.
      * @param name of the cluster.
      * @param url of this broker, sent to the cluster.
+     * @param handler for frames received from the cluster.
      */
-    Cluster(const std::string& name, const std::string& url);
+    Cluster(const std::string& name, const std::string& url,
+            const SessionFrameHandler::Chain& next);
 
     virtual ~Cluster();
 
@@ -70,14 +78,6 @@
 
     bool empty() const { return size() == 0; }
     
-    /** Get handler chains to send incoming/outgoing frames to the cluster */ 
-    framing::FrameHandler::Chains getSendChains() {
-        return toChains;
-    }
-
-    /** Set handler for frames received from the cluster */
-    void setReceivedChain(const SessionFrameHandler::Chain& chain);
-
     /** Wait for predicate(*this) to be true, up to timeout.
      [EMAIL PROTECTED] True if predicate became true, false if timed out.
      *Note the predicate may not be true after wait returns,
@@ -86,13 +86,13 @@
     bool wait(boost::function<bool(const Cluster&)> predicate,
               sys::Duration timeout=sys::TIME_INFINITE) const;
 
+    /** Send frame to the cluster */
+    void handle(SessionFrame&);
+    
   private:
     typedef Cpg::Id Id;
     typedef std::map<Id, shared_ptr<Member> >  MemberMap;
-    typedef std::map<
-        framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
     
-    void mcast(SessionFrame&);  ///< send frame by multicast.
     void notify();              ///< Notify cluster of my details.
 
     void deliver(
@@ -112,7 +112,7 @@
     );
 
     void run();
-    bool handleClusterFrame(Id from, framing::AMQFrame&);
+    void handleClusterFrame(Id from, framing::AMQFrame&);
 
     mutable sys::Monitor lock;
     boost::scoped_ptr<Cpg> cpg;
@@ -120,17 +120,8 @@
     std::string url;
     Id self;
     MemberMap members;
-    ChannelMap channels;
     sys::Thread dispatcher;
     boost::function<void()> callback;
-    framing::FrameHandler::Chains toChains;
-    SessionFrameHandler::Chain receivedChain;
-
-    struct IncomingHandler;
-    struct OutgoingHandler;
-
-  friend struct IncomingHandler;
-  friend struct OutgoingHandler;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const 
MemberMap::value_type&);

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (from 
r557725, 
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?view=diff&rev=557788&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp&r1=557725&p2=incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Jul 19 
14:52:24 2007
@@ -16,8 +16,8 @@
  *
  */
 #include "qpid/broker/Broker.h"
-#include "qpid/framing/HandlerUpdater.h"
 #include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/SessionManager.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 
@@ -26,11 +26,11 @@
 
 using namespace std;
 
-struct ClusterPluginProvider : public PluginProvider {
+struct ClusterPlugin : public Plugin {
 
     struct ClusterOptions : public Options {
         string clusterName;
-        ClusterOptions() {
+        ClusterOptions() : Options("Cluster Options") {
             addOptions()
                 ("cluster", optValue(clusterName, "NAME"),
                  "Join the cluster named NAME");
@@ -39,22 +39,25 @@
 
     ClusterOptions options;
     shared_ptr<Cluster> cluster;
+    shared_ptr<SessionManager> sessions;
 
     Options* getOptions() {
         return &options;
     }
 
-    void provide(PluginUser& user) {
-        broker::Broker* broker = dynamic_cast<broker::Broker*>(&user);
+    void initialize(Plugin::Target& target) {
+        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         // Only provide to a Broker, and only if the --cluster config is set.
         if (broker && !options.clusterName.empty()) {
             assert(!cluster); // A process can only belong to one cluster.
-            cluster.reset(new Cluster(options.clusterName, broker->getUrl()));
-            // FIXME aconway 2007-06-29: register HandlerUpdater.
+            sessions.reset(new SessionManager());
+            cluster.reset(new Cluster(options.clusterName, broker->getUrl(), 
sessions));
+            sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10: 
+            broker->add(sessions);
         }
     }
 };
 
-static ClusterPluginProvider instance; // Static initialization.
+static ClusterPlugin instance; // Static initialization.
     
 }} // namespace qpid::cluster

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp?view=auto&rev=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp Thu Jul 
19 14:52:24 2007
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "SessionManager.h"
+#include "ClassifierHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace sys;
+
+/** Wrap plain AMQFrames in SessionFrames */
+struct FrameWrapperHandler : public FrameHandler {
+
+    FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain 
next_)
+        : uuid(id), direction(dir), next(next_) {
+        assert(!uuid.isNull());
+    }
+    
+    void handle(AMQFrame& frame) {
+        SessionFrame sf(uuid, frame, direction);
+        assert(next);
+        next->handle(sf);
+    }
+
+    Uuid uuid;
+    bool direction;
+    SessionFrameHandler::Chain next;
+};
+
+SessionManager::SessionManager() {}
+
+void SessionManager::update(FrameHandler::Chains& chains)
+{
+    Mutex::ScopedLock l(lock);
+    // Create a new local session, store local chains.
+    Uuid uuid(true);
+    sessions[uuid] = chains;
+    
+    // Replace local incoming chain. Build from the back.
+    // 
+    // TODO aconway 2007-07-05: Currently mcast wiring, bypass
+    // everythign else.
+    assert(clusterSend);
+    FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, 
clusterSend));
+    FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
+    chains.in = classify;
+
+    // FIXME aconway 2007-07-05: Need to stop bypassed frames
+    // from overtaking mcast frames.
+    // 
+
+    // Leave outgoing chain unmodified.
+    // TODO aconway 2007-07-05: Failover will require replication of
+    // outgoing frames to session replicas.
+    
+}
+
+void SessionManager::handle(SessionFrame& frame) {
+    // Incoming from frame.
+    FrameHandler::Chains chains;
+    {
+        Mutex::ScopedLock l(lock);
+        SessionMap::iterator i = sessions.find(frame.uuid);
+        if (i == sessions.end()) {
+            QPID_LOG(trace, "Non-local frame cluster: " << frame.frame);
+            chains = nonLocal;
+        }
+        else {
+            QPID_LOG(trace, "Local frame from cluster: " << frame.frame);
+            chains = i->second;
+        }
+    }
+    FrameHandler::Chain chain =
+        chain = frame.isIncoming ? chains.in : chains.out;
+    // TODO aconway 2007-07-11: Should this be assert(chain)
+    if (chain)
+        chain->handle(frame.frame);
+
+    // TODO aconway 2007-07-05: Here's where we should unblock frame
+    // dispatch for the channel.
+}
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h?view=auto&rev=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h Thu Jul 19 
14:52:24 2007
@@ -0,0 +1,68 @@
+#ifndef QPID_CLUSTER_SESSIONMANAGER_H
+#define QPID_CLUSTER_SESSIONMANAGER_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/BrokerChannel.h"
+#include "qpid/cluster/SessionFrame.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Manage sessions and handler chains for the cluster.
+ * 
+ */
+class SessionManager : public framing::HandlerUpdater, public 
SessionFrameHandler
+{
+  public:
+    SessionManager();
+
+    /** Set the handler to send to the cluster */
+    void setClusterSend(const SessionFrameHandler::Chain& send) { 
clusterSend=send; }
+    
+    /** As ChannelUpdater update the handler chains. */
+    void update(framing::FrameHandler::Chains& chains);
+
+    /** As SessionFrameHandler handle frames received from the cluster */
+    void handle(SessionFrame&);
+
+    /** Get ChannelID for UUID. Return 0 if no mapping */
+    framing::ChannelId getChannelId(const framing::Uuid&) const;
+    
+  private:
+    typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
+
+    sys::Mutex lock;
+    SessionFrameHandler::Chain clusterSend;
+    SessionMap sessions;
+    framing::FrameHandler::Chains nonLocal;
+};
+
+
+}} // namespace qpid::cluster
+
+
+
+#endif  /*!QPID_CLUSTER_CHANNELMANAGER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h Thu Jul 19 
14:52:24 2007
@@ -26,13 +26,15 @@
 namespace qpid {
 namespace framing {
 
-/** Plugin object that can update handler chains. */
-struct HandlerUpdater : public Plugin {
+/** Interface for objects that can update handler chains. */
+struct HandlerUpdater {
+    virtual ~HandlerUpdater() {}
+    
     /** Update the handler chains.
      [EMAIL PROTECTED] id Unique identifier for channel or session.
      [EMAIL PROTECTED] chains Handler chains to be updated.
      */
-    virtual void update(ChannelId id, FrameHandler::Chains& chains) = 0;
+    virtual void update(FrameHandler::Chains& chains) = 0;
 };
 
 }} // namespace qpid::framing

Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Thu Jul 19 14:52:24 2007
@@ -24,6 +24,8 @@
 #include "qpid/log/Statement.h"
 #include "qpid/log/Options.h"
 #include "qpid/log/Logger.h"
+#include "qpid/Plugin.h"
+#include "qpid/sys/Shlib.h"
 #include "config.h"
 #include <boost/filesystem/path.hpp>
 #include <iostream>
@@ -54,10 +56,10 @@
 
 
 struct QpiddOptions : public qpid::Options {
-    DaemonOptions daemon;
+    CommonOptions common;
     Broker::Options broker;
+    DaemonOptions daemon;
     qpid::log::Options log;
-    CommonOptions common;
     
     QpiddOptions() : qpid::Options("Options") {
         common.config = "/etc/qpidd.conf";
@@ -65,6 +67,12 @@
         add(broker);
         add(daemon);
         add(log);
+        const Plugin::Plugins& plugins=
+            Plugin::getPlugins();
+        for (Plugin::Plugins::const_iterator i = plugins.begin();
+             i != plugins.end();
+             ++i)
+            add(*(*i)->getOptions());
     }
 
     void usage() const {
@@ -74,7 +82,7 @@
 
 // Globals
 shared_ptr<Broker> brokerPtr;
-QpiddOptions options;
+auto_ptr<QpiddOptions> options;
 
 void shutdownHandler(int signal){
     QPID_LOG(notice, "Shutting down on signal " << signal);
@@ -84,45 +92,60 @@
 struct QpiddDaemon : public Daemon {
     /** Code for parent process */
     void parent() {
-        uint16_t port = wait(options.daemon.wait);
-        if (options.broker.port == 0)
+        uint16_t port = wait(options->daemon.wait);
+        if (options->broker.port == 0)
             cout << port << endl; 
     }
 
     /** Code for forked child process */
     void child() {
-        brokerPtr.reset(new Broker(options.broker));
+        brokerPtr.reset(new Broker(options->broker));
         uint16_t port=brokerPtr->getPort();
         ready(port);            // Notify parent.
         brokerPtr->run();
     }
 };
 
+void tryShlib(const char* libname) {
+    try {
+        Shlib shlib(libname);
+    }
+    catch (const exception& e) {
+        // TODO aconway 2007-07-09: Should log failures as INFO
+        // at least, but we try shlibs before logging is configured.
+    }
+}
+  
 
 int main(int argc, char* argv[])
 {
     try {
-        options.parse(argc, argv, options.common.config);
-        qpid::log::Logger::instance().configure(options.log, argv[0]);
+        // Load optional modules
+        tryShlib("libqpidcluster.so.0");
+
+        // Parse options
+        options.reset(new QpiddOptions());
+        options->parse(argc, argv, options->common.config);
+        qpid::log::Logger::instance().configure(options->log, argv[0]);
 
         // Options that just print information.
-        if(options.common.help || options.common.version) {
-            if (options.common.version) 
+        if(options->common.help || options->common.version) {
+            if (options->common.version) 
                 cout << "qpidd (" << PACKAGE_NAME << ") version "
                      << PACKAGE_VERSION << endl;
-            else if (options.common.help)
-                options.usage();
+            else if (options->common.help)
+                options->usage();
             return 0;
         }
 
         // Options that affect a running daemon.
-        if (options.daemon.check || options.daemon.quit) {
-            pid_t pid = Daemon::getPid(options.broker.port);
+        if (options->daemon.check || options->daemon.quit) {
+            pid_t pid = Daemon::getPid(options->broker.port);
             if (pid < 0) 
                 return 1;
-            if (options.daemon.check)
+            if (options->daemon.check)
                 cout << pid << endl;
-            if (options.daemon.quit && kill(pid, SIGINT) < 0)
+            if (options->daemon.quit && kill(pid, SIGINT) < 0)
                 throw Exception("Failed to stop daemon: " + strError(errno));
             return 0;
         }
@@ -139,14 +162,14 @@
         signal(SIGTTOU,SIG_IGN);
         signal(SIGTTIN,SIG_IGN);
             
-        if (options.daemon.daemon) {
+        if (options->daemon.daemon) {
             // Fork the daemon
             QpiddDaemon d;
             d.fork();
         } 
         else {                  // Non-daemon broker.
-            brokerPtr.reset(new Broker(options.broker));
-            if (options.broker.port == 0)
+            brokerPtr.reset(new Broker(options->broker));
+            if (options->broker.port == 0)
                 cout << uint16_t(brokerPtr->getPort()) << endl; 
             brokerPtr->run(); 
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Thu Jul 19 14:52:24 2007
@@ -36,11 +36,14 @@
 BOOST_AUTO_TEST_CASE(testClusterOne) {
     TestCluster cluster("clusterOne", "amqp:one:1");
     AMQFrame frame(VER, 1, new ChannelPingBody(VER));
-    cluster.getSendChains().in->handle(frame);
+    Uuid id(true);
+    SessionFrame send(id, frame, true);
+    cluster.handle(send);
     BOOST_REQUIRE(cluster.received.waitFor(1));
 
     SessionFrame& sf=cluster.received[0];
     BOOST_CHECK(sf.isIncoming);
+    BOOST_CHECK_EQUAL(id, sf.uuid);
     BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
     
     BOOST_CHECK_EQUAL(1u, cluster.size());
@@ -60,9 +63,12 @@
 
         // Exchange frames with child.
         AMQFrame frame(VER, 1, new ChannelPingBody(VER));
-        cluster.getSendChains().in->handle(frame);
+        Uuid id(true);
+        SessionFrame send(id, frame, true);
+        cluster.handle(send);
         BOOST_REQUIRE(cluster.received.waitFor(1));
         SessionFrame& sf=cluster.received[0];
+        BOOST_CHECK_EQUAL(id, sf.uuid);
         BOOST_CHECK(sf.isIncoming);
         BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
         BOOST_REQUIRE(cluster.received.waitFor(2));

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h Thu Jul 19 14:52:24 2007
@@ -24,7 +24,10 @@
 #include "qpid/framing/ChannelOkBody.h"
 #include "qpid/framing/BasicGetOkBody.h"
 #include "qpid/log/Logger.h"
+
 #include <boost/bind.hpp>
+#include <boost/test/test_tools.hpp>
+
 #include <iostream>
 #include <vector>
 #include <functional>
@@ -69,13 +72,12 @@
 
 struct TestCluster : public Cluster
 {
-    TestCluster(string name, string url) : Cluster(name, url)
-    {
-        setReceivedChain(make_shared_ptr(&received, nullDeleter));
-    }
+    TestCluster(string name, string url)
+        : Cluster(name, url, make_shared_ptr(&received, nullDeleter)) {}
 
     /** Wait for cluster to be of size n. */
     bool waitFor(size_t n) {
+        BOOST_CHECKPOINT("About to call Cluster::wait");
         return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), 
n));
     }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp Thu Jul 19 
14:52:24 2007
@@ -39,7 +39,9 @@
     BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
 
     AMQFrame frame(VER, 1, new ChannelOkBody(VER));
-    cluster.getSendChains().out->handle(frame);
+    Uuid id(true);
+    SessionFrame sf(id, frame, false);
+    cluster.handle(sf);
     BOOST_REQUIRE(cluster.received.waitFor(2));
     BOOST_CHECK(!cluster.received[1].isIncoming);
     BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, 
*cluster.received[1].frame.getBody());

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Jul 19 14:52:24 2007
@@ -170,7 +170,7 @@
 # ltmain invocations, one may corrupt the temporaries of the other.
 .NOTPARALLEL:
 
-CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp dummy_test 
$(unit_wrappers)
+CLEANFILES=valgrind.out *.log *.vglog .valgrindrc .valgrind.supp dummy_test 
$(unit_wrappers)
 MAINTAINERCLEANFILES=gen.mk
 
 interop_runner_SOURCES =       \

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?view=diff&rev=557788&r1=557787&r2=557788
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Thu Jul 19 14:52:24 2007
@@ -20,10 +20,11 @@
 Cpg_SOURCES=Cpg.cpp
 Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
 
-TESTS+=Cluster
-check_PROGRAMS+=Cluster
-Cluster_SOURCES=Cluster.cpp Cluster.h
-Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
+# FIXME aconway 2007-07-19: 
+# TESTS+=Cluster
+# check_PROGRAMS+=Cluster
+# Cluster_SOURCES=Cluster.cpp Cluster.h
+# Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
 
 check_PROGRAMS+=Cluster_child 
 Cluster_child_SOURCES=Cluster_child.cpp Cluster.h


Reply via email to