Author: aconway
Date: Mon Jul  2 15:35:33 2007
New Revision: 552614

URL: http://svn.apache.org/viewvc?view=rev&rev=552614
Log:

2007-06-30    <[EMAIL PROTECTED]>

        * src/qpid/cluster/Cluster.cpp: Refactor - expose 4 handler points
        for all traffic to/from cluster. Removed HandlerUpdater functionality,
        separate class. Cluster only deals with membership and connecting
        the 4 handler points to CPG multicast.

        * src/tests/cluster.mk: Dropped newgrp ais wrapper scripts, its
        much simpler if the user just does "newgrp ais" before building.

        * src/tests/ais_check: Test script to check if users gid is ais
        and give clear notice if not.

        * src/tests/Cluster.cpp: Updated for changes to Cluster. 

        * src/qpid/cluster/Cpg.cpp: Better messages for common errors.

        * Handler.h: Remove nextHandler() minor convenience is outweighted
        by risk of undetected errors if handlers that expect next() to be
        set are called when it's not set.

        * src/qpid/cluster/Cpg.cpp: Added logging. Replaced boost::function
        with traditional virtual interface (nasty stack traces.)

Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/ais_check   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ChannelManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ChannelManager.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Mon Jul  2 15:35:33 2007
@@ -11,8 +11,6 @@
   qpid/cluster/Cpg.cpp \
   qpid/cluster/Cpg.h \
   qpid/cluster/Dispatchable.h \
-  qpid/cluster/ChannelManager.h \
-  qpid/cluster/ChannelManager.cpp \
   qpid/cluster/ClusterPluginProvider.cpp
 
 libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Jul  2 
15:35:33 2007
@@ -32,65 +32,89 @@
 using namespace std;
 
 ostream& operator <<(ostream& out, const Cluster& cluster) {
-    return out << cluster.name.str() << "(" << cluster.self << ")";
+    return out << "cluster[" << cluster.name.str() << " " << cluster.self << 
"]";
 }
 
-namespace {
-Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1];
-struct StatusMapInit {
-    StatusMapInit() {
-       statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN;
-       statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE;
-       statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN;
-       statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP;
-       statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN;
-    }
-} instance;
+ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
+    return out << m.first << "=" << m.second->url;
 }
 
-Cluster::Member::Member(const cpg_address& addr)
-    : status(statusMap[addr.reason]) {}
+ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
+    ostream_iterator<Cluster::MemberMap::value_type> o(out, " ");
+    copy(members.begin(), members.end(), o);
+    return out;
+}
+
+namespace {
+
+/** We mark the high bit of a frame's channel number to know if it's
+ * an incoming or outgoing frame when frames arrive via multicast.
+ */ 
+bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; }
+bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); }
+void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; }
+void markIncoming(AMQFrame&) { /*noop*/ }
+void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; }
 
-void Cluster::notify() {
-    ProtocolVersion version;     
-    // TODO aconway 2007-06-25: Use proxy here.
-    AMQFrame frame(version, 0,
-                   make_shared_ptr(new ClusterNotifyBody(version, url)));
-    handle(frame);
 }
 
+struct Cluster::IncomingHandler : public FrameHandler {
+    IncomingHandler(Cluster& c) : cluster(c) {}
+    void handle(AMQFrame& frame) {
+        markIncoming(frame);
+        cluster.mcast(frame);
+    }
+    Cluster& cluster;
+};
+
+struct Cluster::OutgoingHandler : public FrameHandler {
+    OutgoingHandler(Cluster& c) : cluster(c) {}
+    void handle(AMQFrame& frame) {
+        markOutgoing(frame);
+        cluster.mcast(frame);
+    }
+    Cluster& cluster;
+};
+
+
+// TODO aconway 2007-06-28: Right now everything is backed up via
+// multicast.  When we have point-to-point backups the
+// Incoming/Outgoing handlers must determine where each frame should
+// be sent: to multicast or only to specific backup(s) via AMQP.
+
 Cluster::Cluster(const std::string& name_, const std::string& url_) :
+    cpg(new Cpg(*this)),
     name(name_),
     url(url_), 
