Author: aconway
Date: Thu Jun 26 11:12:12 2008
New Revision: 671969

URL: http://svn.apache.org/viewvc?rev=671969&view=rev
Log:
Consolidated cluster tests in cluster_test.cpp
Improvements to BrokerFixture for testing.

Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
      - copied, changed from r671655, incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
Removed:
    incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
    incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Thu Jun 26 11:12:12 2008
@@ -19,16 +19,16 @@
  */
 
 #include "Plugin.h"
+#include "qpid/Options.h"
 
 namespace qpid {
 
 namespace {
-// This is a single threaded singleton implementation so
-// it is important to be sure that the first use of this
-// singleton is when the program is still single threaded
 Plugin::Plugins& thePlugins() {
+    // This is a single threaded singleton implementation so
+    // it is important to be sure that the first use of this
+    // singleton is when the program is still single threaded
     static Plugin::Plugins plugins;
-
     return plugins;
 }
 }
@@ -42,8 +42,13 @@
 
 Options*  Plugin::getOptions() { return 0; }
 
-const Plugin::Plugins& Plugin::getPlugins() {
-    return thePlugins();
+const Plugin::Plugins& Plugin::getPlugins() { return thePlugins(); }
+
+void Plugin::addOptions(Options& opts) {
+    for (Plugins::const_iterator i = getPlugins().begin(); i != getPlugins().end(); ++i) {
+        if ((*i)->getOptions())
+            opts.add(*(*i)->getOptions());
+    }
 }
 
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h Thu Jun 26 11:12:12 2008
@@ -88,6 +88,9 @@
      * Caller must not delete plugin pointers.
      */
     static const Plugins& getPlugins();
+
+    /** For each registered plugin, add plugin.getOptions() to opts. */
+    static void addOptions(Options& opts);
 };
  
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Jun 26 11:12:12 2008
@@ -92,7 +92,7 @@
     cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
     check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
     handles.put(handle, &handler);
-    QPID_LOG(debug, "Initialize CPG handle " << handle);
+    QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle);
 }
 
 Cpg::~Cpg() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Thu Jun 26 11:12:12 2008
@@ -102,13 +102,7 @@
         add(broker);
         add(daemon);
         add(log);
