Author: aconway
Date: Thu Jun 26 11:12:12 2008
New Revision: 671969
URL: http://svn.apache.org/viewvc?rev=671969&view=rev
Log:
Consolidated cluster tests in cluster_test.cpp
Improvements to BrokerFixture for testing.
Added:
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
- copied, changed from r671655,
incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
Removed:
incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Thu Jun 26 11:12:12 2008
@@ -19,16 +19,16 @@
*/
#include "Plugin.h"
+#include "qpid/Options.h"
namespace qpid {
namespace {
-// This is a single threaded singleton implementation so
-// it is important to be sure that the first use of this
-// singleton is when the program is still single threaded
Plugin::Plugins& thePlugins() {
+ // This is a single threaded singleton implementation so
+ // it is important to be sure that the first use of this
+ // singleton is when the program is still single threaded
static Plugin::Plugins plugins;
-
return plugins;
}
}
@@ -42,8 +42,13 @@
Options* Plugin::getOptions() { return 0; }
-const Plugin::Plugins& Plugin::getPlugins() {
- return thePlugins();
+const Plugin::Plugins& Plugin::getPlugins() { return thePlugins(); }
+
+void Plugin::addOptions(Options& opts) {
+ for (Plugins::const_iterator i = getPlugins().begin(); i !=
getPlugins().end(); ++i) {
+ if ((*i)->getOptions())
+ opts.add(*(*i)->getOptions());
+ }
}
} // namespace qpid
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h Thu Jun 26 11:12:12 2008
@@ -88,6 +88,9 @@
* Caller must not delete plugin pointers.
*/
static const Plugins& getPlugins();
+
+ /** For each registered plugin, add plugin.getOptions() to opts. */
+ static void addOptions(Options& opts);
};
} // namespace qpid
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Jun 26 11:12:12
2008
@@ -92,7 +92,7 @@
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
handles.put(handle, &handler);
- QPID_LOG(debug, "Initialize CPG handle " << handle);
+ QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle);
}
Cpg::~Cpg() {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Thu Jun 26 11:12:12 2008
@@ -102,13 +102,7 @@
add(broker);
add(daemon);
add(log);
- const Plugin::Plugins& plugins=
- Plugin::getPlugins();
- for (Plugin::Plugins::const_iterator i = plugins.begin();
- i != plugins.end();
- ++i)
- if ((*i)->getOptions() != 0)
- add(*(*i)->getOptions());
+ Plugin::addOptions(*this);
}
void usage() const {
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Thu Jun 26 11:12:12
2008
@@ -29,19 +29,19 @@
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
+#include <boost/noncopyable.hpp>
/**
* A fixture with an in-process broker.
*/
-struct BrokerFixture {
+struct BrokerFixture : private boost::noncopyable {
typedef qpid::broker::Broker Broker;
typedef boost::shared_ptr<Broker> BrokerPtr;
BrokerPtr broker;
qpid::sys::Thread brokerThread;
- BrokerFixture() {
- Broker::Options opts;
+ BrokerFixture(Broker::Options opts=Broker::Options()) {
opts.port=0;
// Management doesn't play well with multiple in-process brokers.
opts.enableMgmt=false;
@@ -65,8 +65,11 @@
void open(qpid::client::Connection& c) {
c.open("localhost", broker->getPort());
}
+
+ uint16_t getPort() { return broker->getPort(); }
};
+/** Connection that opens in its constructor */
struct LocalConnection : public qpid::client::Connection {
LocalConnection(uint16_t port) { open("localhost", port); }
};
@@ -80,25 +83,36 @@
~ProxyConnection() { close(); }
};
-/**
- * A BrokerFixture with open Connection, Session and
- * SubscriptionManager and LocalQueue for convenience.
+/** Convenience class to create and open a connection and session
+ * and some related useful objects.
*/
-template <class ConnectionType>
-struct SessionFixtureT : BrokerFixture {
+template <class ConnectionType=ProxyConnection, class
SessionType=qpid::client::Session>
+struct ClientT {
ConnectionType connection;
- qpid::client::Session session;
+ SessionType session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
+ ClientT(uint16_t port) : connection(port),
+ session(connection.newSession("Client")),
+ subs(session)
+ {}
+
+ ~ClientT() { connection.close(); }
+};
- SessionFixtureT() : connection(broker->getPort()),
- session(connection.newSession("SessionFixture")),
- subs(session)
+typedef ClientT<> Client;
+
+/**
+ * A BrokerFixture and ready-connected BrokerFixture::Client all in one.
+ */
+template <class ConnectionType, class SessionType=qpid::client::Session>
+struct SessionFixtureT : BrokerFixture, ClientT<ConnectionType,SessionType> {
+
+ SessionFixtureT(Broker::Options opts=Broker::Options()) :
+ BrokerFixture(opts),
+ ClientT<ConnectionType,SessionType>(broker->getPort())
{}
- ~SessionFixtureT() {
- connection.close();
- }
};
typedef SessionFixtureT<LocalConnection> SessionFixture;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_check Thu Jun 26 11:12:12 2008
@@ -23,11 +23,14 @@
exit 0; # A warning, not a failure.
fi
+# Execute command with the ais group set.
+with_ais_group() {
+ id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the
ais group."; exit 1; }
+ echo $* | newgrp ais
+}
+
# Run the tests
srcdir=`dirname $0`
-$srcdir/start_cluster 4
-./ais_test
-ret=$?
-$srcdir/stop_cluster
-exit $ret
+with_ais_group ./cluster_test || ERROR=1
+exit $ERROR
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=671969&r1=671968&r2=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Thu Jun 26 11:12:12 2008
@@ -11,10 +11,10 @@
# ais_check checks conditions for AIS tests and runs if ok.
TESTS+=ais_check
-EXTRA_DIST+=ais_check start_cluster stop_cluster
+EXTRA_DIST+=ais_check
-check_PROGRAMS+=ais_test
-ais_test_SOURCES=ais_test.cpp Cpg.cpp
-ais_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
+check_PROGRAMS+=cluster_test
+cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
+cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
endif
Copied: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (from r671655,
incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp)
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp&r1=671655&r2=671969&rev=671969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Jun 26
11:12:12 2008
@@ -18,11 +18,14 @@
#include "test_tools.h"
+#include "unit_test.h"
+#include "BrokerFixture.h"
+
#include "qpid/cluster/Cpg.h"
#include "qpid/framing/AMQBody.h"
#include <boost/bind.hpp>
-#include "unit_test.h"
+#include <boost/ptr_container/ptr_vector.hpp>
#include <string>
#include <iostream>
@@ -36,6 +39,8 @@
using namespace std;
using namespace qpid::cluster;
using namespace qpid::framing;
+using namespace qpid::client;
+using boost::ptr_vector;
// For debugging: op << for CPG types.
@@ -112,4 +117,99 @@
}
+QPID_AUTO_TEST_CASE(CpgMulti) {
+ // Verify using multiple handles in one process.
+ //
+ Cpg::Name group("CpgMulti");
+ Callback cb1(group.str());
+ Cpg cpg1(cb1);
+
+ Callback cb2(group.str());
+ Cpg cpg2(cb2);
+
+ cpg1.join(group);
+ cpg2.join(group);
+ iovec iov1 = { (void*)"Hello1", 6 };
+ iovec iov2 = { (void*)"Hello2", 6 };
+ cpg1.mcast(group, &iov1, 1);
+ cpg2.mcast(group, &iov2, 1);
+ cpg1.leave(group);
+ cpg2.leave(group);
+
+ cpg1.dispatchSome();
+ BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
+ BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
+ BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
+
+ cpg2.dispatchSome();
+ BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
+ BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
+ BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
+}
+
+// Test cluster of BrokerFixtures.
+struct ClusterFixture : public ptr_vector<BrokerFixture> {
+ ClusterFixture(size_t n=0) { add(n); }
+ void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
+ void add();
+};
+
+void ClusterFixture::add() {
+ qpid::broker::Broker::Options opts;
+ // Assumes the cluster plugin is loaded.
+ qpid::Plugin::addOptions(opts);
+ const char* argv[] = { "--cluster-name=$CLUSTER" };
+ // FIXME aconway 2008-06-26: fix parse() signature, should not need cast.
+ opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
+ push_back(new BrokerFixture(opts));
+}
+
+#if 0 // FIXME aconway 2008-06-26: TODO
+QPID_AUTO_TEST_CASE(testWiringReplication) {
+ const size_t SIZE=3;
+ ClusterFixture cluster(SIZE);
+ Client c0(cluster[0].getPort());
+ BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
+ BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
+ c0.session.queueDeclare("q");
+ c0.session.exchangeDeclare("ex", arg::type="direct");
+ BOOST_CHECK_EQUAL("q", c0.session.queueQuery("q").getQueue());
+ BOOST_CHECK_EQUAL("direct", c0.session.exchangeQuery("ex").getType());
+ c0.close();
+
+ // Verify all brokers get wiring update.
+ for (size_t i = 1; i < cluster.size(); ++i) {
+ Client c(cluster[i].getPort());
+ BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
+ BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
+ c.close();
+ }
+}
+
+QPID_AUTO_TEST_CASE(testMessageReplication) {
+ // Enqueue on one broker, dequeue on another.
+ ClusterConnections cluster;
+ BOOST_REQUIRE(cluster.size() > 1);
+
+ Session broker0 = cluster[0]->newSession();
+ broker0.queueDeclare(queue="q");
+ broker0.messageTransfer(content=TransferContent("data", "q"));
+ broker0.close();
+
+ Session broker1 = cluster[1]->newSession();
+ broker1.
+ c.session.messageSubscribe(queue="q", destination="q");
+ c.session.messageFlow(destination="q", unit=0, value=1);//messages
+ FrameSet::shared_ptr msg = c.session.get();
+ BOOST_CHECK(msg->isA<MessageTransferBody>());
+ BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+ c.session.getExecution().completed(msg->getId(), true, true);
+ cluster[i]->close();
+ }
+}
+
+// TODO aconway 2008-06-25: dequeue replication, exactly once delivery,
failover.
+
+#endif
+
QPID_AUTO_TEST_SUITE_END()