-    cpg(new Cpg(
-            boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
-            boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, 
_6, _7, _8))),
-    self(cpg->getLocalNoideId(), getpid())
-{}
-
-void Cluster::join(FrameHandler::Chain next) {
+    self(cpg->getLocalNoideId(), getpid()),
+    toChains(new IncomingHandler(*this), new OutgoingHandler(*this))
+{
     QPID_LOG(trace, *this << " Joining cluster.");
-    next = next;
-    dispatcher=Thread(*this);
     cpg->join(name);
     notify();
+    dispatcher=Thread(*this);
+    // Wait till we show up in the cluster map.
+    {
+        Mutex::ScopedLock l(lock);
+        while (empty())
+            lock.wait();
+    }
 }
 
 Cluster::~Cluster() {
-    if (cpg) {
-        try {
-            QPID_LOG(trace, *this << " Leaving cluster.");
-            cpg->leave(name);
-            cpg.reset();
-            dispatcher.join();
-        } catch (const std::exception& e) {
-            QPID_LOG(error, "Exception leaving cluster " << e.what());
-        }
+    QPID_LOG(trace, *this << " Leaving cluster.");
+    try {
+        cpg->leave(name);
+        cpg.reset();
+        dispatcher.join();
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, "Exception leaving cluster " << *this << ": "
+                 << e.what());
     }
 }
 
-void Cluster::handle(AMQFrame& frame) {
-    assert(cpg);
+void Cluster::mcast(AMQFrame& frame) {
     QPID_LOG(trace, *this << " SEND: " << frame);
     Buffer buf(frame.size());
     frame.encode(buf);
@@ -99,11 +123,24 @@
     cpg->mcast(name, &iov, 1);
 }
 
+void Cluster::notify() {
+    // TODO aconway 2007-06-25: Use proxy here.
+    ProtocolVersion version;
+    AMQFrame frame(version, 0,
+                   make_shared_ptr(new ClusterNotifyBody(version, url)));
+    mcast(frame);
+}
+
 size_t Cluster::size() const {
     Mutex::ScopedLock l(lock);
     return members.size();
 }
 
+void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) {
+    Mutex::ScopedLock l(lock);
+    fromChains = chains;
+}
+
 Cluster::MemberList Cluster::getMembers() const {
     Mutex::ScopedLock l(lock);
     MemberList result(members.size());
@@ -112,7 +149,7 @@
     return result;        
 }
 
-void Cluster::cpgDeliver(
+void Cluster::deliver(
         cpg_handle_t /*handle*/,
         struct cpg_name* /* group */,
         uint32_t nodeid,
@@ -124,61 +161,71 @@
     Buffer buf(static_cast<char*>(msg), msg_len);
     AMQFrame frame;
     frame.decode(buf);
-    QPID_LOG(trace, *this << " RECV: " << frame);
-    // TODO aconway 2007-06-20: use visitor pattern.
+    QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+    if (!handleClusterFrame(from, frame)) {
+        FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : 
fromChains.out;
+        unMark(frame);
+        if (chain)
+            chain->handle(frame);
+    }
+}
+
+bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
+                   Duration timeout) const
+{
+    AbsTime deadline(now(), timeout);
+    Mutex::ScopedLock l(lock);
+    while (!predicate(*this) && lock.wait(deadline))
+        ;
+    return (predicate(*this));
+}
+        
+bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
+    // TODO aconway 2007-06-20: use visitor pattern here.
     ClusterNotifyBody* notifyIn=
         dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
     if (notifyIn) {
+        MemberList list;
         {
             Mutex::ScopedLock l(lock);
-            assert(members[from]);
-            members[from]->url = notifyIn->getUrl();
-            members[from]->status = Member::BROKER;
+            if (!members[from]) 
+                members[from].reset(new Member(url));
+            else 
+                members[from]->url = notifyIn->getUrl();
+            QPID_LOG(trace, *this << ": member update: " << members);
+            lock.notifyAll();
         }
-        if (callback) 
-            callback();
+        return true;
     }
-    else 
-        next->handle(frame);
+    return false;
 }
 
-void Cluster::cpgConfigChange(
+void Cluster::configChange(
     cpg_handle_t /*handle*/,
     struct cpg_name */*group*/,
-    struct cpg_address *current, int nCurrent,
+    struct cpg_address */*current*/, int /*nCurrent*/,
     struct cpg_address *left, int nLeft,
-    struct cpg_address *joined, int nJoined
-)
+    struct cpg_address *joined, int nJoined)
 {
-    QPID_LOG(trace,
-             *this << " Configuration change. " << endl
-             << "  Joined: " << make_pair(joined, nJoined) << endl
-             << "  Left: " << make_pair(left, nLeft) << endl
-             << "  Current: " << make_pair(current, nCurrent));
-
-    bool needNotify=false;
+    bool newMembers=false;
     MemberList updated;
     {
         Mutex::ScopedLock l(lock);
-        for (int i = 0; i < nJoined; ++i) {
-            Id id(current[i]);
-            members[id].reset(new Member(current[i]));
-            if (id != self)
-                needNotify = true; // Notify new members other than myself.
+        if (nLeft) {
+            for (int i = 0; i < nLeft; ++i) 
+                members.erase(Id(left[i]));
+            QPID_LOG(trace, *this << ": members left: " << members);
+            lock.notifyAll();
         }
-        for (int i = 0; i < nLeft; ++i) 
-            members.erase(Id(current[i]));
-    } // End of locked scope.
-    if (needNotify)
+        newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
+        // We don't record members joining here, we record them when
+        // we get their ClusterNotify message.
+    }
+    if (newMembers)
         notify();
-    if (callback)
-        callback();
 }
 
