Author: aconway
Date: Mon Jul 23 06:08:16 2007
New Revision: 558710

URL: http://svn.apache.org/viewvc?view=rev&rev=558710
Log:

        * src/tests/cluster.mk: Enable cluster test.

        * src/tests/Cluster.h (class TestHandler):
        Fixed race in TestHandler::waitFor

        * src/tests/Cluster.cpp
         - Allow separate start of parent and child processes.

        * src/qpid/Options.cpp (parse): Skip argv parsing if argc=0.

        * src/qpid/cluster/Cluster.cpp (configChange): assert group name.

        * src/qpid/cluster/Cpg.cpp, .h: Additional logging.

        * src/qpid/framing/AMQFrame.cpp: Initialize all fields in ctor,
        avoid valgrind warning.

        * src/qpid/log/Logger.cpp: Initialize singleton automatically
        from environment so logging can be used in tests.

        * src/qpid/sys/Time.h: Avoid overflow in AbsTime(t, TIME_INFINITE)

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp Mon Jul 23 06:08:16 2007
@@ -27,12 +27,13 @@
 using namespace std;
 
 namespace {
-const std::string prefix("QPID_");
+
 char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); }
 
 struct Mapper {
     Mapper(const Options& o) : opts(o) {}
     string operator()(const string& env) {
+        static const std::string prefix("QPID_");
         if (env.substr(0, prefix.size()) == prefix) {
             string opt = env.substr(prefix.size());
             transform(opt.begin(), opt.end(), opt.begin(), env2optchar);
@@ -58,7 +59,8 @@
     try {
         po::variables_map vm;
         parsing="command line options";
-        po::store(po::parse_command_line(argc, argv, *this), vm);
+        if (argc > 0 && argv != 0)
+            po::store(po::parse_command_line(argc, argv, *this), vm);
         parsing="environment variables";
         po::store(po::parse_environment(*this, Mapper(*this)), vm);
         po::notify(vm); // configFile may be updated from arg/env options.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Jul 23 06:08:16 2007
@@ -115,13 +115,14 @@
 }
 
 void Cluster::deliver(
-        cpg_handle_t /*handle*/,
-        struct cpg_name* /* group */,
-        uint32_t nodeid,
-        uint32_t pid,
-        void* msg,
-        int msg_len)
+    cpg_handle_t /*handle*/,
+    cpg_name* group,
+    uint32_t nodeid,
+    uint32_t pid,
+    void* msg,
+    int msg_len)
 {
+    assert(name == *group);
     Id from(nodeid, pid);
     Buffer buf(static_cast<char*>(msg), msg_len);
     SessionFrame frame;
@@ -149,26 +150,27 @@
     ClusterNotifyBody* notifyIn=
         dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
     assert(notifyIn);
-        MemberList list;
-        {
-            Mutex::ScopedLock l(lock);
+    MemberList list;
+    {
+        Mutex::ScopedLock l(lock);
         shared_ptr<Member>& member=members[from];
         if (!member) 
             member.reset(new Member(notifyIn->getUrl()));
-            else 
+        else 
             member->url = notifyIn->getUrl();
-            lock.notifyAll();
+        lock.notifyAll();
         QPID_LOG(trace, *this << ": members joined: " << members);
-        }
+    }
 }
 
 void Cluster::configChange(
     cpg_handle_t /*handle*/,
-    struct cpg_name */*group*/,
-    struct cpg_address */*current*/, int /*nCurrent*/,
-    struct cpg_address *left, int nLeft,
-    struct cpg_address *joined, int nJoined)
+    cpg_name *group,
+    cpg_address */*current*/, int /*nCurrent*/,
+    cpg_address *left, int nLeft,
+    cpg_address *joined, int nJoined)
 {
+    assert(name == *group);
     bool newMembers=false;
     MemberList updated;
     {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Mon Jul 23 06:08:16 2007
@@ -90,25 +90,22 @@
     cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
     check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
     handles.put(handle, &handler);
+    QPID_LOG(debug, "Initialize CPG handle " << handle);
 }
 
 Cpg::~Cpg() {
     try {
         shutdown();
     } catch (const std::exception& e) {
-        QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what());
+        QPID_LOG(error, "Exception in Cpg destructor: " << e.what());
     }
 }
 
-struct Cpg::ClearHandleOnExit {
-    ClearHandleOnExit(cpg_handle_t h) : handle(h) {}
-    ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); }
-    cpg_handle_t handle;
-};
-    
 void Cpg::shutdown() {
+    QPID_LOG(debug, "Shutdown CPG handle " << handle);
     if (handles.get(handle)) {
-        ClearHandleOnExit guard(handle); // Exception safe
+        QPID_LOG(debug, "Finalize CPG handle " << handle);
+        handles.put(handle, 0);
         check(cpg_finalize(handle), "Error in shutdown of CPG");
     }
 }
