Author: aconway
Date: Thu Jul 26 08:47:23 2007
New Revision: 559859

URL: http://svn.apache.org/viewvc?view=rev&rev=559859
Log:

        * README: Instructions for openais install.

        * configure.ac: Enable clustering if suitable openais is present.

        * src/tests/Cluster.cpp, .h, Cluster_child: Updated for 0-10

        * src/qpid/sys/ConcurrentQueue.h: Added waitPop()

        * src/Makefile.am, src/qpid/sys/ThreadSafeQueue.h, ProducerConsumer.h:
        Removed unused code, ConcurrentQueue provides same functionality.

Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProducerConsumer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProducerConsumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ThreadSafeQueue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ProducerConsumerTest.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/README
    incubator/qpid/trunk/qpid/cpp/configure.ac
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: incubator/qpid/trunk/qpid/cpp/README
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/README?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/README (original)
+++ incubator/qpid/trunk/qpid/cpp/README Thu Jul 26 08:47:23 2007
@@ -29,45 +29,34 @@
 these as a recommended minimum version. Older unix versions, for example,
 Redhat Linux 3, will almost certainly require some packages to be upgraded.
 
-Qpid can be built using the gcc compiler:
+The following libraries and header files must be installed to build
+a source distribution:
+ * boost   <http://www.boost.org>              (1.33.1)
+ * uuid    <http://e2fsprogs.sourceforge.net/> (1.39)
+ * apr     <http://apr.apache.org>             (1.2.7)
+ * pkgconfig  <http://pkgconfig.freedesktop.org/wiki/> (0.21)
 
- # gcc     <http://gcc.gnu.org/>            (3.2.3)
+Optional cluster functionality requires:
+ * openais <http://openais.org/>              (0.80.3)
 
-Qpid is compiled against libraries:
+Optionally building/running the tests requires:
+ * cppunit <http://cppunit.sourceforge.net>    (1.11.4)
 
- * apr     <http://apr.apache.org>          (1.2.7)
- * boost   <http://www.boost.org>           (1.33.1)
- * cppunit <http://cppunit.sourceforge.net> (1.11.4)
- * uuid    <http://e2fsprogs.sourceforge.net/> (1.39)
+Qpid has been built using the gcc compiler:
+ * gcc     <http://gcc.gnu.org/>            (3.2.3)
 
-Using tools:
+If you want to build directly from the SVN repository you will need
+all of the above plus:
 
- * boost-jam  <http://boost.sourceforge.net/>          (3.1.13)
  * GNU make   <http://www.gnu.org/software/make/>      (3.8.0)
  * autoconf   <http://www.gnu.org/software/autoconf/>  (2.61)
  * automake   <http://www.gnu.org/software/automake/>  (1.9.6)
  * help2man   <http://www.gnu.org/software/help2man/>  (1.36.4)
  * libtool    <http://www.gnu.org/software/libtool/>   (1.5.22)
- * pkgconfig  <http://pkgconfig.freedesktop.org/wiki/> (0.21)
  * doxygen    <ftp://ftp.stack.nl/pub/users/dimitri/>  (1.5.1)
  * graphviz   <http://www.graphviz.org/>               (2.12)
  * JDK 5.0    <http://java.sun.com/j2se/1.5.0/>        (1.5.0.11)
 
-=== Optional tools and libraries ===
-
-The following are only required if you generate documentation.
-(Source distributions contain pre-generated documentation.)
- * help2man
- * doxygen
- * graphviz
-
-cppunit is not required if you do not build/run the tests.
-
-If building from a source distribution you do not need:
- * autoconf
- * automake
- * JDK 5.0
-
 === Installing as root ===
 
 On linux most packages can be installed using your distribution's package
@@ -90,7 +79,27 @@
   # ./configure --prefix=~/qpid-tools
   # make install
 