-void Cluster::setCallback(boost::function<void()> f) { callback=f; }
-
 void Cluster::run() {
-    assert(cpg);
     cpg->dispatchBlocking();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Jul  2 
15:35:33 2007
@@ -21,89 +21,80 @@
 
 #include "qpid/cluster/Cpg.h"
 #include "qpid/framing/FrameHandler.h"
-#include "qpid/sys/Thread.h"
+#include "qpid/shared_ptr.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Runnable.h"
-#include "qpid/shared_ptr.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/log/Logger.h"
+
 #include <boost/function.hpp>
 #include <boost/scoped_ptr.hpp>
+
 #include <map>
 #include <vector>
 
-namespace qpid {
-
-namespace broker {
-class HandlerUpdater;
-}
-
-namespace cluster {
-
-class ChannelManager;
+namespace qpid { namespace cluster {
 
 /**
  * Represents a cluster, provides access to data about members.
- *
- * Implements a FrameHandler that multicasts frames to the cluster.
- *
- * Requires a handler for frames arriving from the cluster,
- * normally a ChannelManager but other handlers could be interposed
- * for testing, logging etc.
+ * Implements HandlerUpdater to manage handlers that route frames to
+ * and from the cluster.
  */
-class Cluster : public framing::FrameHandler, private sys::Runnable {
+class Cluster : private sys::Runnable, private Cpg::Handler
+{
   public:
     /** Details of a cluster member */
     struct Member {
         typedef shared_ptr<const Member> Ptr;
-        /** Status of a cluster member. */
-        enum Status {
-            JOIN,               ///< Process joined the group.
-            LEAVE,              ///< Process left the group cleanly.
-            NODEDOWN,           ///< Process's node went down.
-            NODEUP,             ///< Process's node joined the cluster.
-            PROCDOWN,           ///< Process died without leaving.
-            BROKER              ///< Broker details are available.
-        };
-
-        Member(const cpg_address&);
-        std::string url;
-        Status status;
+        Member(const std::string& url_) : url(url_) {}
+        std::string url;        ///< Broker address.
     };
     
     typedef std::vector<Member::Ptr> MemberList;
-    
+
     /**
-     * Create a cluster object but do not joing.
+     * Join a cluster.
      * @param name of the cluster.
      * @param url of this broker, sent to the cluster.
      */
     Cluster(const std::string& name, const std::string& url);
 
-    ~Cluster();
-
-    /** Join the cluster.
-     [EMAIL PROTECTED] is the handler for frames arriving from the cluster.
-     */
-    void join(framing::FrameHandler::Chain handler);
-    
-    /** Multicast a frame to the cluster. */
-    void handle(framing::AMQFrame&);
+    virtual ~Cluster();
 
     /** Get the current cluster membership. */
     MemberList getMembers() const;
 
-    /** Called when membership changes. */
-    void setCallback(boost::function<void()>);
-    
     /** Number of members in the cluster. */
     size_t size() const;
 
+    bool empty() const { return size() == 0; }
+    
+    /** Get handler chains to send frames to the cluster */ 
+    framing::FrameHandler::Chains getToChains() {
+        return toChains;
+    }
+
+    /** Set handler chains for frames received from the cluster */
+    void setFromChains(const framing::FrameHandler::Chains& chains);
+
+    /** Wait for predicate(*this) to be true, up to timeout.
+     [EMAIL PROTECTED] True if predicate became true, false if timed out.
+     *Note the predicate may not be true after wait returns,
+     *all the caller can say is it was true at some earlier point.
+     */
+    bool wait(boost::function<bool(const Cluster&)> predicate,
+              sys::Duration timeout=sys::TIME_INFINITE) const;
+
   private:
     typedef Cpg::Id Id;
     typedef std::map<Id, shared_ptr<Member> >  MemberMap;
+    typedef std::map<
+        framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
+    
+    void mcast(framing::AMQFrame&); ///< send frame by multicast.
+    void notify();              ///< Notify cluster of my details.
 
-    void run();
-    void notify();
-    void cpgDeliver(
+    void deliver(
         cpg_handle_t /*handle*/,
         struct cpg_name *group,
         uint32_t /*nodeid*/,
@@ -111,7 +102,7 @@
         void* /*msg*/,
         int /*msg_len*/);
 
-    void cpgConfigChange(
+    void configChange(
         cpg_handle_t /*handle*/,
         struct cpg_name */*group*/,
         struct cpg_address */*members*/, int /*nMembers*/,
@@ -119,16 +110,30 @@
         struct cpg_address */*joined*/, int /*nJoined*/
     );
 
+    void run();
+    bool handleClusterFrame(Id from, framing::AMQFrame&);
+
     mutable sys::Monitor lock;
+    boost::scoped_ptr<Cpg> cpg;
     Cpg::Name name;
     std::string url;
-    boost::scoped_ptr<Cpg> cpg;
     Id self;
     MemberMap members;
+    ChannelMap channels;
     sys::Thread dispatcher;
     boost::function<void()> callback;
+    framing::FrameHandler::Chains toChains;
+    framing::FrameHandler::Chains fromChains;
+
+    struct IncomingHandler;
+    struct OutgoingHandler;
+
+  friend struct IncomingHandler;
+  friend struct OutgoingHandler;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
+  friend std::ostream& operator <<(std::ostream&, const 
MemberMap::value_type&);
+  friend std::ostream& operator <<(std::ostream&, const MemberMap&);
 };
 
 }} // namespace qpid::cluster

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp 
Mon Jul  2 15:35:33 2007
@@ -18,7 +18,6 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/framing/HandlerUpdater.h"
 #include "qpid/cluster/Cluster.h"
-#include "qpid/cluster/ChannelManager.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 
@@ -51,12 +50,7 @@
         if (broker && !options.clusterName.empty()) {
             assert(!cluster); // A process can only belong to one cluster.
             cluster.reset(new Cluster(options.clusterName, broker->getUrl()));
-
-            // Channel manager is both the next handler for the cluster
-            // and the HandlerUpdater plugin for the broker.
-            shared_ptr<ChannelManager> manager(new ChannelManager(cluster));
-            cluster->join(manager);
-            broker->use(manager);
+            // FIXME aconway 2007-06-29: register HandlerUpdater.
         }
     }
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Mon Jul  2 15:35:33 
2007
@@ -17,7 +17,10 @@
  */
 
 #include "Cpg.h"
+
 #include "qpid/sys/Mutex.h"
+#include "qpid/log/Statement.h"
+
 #include <vector>
 #include <limits>
 #include <iterator>
@@ -33,16 +36,15 @@
 class Cpg::Handles
 {
   public:
-    void put(cpg_handle_t handle, Cpg* object) {
+    void put(cpg_handle_t handle, Cpg::Handler* handler) {
         sys::Mutex::ScopedLock l(lock);
-        assert(object);
         uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
         if (index >= handles.size())
             handles.resize(index+1, 0);
-        handles[index] = object;
+        handles[index] = handler;
     }
     
-    Cpg* get(cpg_handle_t handle) {
+    Cpg::Handler* get(cpg_handle_t handle) {
         sys::Mutex::ScopedLock l(lock);
         uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
         assert(index < handles.size());
@@ -52,7 +54,7 @@
     
   private:
     sys::Mutex lock;
-    vector<Cpg*>  handles;
+    vector<Cpg::Handler*>  handles;
 };
 
 Cpg::Handles Cpg::handles;
@@ -66,7 +68,9 @@
     void* msg,
     int msg_len)
 {
-    handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len);
+    Cpg::Handler* handler=handles.get(handle);
+    if (handler)
+        handler->deliver(handle, group, nodeid, pid, msg, msg_len);
 }
 
 void Cpg::globalConfigChange(
@@ -77,23 +81,35 @@
     struct cpg_address *joined, int nJoined
 )
 {
-    handles.get(handle)->configChange(handle, group, members, nMembers, left, 
nLeft, joined, nJoined);
+    Cpg::Handler* handler=handles.get(handle);
+    if (handler)
+        handler->configChange(handle, group, members, nMembers, left, nLeft, 
joined, nJoined);
 }
 
-Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c)
-{
+Cpg::Cpg(Handler& h) : handler(h) {
     cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
     check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
-    handles.put(handle, this);
+    handles.put(handle, &handler);
 }
 
 Cpg::~Cpg() {
     try {
-        check(cpg_finalize(handle), "Error in shutdown of CPG");
+        shutdown();
+    } catch (const std::exception& e) {
+        QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what());
     }
-    catch (...) {
-        handles.put(handle, 0);
-        throw;
+}
+
+struct Cpg::ClearHandleOnExit {
+    ClearHandleOnExit(cpg_handle_t h) : handle(h) {}
+    ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); }
+    cpg_handle_t handle;
+};
+    
+void Cpg::shutdown() {
+    if (handles.get(handle)) {
+        ClearHandleOnExit guard(handle); // Exception safe
+        check(cpg_finalize(handle), "Error in shutdown of CPG");
     }
 }
 
@@ -102,11 +118,11 @@
       case CPG_OK: return msg+": ok";
       case CPG_ERR_LIBRARY: return msg+": library";
       case CPG_ERR_TIMEOUT: return msg+": timeout";
-      case CPG_ERR_TRY_AGAIN: return msg+": try again";
+      case CPG_ERR_TRY_AGAIN: return msg+": timeout. The aisexec daemon may 
not be running";
       case CPG_ERR_INVALID_PARAM: return msg+": invalid param";
       case CPG_ERR_NO_MEMORY: return msg+": no memory";
       case CPG_ERR_BAD_HANDLE: return msg+": bad handle";
-      case CPG_ERR_ACCESS: return msg+": access";
+      case CPG_ERR_ACCESS: return msg+": access denied. You may need to set 
your group ID to 'ais'";
       case CPG_ERR_NOT_EXIST: return msg+": not exist";
       case CPG_ERR_EXIST: return msg+": exist";
       case CPG_ERR_NOT_SUPPORTED: return msg+": not supported";

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Mon Jul  2 15:35:33 
2007
@@ -21,8 +21,9 @@
 
 #include "qpid/Exception.h"
 #include "qpid/cluster/Dispatchable.h"
-#include <boost/function.hpp>
+
 #include <cassert>
+
 extern "C" {
 #include <openais/cpg.h>
 }
@@ -69,31 +70,36 @@
         return std::string(n.value, n.length);
     }
 
-    typedef boost::function<void (
-        cpg_handle_t /*handle*/,
-        struct cpg_name *group,
-        uint32_t /*nodeid*/,
-        uint32_t /*pid*/,
-        void* /*msg*/,
-        int /*msg_len*/)> DeliverFn;
-
-    typedef boost::function<void (
-        cpg_handle_t /*handle*/,
-        struct cpg_name */*group*/,
-        struct cpg_address */*members*/, int /*nMembers*/,
-        struct cpg_address */*left*/, int /*nLeft*/,
-        struct cpg_address */*joined*/, int /*nJoined*/
-    )> ConfigChangeFn;
+    struct Handler {
+        virtual ~Handler() {};
+        virtual void deliver(
+            cpg_handle_t /*handle*/,
+            struct cpg_name *group,
+            uint32_t /*nodeid*/,
+            uint32_t /*pid*/,
+            void* /*msg*/,
+            int /*msg_len*/) = 0;
+
+        virtual void configChange(
+            cpg_handle_t /*handle*/,
+            struct cpg_name */*group*/,
+            struct cpg_address */*members*/, int /*nMembers*/,
+            struct cpg_address */*left*/, int /*nLeft*/,
+            struct cpg_address */*joined*/, int /*nJoined*/
+        ) = 0;
+    };
 
     /** Open a CPG handle.
-     [EMAIL PROTECTED] deliver - free function called when a message is 
delivered.
-     [EMAIL PROTECTED] reconfig - free function called when CPG configuration 
changes.
+     [EMAIL PROTECTED] handler for CPG events.
      */
-    Cpg(DeliverFn deliver, ConfigChangeFn reconfig);
-
-    /** Disconnect from CPG. */
+    Cpg(Handler&);
+    
+    /** Destructor calls shutdown. */
     ~Cpg();
 
+    /** Disconnect from CPG */
+    void shutdown();
+    
     /** Dispatch CPG events.
      [EMAIL PROTECTED] type one of
      * - CPG_DISPATCH_ONE - dispatch exactly one event.
@@ -128,7 +134,9 @@
     
   private:
     class Handles;
+    struct ClearHandleOnExit;
   friend class Handles;
+  friend struct ClearHandleOnExit;
     
     static std::string errorStr(cpg_error_t err, const std::string& msg);
     static std::string cantJoinMsg(const Name&);
@@ -159,8 +167,7 @@
 
     static Handles handles;
     cpg_handle_t handle;
-    DeliverFn deliver;
-    ConfigChangeFn configChange;
+    Handler& handler;
 };
 
 std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h Mon Jul  2 
15:35:33 2007
@@ -36,6 +36,9 @@
 
     /** Handler chains for incoming and outgoing traffic. */
     struct Chains {
+        Chains() {}
+        Chains(Chain i, Chain o) : in(i), out(o) {}
+        Chains(Handler* i, Handler* o) : in(i), out(o) {}
         Chain in;
         Chain out;
     };
@@ -48,12 +51,6 @@
 
     /** Next handler. Public so chains can be modified by altering next. */
     Chain next;
-
-  protected:
-    /** Derived handle() implementations call nextHandler to invoke the
-     * next handler in the chain. */
-    void nextHandler(T data) { if (next) next->handle(data); }
-
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h Mon Jul  2 15:35:33 
2007
@@ -43,6 +43,8 @@
     Selector(Level l, const std::string& s=std::string()) {
         enable(l,s);
     }
+
+    Selector(const std::string& enableStr) { enable(enableStr); }
     /**
      * Enable messages with level in levels where the file 
      * name contains substring. Empty string matches all.

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Mon Jul  2 15:35:33 2007
@@ -20,51 +20,52 @@
 #include <boost/test/auto_unit_test.hpp>
 #include "test_tools.h"
 #include "Cluster.h"
+#include "qpid/framing/ChannelPingBody.h"
 #include "qpid/framing/ChannelOkBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-
 
 static const ProtocolVersion VER;
 
-/** Verify membership ind a cluster with one member. */
+using namespace qpid::log;
+
+/** Verify membership in a cluster with one member. */
 BOOST_AUTO_TEST_CASE(clusterOne) {
-    Cluster cluster("Test", "amqp:one:1");
-    TestClusterHandler handler(cluster);
-    AMQFrame frame(VER, 1, new ChannelOkBody(VER));
-    cluster.handle(frame);
-    BOOST_REQUIRE(handler.waitFrames(1));
+    TestCluster cluster("clusterOne", "amqp:one:1");
+    AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+    cluster.getToChains().in->handle(frame);
+    BOOST_REQUIRE(cluster.in.waitFor(1));
+
+    BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody());
     BOOST_CHECK_EQUAL(1u, cluster.size());
     Cluster::MemberList members = cluster.getMembers();
     BOOST_CHECK_EQUAL(1u, members.size());
-    BOOST_REQUIRE_EQUAL(members.front()->url, "amqp:one:1");
-    BOOST_CHECK_EQUAL(1u, handler.size());
-    BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody());
+    shared_ptr<const Cluster::Member> me=members.front();
+    BOOST_REQUIRE_EQUAL(me->url, "amqp:one:1");
 }
 
-/** Fork a process to verify membership in a cluster with two members */
+/** Fork a process to test a cluster with two members */
 BOOST_AUTO_TEST_CASE(clusterTwo) {
     pid_t pid=fork();
     BOOST_REQUIRE(pid >= 0);
-    if (pid) {                  // Parent see Cluster_child.cpp for child.
-        Cluster cluster("Test", "amqp::1");
-        TestClusterHandler handler(cluster);
-        BOOST_REQUIRE(handler.waitMembers(2));
+    if (pid) {              // Parent, see Cluster_child.cpp for child.
+        TestCluster cluster("clusterTwo", "amqp::1");
+        BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
 
         // Exchange frames with child.
-        AMQFrame frame(VER, 1, new ChannelOkBody(VER));
-        cluster.handle(frame);
-        BOOST_REQUIRE(handler.waitFrames(2));
-        BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody());
-        BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody());
+        AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+        cluster.getToChains().in->handle(frame);
+        BOOST_REQUIRE(cluster.in.waitFor(1));
+        BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody());
+        BOOST_REQUIRE(cluster.out.waitFor(1));
+        BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody());
 
         // Wait for child to exit.
         int status;
         BOOST_CHECK_EQUAL(::wait(&status), pid);
         BOOST_CHECK_EQUAL(0, status);