-        const Plugin::Plugins& plugins=
-            Plugin::getPlugins();
-        for (Plugin::Plugins::const_iterator i = plugins.begin();
-             i != plugins.end();
-             ++i)
-            if ((*i)->getOptions() != 0)
-                add(*(*i)->getOptions());
+        Plugin::addOptions(*this);
     }
 
     void usage() const {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Thu Jun 26 11:12:12 2008
@@ -29,19 +29,19 @@
 #include "qpid/client/ConnectionImpl.h"
 #include "qpid/client/Session.h"
 #include "qpid/client/SubscriptionManager.h"
+#include <boost/noncopyable.hpp>
 
 /**
  * A fixture with an in-process broker.
  */
-struct  BrokerFixture {
+struct  BrokerFixture : private boost::noncopyable {
     typedef qpid::broker::Broker Broker;
     typedef boost::shared_ptr<Broker> BrokerPtr;
 
     BrokerPtr broker;
     qpid::sys::Thread brokerThread;
 
-    BrokerFixture() {
-        Broker::Options opts;
+    BrokerFixture(Broker::Options opts=Broker::Options()) {
         opts.port=0;
         // Management doesn't play well with multiple in-process brokers.
         opts.enableMgmt=false;  
@@ -65,8 +65,11 @@
     void open(qpid::client::Connection& c) {
         c.open("localhost", broker->getPort());
     }
+
+    uint16_t getPort() { return broker->getPort(); }
 };
 
+/** Connection that opens in its constructor */
 struct LocalConnection : public qpid::client::Connection {
     LocalConnection(uint16_t port) { open("localhost", port); }
 };
@@ -80,25 +83,36 @@
     ~ProxyConnection() { close(); }
 };
 
-/**
- * A BrokerFixture with open Connection, Session and
- * SubscriptionManager and LocalQueue for convenience.
+/** Convenience class to create and open a connection and session
+ * and some related useful objects.
  */
-template <class ConnectionType>
-struct  SessionFixtureT : BrokerFixture {
+template <class ConnectionType=ProxyConnection, class SessionType=qpid::client::Session>
+struct ClientT {
     ConnectionType connection;
-    qpid::client::Session session;
+    SessionType session;
     qpid::client::SubscriptionManager subs;
     qpid::client::LocalQueue lq;
+    ClientT(uint16_t port) : connection(port),
+                            session(connection.newSession("Client")),
+                            subs(session)
+    {}
+
+    ~ClientT() { connection.close(); }
+};
 
-    SessionFixtureT() : connection(broker->getPort()),
-                        session(connection.newSession("SessionFixture")),
-                        subs(session)
+typedef ClientT<> Client;
+
+/**
+ * A BrokerFixture and ready-connected BrokerFixture::Client all in one.
+ */
+template <class ConnectionType, class SessionType=qpid::client::Session>
+struct  SessionFixtureT : BrokerFixture, ClientT<ConnectionType,SessionType> {
+
+    SessionFixtureT(Broker::Options opts=Broker::Options()) :
+        BrokerFixture(opts),
+        ClientT<ConnectionType,SessionType>(broker->getPort())
     {}
 
-    ~SessionFixtureT() {
-        connection.close();
-    }
 };
 
 typedef SessionFixtureT<LocalConnection> SessionFixture;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_check Thu Jun 26 11:12:12 2008
@@ -23,11 +23,14 @@
     exit 0;                    # A warning, not a failure.
 fi
 
+# Execute command with the ais group set.
+with_ais_group() {
+    id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group."; exit 1; }
+    echo $* | newgrp ais
+}
+
 # Run the tests
 srcdir=`dirname $0`
-$srcdir/start_cluster 4
-./ais_test
-ret=$?
-$srcdir/stop_cluster 
-exit $ret
+with_ais_group ./cluster_test || ERROR=1
+exit $ERROR
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Thu Jun 26 11:12:12 2008
@@ -11,10 +11,10 @@
 
 # ais_check checks conditions for AIS tests and runs if ok.
 TESTS+=ais_check
-EXTRA_DIST+=ais_check start_cluster stop_cluster
+EXTRA_DIST+=ais_check
 
-check_PROGRAMS+=ais_test
-ais_test_SOURCES=ais_test.cpp Cpg.cpp 
-ais_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
+check_PROGRAMS+=cluster_test
+cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
+cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
 
 endif

Copied: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (from r671655, incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp&r1=671655&r2=671969&rev=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Jun 26 11:12:12 2008
@@ -18,11 +18,14 @@
 
 
 #include "test_tools.h"
+#include "unit_test.h"
+#include "BrokerFixture.h"
+
 #include "qpid/cluster/Cpg.h"
 #include "qpid/framing/AMQBody.h"
 
 #include <boost/bind.hpp>
-#include "unit_test.h"
+#include <boost/ptr_container/ptr_vector.hpp>
 
 #include <string>
 #include <iostream>
@@ -36,6 +39,8 @@
 using namespace std;
 using namespace qpid::cluster;
 using namespace qpid::framing;
+using namespace qpid::client;
+using boost::ptr_vector;
 
 // For debugging: op << for CPG types.
 
@@ -112,4 +117,99 @@
 }
 
 
+QPID_AUTO_TEST_CASE(CpgMulti) {
+    // Verify using multiple handles in one process.
+    //
+    Cpg::Name group("CpgMulti");
+    Callback cb1(group.str());
+    Cpg cpg1(cb1);
+
+    Callback cb2(group.str());
+    Cpg cpg2(cb2);
+    
+    cpg1.join(group);
+    cpg2.join(group);
+    iovec iov1 = { (void*)"Hello1", 6 };
+    iovec iov2 = { (void*)"Hello2", 6 };
+    cpg1.mcast(group, &iov1, 1);
+    cpg2.mcast(group, &iov2, 1);
+    cpg1.leave(group);
+    cpg2.leave(group);
+
+    cpg1.dispatchSome();
+    BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
+    BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
+    BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
+
+    cpg2.dispatchSome();
+    BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
+    BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
+    BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
+}
+
+// Test cluster of BrokerFixtures.
+struct ClusterFixture : public ptr_vector<BrokerFixture> {
+    ClusterFixture(size_t n=0) { add(n); }
+    void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
+    void add();
+};
+
+void ClusterFixture::add() {
+    qpid::broker::Broker::Options opts;
+    // Assumes the cluster plugin is loaded.
+    qpid::Plugin::addOptions(opts);
+    const char* argv[] = { "--cluster-name=$CLUSTER" };
+    // FIXME aconway 2008-06-26: fix parse() signature, should not need cast.
+    opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
+    push_back(new BrokerFixture(opts));
+}
+
+#if 0                           // FIXME aconway 2008-06-26: TODO
+QPID_AUTO_TEST_CASE(testWiringReplication) {
+    const size_t SIZE=3;
+    ClusterFixture cluster(SIZE);
+    Client c0(cluster[0].getPort());
+    BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
+    BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); 
+    c0.session.queueDeclare("q");
+    c0.session.exchangeDeclare("ex", arg::type="direct");
+    BOOST_CHECK_EQUAL("q", c0.session.queueQuery("q").getQueue());
+    BOOST_CHECK_EQUAL("direct", c0.session.exchangeQuery("ex").getType());
+    c0.close();
+
+    // Verify all brokers get wiring update.
+    for (size_t i = 1; i < cluster.size(); ++i) {
+        Client c(cluster[i].getPort());
+        BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
+        BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
+        c.close();
+    }    
+}
+
+QPID_AUTO_TEST_CASE(testMessageReplication) {
+    // Enqueue on one broker, dequeue on another.
+    ClusterConnections cluster;
+    BOOST_REQUIRE(cluster.size() > 1);
+
+    Session broker0 = cluster[0]->newSession();
+    broker0.queueDeclare(queue="q");
+    broker0.messageTransfer(content=TransferContent("data", "q"));
+    broker0.close();
+    
+    Session broker1 = cluster[1]->newSession();
+    broker1.
+        c.session.messageSubscribe(queue="q", destination="q");
+        c.session.messageFlow(destination="q", unit=0, value=1);//messages
+        FrameSet::shared_ptr msg = c.session.get();
+        BOOST_CHECK(msg->isA<MessageTransferBody>());
+        BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+        c.session.getExecution().completed(msg->getId(), true, true);
+        cluster[i]->close();
+    }    
+}
+
+// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover.
+
+#endif
+
 QPID_AUTO_TEST_SUITE_END()


Reply via email to