@@ -173,8 +170,11 @@
     return out << ":" << id.pid();
 }
 
+ostream& operator <<(ostream& out, const cpg_name& name) {
+    return out << string(name.value, name.length);
+}
 
-}} // namespace qpid::cpg
+}} // namespace qpid::cluster
 
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Mon Jul 23 06:08:16 2007
@@ -170,9 +170,14 @@
     Handler& handler;
 };
 
+std::ostream& operator <<(std::ostream& out, const cpg_name& name);
 std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
 std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
 
+inline bool operator==(const cpg_name& a, const cpg_name& b) {
+    return a.length==b.length &&  strncmp(a.value, b.value, a.length) == 0;
+}
+inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }
 
 }} // namespace qpid::cluster
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Mon Jul 23 06:08:16 2007
@@ -33,8 +33,8 @@
 
 AMQP_MethodVersionMap AMQFrame::versionMap;
 
-AMQFrame::AMQFrame(ProtocolVersion _version):
-version(_version)
+AMQFrame::AMQFrame(ProtocolVersion _version)
+    : channel(0), type(0), version(_version)
  {
      assert(version != ProtocolVersion(0,0));
  }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp Mon Jul 23 06:08:16 2007
@@ -85,7 +85,15 @@
     return boost::details::pool::singleton_default<Logger>::instance();
 }
 
-Logger::Logger() : flags(0) {}
+Logger::Logger() : flags(0) {
+    // Initialize myself from env variables so all programs
+    // (e.g. tests) can use logging even if they don't parse
+    // command line args.
+    Options opts;
+    opts.parse(0, 0);           
+    configure(opts,"");
+}
+
 Logger::~Logger() {}
 
 void Logger::select(const Selector& s) {
@@ -190,6 +198,7 @@
 
 void Logger::configure(const Options& opts, const std::string& prog)
 {
+    clear();
     Options o(opts);
     if (o.trace)
         o.selectors.push_back("trace+");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h Mon Jul 23 06:08:16 2007
@@ -32,41 +32,43 @@
 
 /** Times in nanoseconds */
 class AbsTime {
+    static int64_t max() { return std::numeric_limits<int64_t>::max(); }
     int64_t time_ns;
-
-       friend class Duration;
+    
+  friend class Duration;
        
-public:
-       inline AbsTime() {}
-       inline AbsTime(const AbsTime& time0, const Duration& duration);
-       // Default asignment operation fine
-       // Default copy constructor fine
+  public:
+    inline AbsTime() {}
+    inline AbsTime(const AbsTime& time0, const Duration& duration);
+    // Default asignment operation fine
+    // Default copy constructor fine
         
-       static AbsTime now();
-       inline static AbsTime FarFuture();
+    static AbsTime now();
+    inline static AbsTime FarFuture();
 
-        friend bool operator<(const AbsTime& a, const AbsTime& b);
-        friend bool operator>(const AbsTime& a, const AbsTime& b);
+  friend bool operator<(const AbsTime& a, const AbsTime& b);
+  friend bool operator>(const AbsTime& a, const AbsTime& b);
 };
 
 class Duration {
+    static int64_t max() { return std::numeric_limits<int64_t>::max(); }
     int64_t nanosecs;
 
-       friend class AbsTime;
+  friend class AbsTime;
 
-public:
-       inline Duration(int64_t time0);
-       inline explicit Duration(const AbsTime& time0);
-       inline explicit Duration(const AbsTime& start, const AbsTime& finish);
-       inline operator int64_t() const;
+  public:
+    inline Duration(int64_t time0);
+    inline explicit Duration(const AbsTime& time0);
+    inline explicit Duration(const AbsTime& start, const AbsTime& finish);
+    inline operator int64_t() const;
 };
 
 
-AbsTime::AbsTime(const AbsTime& time0, const Duration& duration0) :
-       time_ns(time0.time_ns+duration0.nanosecs)
+AbsTime::AbsTime(const AbsTime& t, const Duration& d) :
+    time_ns(d == Duration::max() ? max() : t.time_ns+d.nanosecs)
 {}
 
-AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = std::numeric_limits<int64_t>::max(); return ff;}
+AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = max(); return ff;}
 
 inline AbsTime now() { return AbsTime::now(); }
 
@@ -74,15 +76,15 @@
 inline bool operator>(const AbsTime& a, const AbsTime& b) { return a.time_ns > b.time_ns; }
 
 Duration::Duration(int64_t time0) :
-       nanosecs(time0)
+    nanosecs(time0)
 {}
 
 Duration::Duration(const AbsTime& time0) :