-        BOOST_CHECK(handler.waitMembers(1));
+        BOOST_CHECK(cluster.waitFor(1));
         BOOST_CHECK_EQUAL(1u, cluster.size());
     }
     else {                      // Child
-        BOOST_REQUIRE(execl("Cluster_child", "Cluster_child", NULL));
+        BOOST_REQUIRE(execl("./Cluster_child", "./Cluster_child", NULL));
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h Mon Jul  2 15:35:33 2007
@@ -27,6 +27,7 @@
 #include <boost/bind.hpp>
 #include <iostream>
 #include <vector>
+#include <functional>
 
 /**
  * Definitions for the Cluster.cpp and Cluster_child.cpp child program.
@@ -39,45 +40,48 @@
 using namespace qpid::cluster;
 using namespace qpid::framing;
 using namespace qpid::sys;
+using namespace boost;
 
 void null_deleter(void*) {}
 
-struct TestClusterHandler :
-    public std::vector<AMQFrame>, public FrameHandler, public Monitor
-    
+struct TestFrameHandler :
+    public FrameHandler, public vector<AMQFrame>, public Monitor
 {
-    TestClusterHandler(Cluster& c) : cluster(c) {
-        cluster.join(make_shared_ptr(this, &null_deleter));
-        cluster.setCallback(boost::bind(&Monitor::notify, this));
-    }
-    
-    void handle(AMQFrame& f) {
-        ScopedLock l(*this);
-        push_back(f);
+    void handle(AMQFrame& frame) {
+        Mutex::ScopedLock l(*this);
+        push_back(frame);
         notifyAll();
     }
 
-    /** Wait for the vector to contain n frames. */
-    bool waitFrames(size_t n) {
-        ScopedLock l(*this);
-        AbsTime deadline(now(), TIME_SEC);
+    bool waitFor(size_t n) {
+        Mutex::ScopedLock l(*this);
+        AbsTime deadline(now(), 5*TIME_SEC);
         while (size() != n && wait(deadline))
             ;
         return size() == n;
     }
+};
 
-    /** Wait for the cluster to have n members */
-    bool waitMembers(size_t n) {
-        ScopedLock l(*this);
-        AbsTime deadline(now(), TIME_SEC);
-        while (cluster.size() != n && wait(deadline))
-            ;
-        return cluster.size() == n;
+void nullDeleter(void*) {}
+
+struct TestCluster : public Cluster
+{
+    TestCluster(string name, string url) : Cluster(name, url)
+    {
+        setFromChains(
+            FrameHandler::Chains(
+                make_shared_ptr(&in, nullDeleter),
+                make_shared_ptr(&out, nullDeleter)
+            ));
     }
 
-    Cluster& cluster;
-};
+    /** Wait for cluster to be of size n. */
+    bool waitFor(size_t n) {
+        return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), 
n));
+    }
 