- The exceptions to this are boost and JDK 5.0.
+The exceptions are openais, boost, JDK 5.0.
+
+==== To build and install openais from source ====
+
+Unpack the source distribution and do:
+ # make
+ # sudo make install DESTDIR=
+ # sudo ldconfig
+
+This will install in the standard places (/usr/lib, /usr/include etc.)
+
+Next edit /etc/ais/openais.conf and modify the "bindnetaddr" setting
+to your host's external IP address (don't use 127.0.0.1).
+
+Finally start the ais daemon (must be done as root):
+
+ # sudo /sbin/aisexec
+
+Note that to run the AIS tests your primary group must be "ais".  You
+can change your primary group with the usermod command or set it
+temporarily with the newgrp command.
 
 ==== To build the boost library ====
 

Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Thu Jul 26 08:47:23 2007
@@ -159,28 +159,22 @@
 AC_CHECK_HEADERS([boost/shared_ptr.hpp uuid/uuid.h],,
   AC_MSG_ERROR([Missing required header files.]))
 
-# Enable cluster functionality.
-AC_ARG_ENABLE([cluster],
-  [AS_HELP_STRING([--enable-cluster],
-    [Enable cluster functionality, requires openais (default no)])],
-  [case $enableval in
-    yes|no) enable_CLUSTER=$enableval;;
-    *) AC_MSG_ERROR([Invalid value for --enable-apr-cluster: $enableval]);;
-   esac],
-  [enable_CLUSTER=no])
-
-AM_CONDITIONAL([CLUSTER], [test x$enable_CLUSTER = xyes])
-if test x$enable_CLUSTER = xyes; then
-  CPPFLAGS+=" -DCLUSTER"
+# Check for cluster requirements.
+save_ldflags=$LDFLAGS  
   LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais"
-  # cpg_local_get is not yet in a packaged release as of 2007-06-20
-  AC_CHECK_LIB([cpg],[cpg_local_get],,
-    AC_MSG_ERROR([cpg_local_get not available. openais missing/too old.]))
-   AC_CHECK_HEADERS([openais/cpg.h],,
-     AC_MSG_ERROR([Required header files not found.],[]))
+AC_CHECK_LIB([cpg],[cpg_local_get],[cpg_lib=yes])
+AC_CHECK_HEADER([openais/cpg.h],[cpg_h=yes])
+if test x$cpg_lib = xyes -a x$cpg_h = xyes; then
+   enable_CLUSTER=yes; 
+   CPPFLAGS+=" -DCLUSTER"
+else
+   enable_CLUSTER=no; 
+   LDFLAGS=$save_ldflags           
 fi