-       nanosecs(time0.time_ns)
+    nanosecs(time0.time_ns)
 {}
 
 Duration::Duration(const AbsTime& start, const AbsTime& finish) :
-       nanosecs(finish.time_ns - start.time_ns)
+    nanosecs(finish.time_ns - start.time_ns)
 {}
 
 Duration::operator int64_t() const

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Mon Jul 23 06:08:16 2007
@@ -30,8 +30,6 @@
 
 static const ProtocolVersion VER;
 
-using namespace qpid::log;
-
 /** Verify membership in a cluster with one member. */
 BOOST_AUTO_TEST_CASE(testClusterOne) {
     TestCluster cluster("clusterOne", "amqp:one:1");
@@ -55,10 +53,15 @@
 
 /** Fork a process to test a cluster with two members */
 BOOST_AUTO_TEST_CASE(testClusterTwo) {
-    pid_t pid=fork();
-    BOOST_REQUIRE(pid >= 0);
-    if (pid) {              // Parent, see Cluster_child.cpp for child.
-        TestCluster cluster("clusterTwo", "amqp::1");
+    bool nofork=getenv("NOFORK") != 0;
+    pid_t pid=0;
+    if (!nofork) {
+        pid = fork();
+        BOOST_REQUIRE(pid >= 0);
+    }
+    if (pid || nofork) {        // Parent
+        BOOST_MESSAGE("Parent start");
+        TestCluster cluster("clusterTwo", "amqp:parent:1");
         BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
 
         // Exchange frames with child.
@@ -74,12 +77,14 @@
         BOOST_REQUIRE(cluster.received.waitFor(2));
         BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
 
-        // Wait for child to exit.
-        int status;
-        BOOST_CHECK_EQUAL(::wait(&status), pid);
-        BOOST_CHECK_EQUAL(0, status);
-        BOOST_CHECK(cluster.waitFor(1));
-        BOOST_CHECK_EQUAL(1u, cluster.size());
+        if (!nofork) {
+            // Wait for child to exit.
+            int status;
+            BOOST_CHECK_EQUAL(::wait(&status), pid);
+            BOOST_CHECK_EQUAL(0, status);
+            BOOST_CHECK(cluster.waitFor(1));
+            BOOST_CHECK_EQUAL(1u, cluster.size());
+        }
     }
     else {                      // Child
         BOOST_REQUIRE(execl("./Cluster_child", "./Cluster_child", NULL));

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h Mon Jul 23 06:08:16 2007
@@ -48,20 +48,25 @@
 void null_deleter(void*) {}
 
 template <class T>
-struct TestHandler : public Handler<T&>, public vector<T>, public Monitor
+class TestHandler : public Handler<T&>, public vector<T>
 {
+    Monitor lock;
+
+  public:
     void handle(T& frame) {
-        Mutex::ScopedLock l(*this);
+        Mutex::ScopedLock l(lock);
         push_back(frame);
-        notifyAll();
+        BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size());
+        lock.notifyAll();
     }
 
     bool waitFor(size_t n) {
-        Mutex::ScopedLock l(*this);
-        AbsTime deadline(now(), 5*TIME_SEC);
-        while (vector<T>::size() != n && wait(deadline))
+        Mutex::ScopedLock l(lock);
+        BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size());
+        AbsTime deadline(now(), 2*TIME_SEC);
+        while (vector<T>::size() < n && lock.wait(deadline))
             ;
-        return vector<T>::size() == n;
+        return vector<T>::size() >= n;
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp Mon Jul 23 06:08:16 2007
@@ -82,11 +82,11 @@
     }
 };
 
-BOOST_AUTO_TEST_CASE(Cpg_basic) {
+BOOST_AUTO_TEST_CASE(CpgBasic) {
     // Verify basic functionality of cpg. This will catch any
     // openais configuration or permission errors.
     //
-    Cpg::Name group("foo");
+    Cpg::Name group("CpgBasic");
     Callback cb(group.str());
     Cpg cpg(cb);
     cpg.join(group);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?view=diff&rev=558710&r1=558709&r2=558710
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Mon Jul 23 06:08:16 2007
@@ -20,11 +20,10 @@
 Cpg_SOURCES=Cpg.cpp
 Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
 
-# FIXME aconway 2007-07-19: 
-# TESTS+=Cluster
-# check_PROGRAMS+=Cluster
-# Cluster_SOURCES=Cluster.cpp Cluster.h
-# Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
+TESTS+=Cluster
+check_PROGRAMS+=Cluster
+Cluster_SOURCES=Cluster.cpp Cluster.h
+Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
 
 check_PROGRAMS+=Cluster_child 
 Cluster_child_SOURCES=Cluster_child.cpp Cluster.h


Reply via email to