+    TestFrameHandler in, out;
+};
 
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp Mon Jul  2 
15:35:33 2007
@@ -26,20 +26,20 @@
 using namespace qpid::cluster;
 using namespace qpid::framing;
 using namespace qpid::sys;
-
+using namespace qpid::log;
 
 static const ProtocolVersion VER;
 
 /** Chlid part of Cluster::clusterTwo test */
 void clusterTwo() {
-    Cluster cluster("Test", "amqp::2");
-    TestClusterHandler handler(cluster);
-    BOOST_REQUIRE(handler.waitFrames(1));
-    BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody());
-    AMQFrame frame(VER, 1, new BasicGetOkBody(VER));
-    cluster.handle(frame);
-    BOOST_REQUIRE(handler.waitFrames(2));
-    BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody());
+    TestCluster cluster("clusterTwo", "amqp::2");
+    BOOST_REQUIRE(cluster.in.waitFor(1)); // Frame from parent.
+    BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody());
+    BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
+    AMQFrame frame(VER, 1, new ChannelOkBody(VER));
+    cluster.getToChains().out->handle(frame);
+    BOOST_REQUIRE(cluster.out.waitFor(1));
+    BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody());
 } 
 
 int test_main(int, char**) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp Mon Jul  2 15:35:33 2007