+AM_CONDITIONAL([CLUSTER], [test x$enable_CLUSTER = xyes])
 
        
+# Files to generate    
 AC_CONFIG_FILES([
   qpidc.spec
   Makefile

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Jul 26 08:47:23 2007
@@ -176,7 +176,6 @@
   qpid/sys/Runnable.cpp \
   qpid/sys/Shlib.h \
   qpid/sys/Shlib.cpp \
-  qpid/sys/ProducerConsumer.cpp \
   qpid/Options.cpp \
   qpid/Options.h \
   qpid/log/Options.cpp \
@@ -380,7 +379,6 @@
   qpid/sys/Monitor.h \
   qpid/sys/Mutex.h \
   qpid/sys/Poller.h \
-  qpid/sys/ProducerConsumer.h \
   qpid/sys/Runnable.h \
   qpid/sys/ScopedIncrement.h \
   qpid/sys/ShutdownHandler.h \
@@ -388,7 +386,6 @@
   qpid/sys/Thread.h \
   qpid/sys/ConcurrentQueue.h \
   qpid/sys/Serializer.h \
-  qpid/sys/ThreadSafeQueue.h \
   qpid/sys/Time.h \
   qpid/sys/TimeoutHandler.h \
   qpid/Exception.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h Thu Jul 26 08:47:23 2007
@@ -22,7 +22,10 @@
  *
  */
 
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ScopedIncrement.h"
+
+#include <boost/bind.hpp>
 
 #include <deque>
 
@@ -33,9 +36,24 @@
  * Thread-safe queue that allows threads to push items onto
  * the queue concurrently with threads popping items off the
  * queue.
+ *
+ * Also allows consuming threads to wait until an item is available.
  */
 template <class T> class ConcurrentQueue {
   public:
+    ConcurrentQueue() : waiters(0), shutdown(false) {}
+
+    /** Threads in wait() are woken with ShutdownException before
+     * destroying the queue.
+     */
+    ~ConcurrentQueue() {
+        Mutex::ScopedLock l(lock);
+        shutdown = true;
+        lock.notifyAll();
+        while (waiters > 0)
+            lock.wait();
+    }
+    
     /** Push a data item onto the back of the queue */
     void push(const T& data) {
         Mutex::ScopedLock l(lock);
@@ -47,6 +65,28 @@
      */
     bool pop(T& data) {
         Mutex::ScopedLock l(lock);
+        return popInternal(data);
+    }
+
+    /** Wait up to deadline for a data item to be available.
+     *@return true if data was available, false if timed out.
+     *@throws ShutdownException if the queue is destroyed.
+     */
+    bool waitPop(T& data, Duration timeout) {
+        Mutex::ScopedLock l(lock);
+        ScopedIncrement<size_t> w(
+            waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+        AbsTime deadline(now(), timeout);
+        while (queue.empty() && lock.wait(deadline))
+            ;
+        return popInternal(data);
+    }
+
+  private:
+    
+    bool popInternal(T& data) {
+        if (shutdown) 
+            throw ShutdownException();
         if (queue.empty())
             return false;
         else {
@@ -56,9 +96,16 @@
         }
     }
     
-  private:
-    Mutex lock;
+    void noWaiters() {
+        assert(waiters == 0);
+        if (shutdown)
+            lock.notify();  // Notify dtor thread.
+    }
+        
+    Monitor lock;
     std::deque<T> queue;
+    size_t waiters;
+    bool shutdown;
 };
 
 }} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Thu Jul 26 08:47:23 2007
@@ -19,8 +19,8 @@
 #include "Cluster.h"
 #include "test_tools.h"
 
-#include "qpid/framing/ChannelPingBody.h"
-#include "qpid/framing/ChannelOkBody.h"
+#include "qpid/framing/SessionPingBody.h"
+#include "qpid/framing/SessionPongBody.h"
 #include "qpid/cluster/ClassifierHandler.h"
 
 #define BOOST_AUTO_TEST_MAIN    // Must come before #include<boost/test/*>
@@ -33,16 +33,16 @@
 /** Verify membership in a cluster with one member. */
 BOOST_AUTO_TEST_CASE(testClusterOne) {
     TestCluster cluster("clusterOne", "amqp:one:1");
-    AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+    AMQFrame frame(VER, 1, new SessionPingBody(VER));
     Uuid id(true);
     SessionFrame send(id, frame, true);
     cluster.handle(send);
-    BOOST_REQUIRE(cluster.received.waitFor(1));
+    SessionFrame sf;
+    BOOST_REQUIRE(cluster.received.waitPop(sf));
 
-    SessionFrame& sf=cluster.received[0];
     BOOST_CHECK(sf.isIncoming);
     BOOST_CHECK_EQUAL(id, sf.uuid);
-    BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
+    BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
     
     BOOST_CHECK_EQUAL(1u, cluster.size());
     Cluster::MemberList members = cluster.getMembers();
@@ -65,17 +65,18 @@
         BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
 
         // Exchange frames with child.
-        AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+        AMQFrame frame(VER, 1, new SessionPingBody(VER));
         Uuid id(true);
         SessionFrame send(id, frame, true);
         cluster.handle(send);
-        BOOST_REQUIRE(cluster.received.waitFor(1));
-        SessionFrame& sf=cluster.received[0];
+        SessionFrame sf;
+        BOOST_REQUIRE(cluster.received.waitPop(sf));
         BOOST_CHECK_EQUAL(id, sf.uuid);
         BOOST_CHECK(sf.isIncoming);
-        BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
-        BOOST_REQUIRE(cluster.received.waitFor(2));
-        BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
+        BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
+        
+        BOOST_REQUIRE(cluster.received.waitPop(sf));
+        BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody());
 
         if (!nofork) {
             // Wait for child to exit.

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h Thu Jul 26 08:47:23 2007
@@ -20,16 +20,13 @@
  */
 
 #include "qpid/cluster/Cluster.h"
+#include "qpid/sys/ConcurrentQueue.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelOkBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/log/Logger.h"
 
 #include <boost/bind.hpp>
 #include <boost/test/test_tools.hpp>
 
 #include <iostream>
-#include <vector>
 #include <functional>
 
 /**
@@ -48,26 +45,12 @@
 void null_deleter(void*) {}
 
 template <class T>
-class TestHandler : public Handler<T&>, public vector<T>
+class TestHandler : public Handler<T&>, public ConcurrentQueue<T>
 {
-    Monitor lock;
-
   public:
-    void handle(T& frame) {
-        Mutex::ScopedLock l(lock);
-        push_back(frame);
-        BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size());
-        lock.notifyAll();
-    }
-
-    bool waitFor(size_t n) {
-        Mutex::ScopedLock l(lock);
-        BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size());
-        AbsTime deadline(now(), 2*TIME_SEC);
-        while (this->size() < n && lock.wait(deadline))
-            ;
-        return this->size() >= n;
-    }
+    void handle(T& frame) { push(frame); }
+    bool waitPop(T& x) { return waitPop(x, TIME_SEC); }
+    using ConcurrentQueue<T>::waitPop;
 };
 
 typedef TestHandler<AMQFrame> TestFrameHandler;
@@ -83,7 +66,8 @@
     /** Wait for cluster to be of size n. */
     bool waitFor(size_t n) {
         BOOST_CHECKPOINT("About to call Cluster::wait");
-        return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n));
+        return wait(boost::bind(
+                        equal_to<size_t>(), bind(&Cluster::size,this), n));
     }
 
     TestSessionFrameHandler received;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp Thu Jul 26 08:47:23 2007
@@ -20,6 +20,8 @@
 
 #include "Cluster.h"
 #include "test_tools.h"
+#include "qpid/framing/SessionPingBody.h"
+#include "qpid/framing/SessionPongBody.h"
 
 using namespace std;
 using namespace qpid;
@@ -33,17 +35,18 @@
 /** Chlid part of Cluster::clusterTwo test */
 void clusterTwo() {
     TestCluster cluster("clusterTwo", "amqp:child:2");
-    BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent.
-    BOOST_CHECK(cluster.received[0].isIncoming);
-    BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody());
+    SessionFrame sf;
+    BOOST_REQUIRE(cluster.received.waitPop(sf)); // Frame from parent.
+    BOOST_CHECK(sf.isIncoming);
+    BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
     BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
 
-    AMQFrame frame(VER, 1, new ChannelOkBody(VER));
-    SessionFrame sf(cluster.received[0].uuid, frame, false);
-    cluster.handle(sf);
-    BOOST_REQUIRE(cluster.received.waitFor(2));
-    BOOST_CHECK(!cluster.received[1].isIncoming);
-    BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
+    AMQFrame frame(VER, 1, new SessionPongBody(VER));
+    SessionFrame sendframe(sf.uuid, frame, false);
+    cluster.handle(sendframe);
+    BOOST_REQUIRE(cluster.received.waitPop(sf));
+    BOOST_CHECK(!sf.isIncoming);
+    BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody());
 } 
 
 int test_main(int, char**) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Jul 26 08:47:23 2007
@@ -87,9 +87,6 @@
   HeaderTest           \
   SequenceNumberTest
 
-misc_unit_tests =      \
-  ProducerConsumerTest
-
 posix_unit_tests =     \
   EventChannelTest     \
   EventChannelThreadsTest

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?view=diff&rev=559859&r1=559858&r2=559859
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Thu Jul 26 08:47:23 2007
@@ -20,6 +20,7 @@
 Cpg_SOURCES=Cpg.cpp
 Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
 
+# TODO aconway 2007-07-26: Fix this test.
 #TESTS+=Cluster
 check_PROGRAMS+=Cluster
 Cluster_SOURCES=Cluster.cpp Cluster.h


Reply via email to