Author: aconway
Date: Thu Jul 26 08:47:23 2007
New Revision: 559859
URL: http://svn.apache.org/viewvc?view=rev&rev=559859
Log:
* README: Instructions for openais install.
* configure.ac: Enable clustering if suitable openais is present.
* src/tests/Cluster.cpp, .h, Cluster_child: Updated for 0-10
* src/qpid/sys/ConcurrentQueue.h: Added waitPop()
* src/Makefile.am, src/qpid/sys/ThreadSafeQueue.h, ProducerConsumer.h:
Removed unused code, ConcurrentQueue provides same functionality.
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProducerConsumer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProducerConsumer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ThreadSafeQueue.h
incubator/qpid/trunk/qpid/cpp/src/tests/ProducerConsumerTest.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/README
incubator/qpid/trunk/qpid/cpp/configure.ac
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
Modified: incubator/qpid/trunk/qpid/cpp/README
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/README?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/README (original)
+++ incubator/qpid/trunk/qpid/cpp/README Thu Jul 26 08:47:23 2007
@@ -29,45 +29,34 @@
these as a recommended minimum version. Older unix versions, for example,
Redhat Linux 3, will almost certainly require some packages to be upgraded.
-Qpid can be built using the gcc compiler:
+The following libraries and header files must be installed to build
+a source distribution:
+ * boost <http://www.boost.org> (1.33.1)
+ * uuid <http://e2fsprogs.sourceforge.net/> (1.39)
+ * apr <http://apr.apache.org> (1.2.7)
+ * pkgconfig <http://pkgconfig.freedesktop.org/wiki/> (0.21)
- # gcc <http://gcc.gnu.org/> (3.2.3)
+Optional cluster functionality requires:
+ * openais <http://openais.org/> (0.80.3)
-Qpid is compiled against libraries:
+Optionally building/running the tests requires:
+ * cppunit <http://cppunit.sourceforge.net> (1.11.4)
- * apr <http://apr.apache.org> (1.2.7)
- * boost <http://www.boost.org> (1.33.1)
- * cppunit <http://cppunit.sourceforge.net> (1.11.4)
- * uuid <http://e2fsprogs.sourceforge.net/> (1.39)
+Qpid has been built using the gcc compiler:
+ * gcc <http://gcc.gnu.org/> (3.2.3)
-Using tools:
+If you want to build directly from the SVN repository you will need
+all of the above plus:
- * boost-jam <http://boost.sourceforge.net/> (3.1.13)
* GNU make <http://www.gnu.org/software/make/> (3.8.0)
* autoconf <http://www.gnu.org/software/autoconf/> (2.61)
* automake <http://www.gnu.org/software/automake/> (1.9.6)
* help2man <http://www.gnu.org/software/help2man/> (1.36.4)
* libtool <http://www.gnu.org/software/libtool/> (1.5.22)
- * pkgconfig <http://pkgconfig.freedesktop.org/wiki/> (0.21)
* doxygen <ftp://ftp.stack.nl/pub/users/dimitri/> (1.5.1)
* graphviz <http://www.graphviz.org/> (2.12)
* JDK 5.0 <http://java.sun.com/j2se/1.5.0/> (1.5.0.11)
-=== Optional tools and libraries ===
-
-The following are only required if you generate documentation.
-(Source distributions contain pre-generated documentation.)
- * help2man
- * doxygen
- * graphviz
-
-cppunit is not required if you do not build/run the tests.
-
-If building from a source distribution you do not need:
- * autoconf
- * automake
- * JDK 5.0
-
=== Installing as root ===
On linux most packages can be installed using your distribution's package
@@ -90,7 +79,27 @@
# ./configure --prefix=~/qpid-tools
# make install
- The exceptions to this are boost and JDK 5.0.
+The exceptions are openais, boost, JDK 5.0.
+
+==== To build and install openais from source ====
+
+Unpack the source distribution and do:
+ # make
+ # sudo make install DESTDIR=
+ # sudo ldconfig
+
+This will install in the standard places (/usr/lib, /usr/include etc.)
+
+Next edit /etc/ais/openais.conf and modify the "bindnetaddr" setting
+to your host's external IP address (don't use 127.0.0.1).
+
+Finally start the ais daemon (must be done as root):
+
+ # sudo /sbin/aisexec
+
+Note that to run the AIS tests your primary group must be "ais". You
+can change your primary group with the usermod command or set it
+temporarily with the newgrp command.
==== To build the boost library ====
Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Thu Jul 26 08:47:23 2007
@@ -159,28 +159,22 @@
AC_CHECK_HEADERS([boost/shared_ptr.hpp uuid/uuid.h],,
AC_MSG_ERROR([Missing required header files.]))
-# Enable cluster functionality.
-AC_ARG_ENABLE([cluster],
- [AS_HELP_STRING([--enable-cluster],
- [Enable cluster functionality, requires openais (default no)])],
- [case $enableval in
- yes|no) enable_CLUSTER=$enableval;;
- *) AC_MSG_ERROR([Invalid value for --enable-apr-cluster: $enableval]);;
- esac],
- [enable_CLUSTER=no])
-
-AM_CONDITIONAL([CLUSTER], [test x$enable_CLUSTER = xyes])
-if test x$enable_CLUSTER = xyes; then
- CPPFLAGS+=" -DCLUSTER"
+# Check for cluster requirements.
+save_ldflags=$LDFLAGS
LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais"
- # cpg_local_get is not yet in a packaged release as of 2007-06-20
- AC_CHECK_LIB([cpg],[cpg_local_get],,
- AC_MSG_ERROR([cpg_local_get not available. openais missing/too old.]))
- AC_CHECK_HEADERS([openais/cpg.h],,
- AC_MSG_ERROR([Required header files not found.],[]))
+AC_CHECK_LIB([cpg],[cpg_local_get],[cpg_lib=yes])
+AC_CHECK_HEADER([openais/cpg.h],[cpg_h=yes])
+if test x$cpg_lib = xyes -a x$cpg_h = xyes; then
+ enable_CLUSTER=yes;
+ CPPFLAGS+=" -DCLUSTER"
+else
+ enable_CLUSTER=no;
+ LDFLAGS=$save_ldflags
fi
+AM_CONDITIONAL([CLUSTER], [test x$enable_CLUSTER = xyes])
+# Files to generate
AC_CONFIG_FILES([
qpidc.spec
Makefile
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Jul 26 08:47:23 2007
@@ -176,7 +176,6 @@
qpid/sys/Runnable.cpp \
qpid/sys/Shlib.h \
qpid/sys/Shlib.cpp \
- qpid/sys/ProducerConsumer.cpp \
qpid/Options.cpp \
qpid/Options.h \
qpid/log/Options.cpp \
@@ -380,7 +379,6 @@
qpid/sys/Monitor.h \
qpid/sys/Mutex.h \
qpid/sys/Poller.h \
- qpid/sys/ProducerConsumer.h \
qpid/sys/Runnable.h \
qpid/sys/ScopedIncrement.h \
qpid/sys/ShutdownHandler.h \
@@ -388,7 +386,6 @@
qpid/sys/Thread.h \
qpid/sys/ConcurrentQueue.h \
qpid/sys/Serializer.h \
- qpid/sys/ThreadSafeQueue.h \
qpid/sys/Time.h \
qpid/sys/TimeoutHandler.h \
qpid/Exception.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h Thu Jul 26 08:47:23 2007
@@ -22,7 +22,10 @@
*
*/
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ScopedIncrement.h"
+
+#include <boost/bind.hpp>
#include <deque>
@@ -33,9 +36,24 @@
* Thread-safe queue that allows threads to push items onto
* the queue concurrently with threads popping items off the
* queue.
+ *
+ * Also allows consuming threads to wait until an item is available.
*/
template <class T> class ConcurrentQueue {
public:
+ ConcurrentQueue() : waiters(0), shutdown(false) {}
+
+ /** Threads in wait() are woken with ShutdownException before
+ * destroying the queue.
+ */
+ ~ConcurrentQueue() {
+ Mutex::ScopedLock l(lock);
+ shutdown = true;
+ lock.notifyAll();
+ while (waiters > 0)
+ lock.wait();
+ }
+
/** Push a data item onto the back of the queue */
void push(const T& data) {
Mutex::ScopedLock l(lock);
@@ -47,6 +65,28 @@
*/
bool pop(T& data) {
Mutex::ScopedLock l(lock);
+ return popInternal(data);
+ }
+
+ /** Wait up to deadline for a data item to be available.
+ * @return true if data was available, false if timed out.
+ * @throws ShutdownException if the queue is destroyed.
+ */
+ bool waitPop(T& data, Duration timeout) {
+ Mutex::ScopedLock l(lock);
+ ScopedIncrement<size_t> w(
+ waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+ AbsTime deadline(now(), timeout);
+ while (queue.empty() && lock.wait(deadline))
+ ;
+ return popInternal(data);
+ }
+
+ private:
+
+ bool popInternal(T& data) {
+ if (shutdown)
+ throw ShutdownException();
if (queue.empty())
return false;
else {
@@ -56,9 +96,16 @@
}
}
- private:
- Mutex lock;
+ void noWaiters() {
+ assert(waiters == 0);
+ if (shutdown)
+ lock.notify(); // Notify dtor thread.
+ }
+
+ Monitor lock;
std::deque<T> queue;
+ size_t waiters;
+ bool shutdown;
};
}} // namespace qpid::sys
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Thu Jul 26 08:47:23 2007
@@ -19,8 +19,8 @@
#include "Cluster.h"
#include "test_tools.h"
-#include "qpid/framing/ChannelPingBody.h"
-#include "qpid/framing/ChannelOkBody.h"
+#include "qpid/framing/SessionPingBody.h"
+#include "qpid/framing/SessionPongBody.h"
#include "qpid/cluster/ClassifierHandler.h"
#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
@@ -33,16 +33,16 @@
/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
- AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+ AMQFrame frame(VER, 1, new SessionPingBody(VER));
Uuid id(true);
SessionFrame send(id, frame, true);
cluster.handle(send);
- BOOST_REQUIRE(cluster.received.waitFor(1));
+ SessionFrame sf;
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
- SessionFrame& sf=cluster.received[0];
BOOST_CHECK(sf.isIncoming);
BOOST_CHECK_EQUAL(id, sf.uuid);
- BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
+ BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
BOOST_CHECK_EQUAL(1u, cluster.size());
Cluster::MemberList members = cluster.getMembers();
@@ -65,17 +65,18 @@
BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
- AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+ AMQFrame frame(VER, 1, new SessionPingBody(VER));
Uuid id(true);
SessionFrame send(id, frame, true);
cluster.handle(send);
- BOOST_REQUIRE(cluster.received.waitFor(1));
- SessionFrame& sf=cluster.received[0];
+ SessionFrame sf;
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
BOOST_CHECK_EQUAL(id, sf.uuid);
BOOST_CHECK(sf.isIncoming);
- BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
- BOOST_REQUIRE(cluster.received.waitFor(2));
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
+ BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
+
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
+ BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody());
if (!nofork) {
// Wait for child to exit.
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h Thu Jul 26 08:47:23 2007
@@ -20,16 +20,13 @@
*/
#include "qpid/cluster/Cluster.h"
+#include "qpid/sys/ConcurrentQueue.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelOkBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
#include <boost/test/test_tools.hpp>
#include <iostream>
-#include <vector>
#include <functional>
/**
@@ -48,26 +45,12 @@
void null_deleter(void*) {}
template <class T>
-class TestHandler : public Handler<T&>, public vector<T>
+class TestHandler : public Handler<T&>, public ConcurrentQueue<T>
{
- Monitor lock;
-
public:
- void handle(T& frame) {
- Mutex::ScopedLock l(lock);
- push_back(frame);
- BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size());
- lock.notifyAll();
- }
-
- bool waitFor(size_t n) {
- Mutex::ScopedLock l(lock);
- BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size());
- AbsTime deadline(now(), 2*TIME_SEC);
- while (this->size() < n && lock.wait(deadline))
- ;
- return this->size() >= n;
- }
+ void handle(T& frame) { push(frame); }
+ bool waitPop(T& x) { return waitPop(x, TIME_SEC); }
+ using ConcurrentQueue<T>::waitPop;
};
typedef TestHandler<AMQFrame> TestFrameHandler;
@@ -83,7 +66,8 @@
/** Wait for cluster to be of size n. */
bool waitFor(size_t n) {
BOOST_CHECKPOINT("About to call Cluster::wait");
- return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n));
+ return wait(boost::bind(
+ equal_to<size_t>(), bind(&Cluster::size,this), n));
}
TestSessionFrameHandler received;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp Thu Jul 26 08:47:23 2007
@@ -20,6 +20,8 @@
#include "Cluster.h"
#include "test_tools.h"
+#include "qpid/framing/SessionPingBody.h"
+#include "qpid/framing/SessionPongBody.h"
using namespace std;
using namespace qpid;
@@ -33,17 +35,18 @@
/** Child part of Cluster::clusterTwo test */
void clusterTwo() {
TestCluster cluster("clusterTwo", "amqp:child:2");
- BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent.
- BOOST_CHECK(cluster.received[0].isIncoming);
- BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody());
+ SessionFrame sf;
+ BOOST_REQUIRE(cluster.received.waitPop(sf)); // Frame from parent.
+ BOOST_CHECK(sf.isIncoming);
+ BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
- AMQFrame frame(VER, 1, new ChannelOkBody(VER));
- SessionFrame sf(cluster.received[0].uuid, frame, false);
- cluster.handle(sf);
- BOOST_REQUIRE(cluster.received.waitFor(2));
- BOOST_CHECK(!cluster.received[1].isIncoming);
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
+ AMQFrame frame(VER, 1, new SessionPongBody(VER));
+ SessionFrame sendframe(sf.uuid, frame, false);
+ cluster.handle(sendframe);
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
+ BOOST_CHECK(!sf.isIncoming);
+ BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody());
}
int test_main(int, char**) {
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Jul 26 08:47:23 2007
@@ -87,9 +87,6 @@
HeaderTest \
SequenceNumberTest
-misc_unit_tests = \
- ProducerConsumerTest
-
posix_unit_tests = \
EventChannelTest \
EventChannelThreadsTest
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Thu Jul 26 08:47:23 2007
@@ -20,6 +20,7 @@
Cpg_SOURCES=Cpg.cpp
Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
+# TODO aconway 2007-07-26: Fix this test.
#TESTS+=Cluster
check_PROGRAMS+=Cluster
Cluster_SOURCES=Cluster.cpp Cluster.h