@@ -47,11 +47,11 @@
     o << "{ ";
     ostream_iterator<cpg_address> i(o, " ");
     copy(array.first, array.first+array.second, i);
-    cout << "}";
+    o << "}";
     return o;
 }
 
-struct Callback {
+struct Callback : public Cpg::Handler {
     Callback(const string group_) : group(group_) {}
     string group;
     vector<string> delivered;
@@ -88,10 +88,7 @@
     //
     Cpg::Name group("foo");
     Callback cb(group.str());
-    Cpg::DeliverFn deliver=boost::bind(&Callback::deliver, &cb, _1, _2, _3, 
_4, _5, _6);
-    Cpg::ConfigChangeFn reconfig=boost::bind<void>(&Callback::configChange, 
&cb, _1, _2, _3, _4, _5, _6, _7, _8);
-
-    Cpg cpg(deliver, reconfig);
+    Cpg cpg(cb);
     cpg.join(group);
     iovec iov = { (void*)"Hello!", 6 };
     cpg.mcast(group, &iov, 1);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon Jul  2 15:35:33 2007
@@ -11,17 +11,19 @@
 # Initialize variables that are incremented with +=
 # 
 check_PROGRAMS=
-unit_progs=
-unit_wrappers=
+TESTS=
+EXTRA_DIST=
 
 #
 # Unit test programs.
 # 
-unit_progs+=logging
+TESTS+=logging
+check_PROGRAMS+=logging
 logging_SOURCES=logging.cpp test_tools.h
 logging_LDADD=-lboost_unit_test_framework -lboost_regex $(lib_common) 
 
-unit_progs+=Url
+TESTS+=Url
+check_PROGRAMS+=Url
 Url_SOURCES=Url.cpp test_tools.h
 Url_LDADD=-lboost_unit_test_framework $(lib_common) 
 
@@ -76,21 +78,20 @@
 
 # Executables for client tests
 
-testprogs =            \
+testprogs=             \
   client_test          \
   echo_service         \
   topic_listener       \
-  topic_publisher
-
-
-check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner 
+  topic_publisher      \
+  interop_runner 
+check_PROGRAMS += $(testprogs)
 
 TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
 
 system_tests = client_test quick_topictest
-TESTS = dummy_test $(unit_progs) $(unit_wrappers) run-unit-tests start_broker 
$(system_tests) python_tests kill_broker 
+TESTS += run-unit-tests start_broker $(system_tests) python_tests kill_broker 
 
-EXTRA_DIST =                                                           \
+EXTRA_DIST +=                                                          \
   test_env run_test                                                    \
   run-unit-tests start_broker python_tests kill_broker                         
\
   quick_topictest                                                      \
@@ -131,10 +132,8 @@
 check-unit:
        $(MAKE) check TESTS=$(UNIT_TESTS) run-unit-tests
 
-# Dummy test to force necessary test files to be generated.
-dummy_test: .valgrind.supp .valgrindrc
-       { echo "#!/bin/sh";  echo "# Dummy test, does nothing. "; } > $@
-       chmod a+x $@
+# Make sure valgrind files are generated.
+all: .valgrind.supp .valgrindrc
 
 # Create a copy so that can be modified without risk of committing the changes.
 .valgrindrc: .valgrindrc-default

Added: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_check?view=auto&rev=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_check (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_check Mon Jul  2 15:35:33 2007
@@ -0,0 +1,16 @@
+#!/bin/sh
+test `id -ng` = "ais" || {
+    cat <<EOF
+    =========================== NOTICE==============================
+
+    You do not appear to have you group ID set to "ais".
+
+    Cluster tests that require the openais library will fail.Make sure
+    you are a member of group ais and run "newgrp ais" before running
+    the tests.
+
+    ================================================================
+    
+EOF
+exit 1;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?view=diff&rev=552614&r1=552613&r2=552614
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Mon Jul  2 15:35:33 2007
@@ -5,33 +5,25 @@
 lib_cluster = $(abs_builddir)/../libqpidcluster.la
 
 # NOTE: Programs using the openais library must be run with gid=ais
-# Such programs are built as *.ais, with a wrapper script *.sh that
-# runs the program with newgrp ais.
+# You should do "newgrp ais" before running the tests to run these.
 # 
 
-# Rule to generate wrapper scripts for tests that require gid=ais.
-run_test="env VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test"
-.ais.sh:
-       echo "if groups | grep '\bais\b' >/dev/null;" > [EMAIL PROTECTED]
-       echo "then echo $(run_test) ./$< \"$$@  \"| newgrp ais;" >>[EMAIL 
PROTECTED]
-       echo "else echo WARNING: `whoami` not in group ais, skipping $<.;" 
>>[EMAIL PROTECTED]
-       echo "fi"  >> [EMAIL PROTECTED]
-       mv [EMAIL PROTECTED] $@
-       chmod a+x $@
-
 #
 # Cluster tests.
 # 
-check_PROGRAMS+=Cpg.ais
-Cpg_ais_SOURCES=Cpg.cpp
-Cpg_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework
-unit_wrappers+=Cpg.sh
 
-# FIXME aconway 2007-06-29: Fixing problems with the test.
-# check_PROGRAMS+=Cluster.ais
-# Cluster_ais_SOURCES=Cluster.cpp Cluster.h
-# Cluster_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework
-# unit_wrappers+=Cluster.sh
+TESTS+=ais_check
+EXTRA_DIST+=ais_check
+
+TESTS+=Cpg
+check_PROGRAMS+=Cpg
+Cpg_SOURCES=Cpg.cpp
+Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
+
+TESTS+=Cluster
+check_PROGRAMS+=Cluster
+Cluster_SOURCES=Cluster.cpp Cluster.h
+Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
 
 check_PROGRAMS+=Cluster_child 
 Cluster_child_SOURCES=Cluster_child.cpp Cluster.h


Reply via email to