Author: aconway
Date: Thu Jul  5 09:08:29 2007
New Revision: 553543

URL: http://svn.apache.org/viewvc?view=rev&rev=553543
Log:

        * src/qpid/cluster/SessionFrame.cpp, .h:  Wrap AMQFrame with
          session UUID and direction.
          
        * src/qpid/cluster/Cluster.cpp, .h: Use SessionFrame.

        * src/qpid/framing/AMQFrame.h, .cpp: Added setBody(), inline getBody()

        * src/qpid/framing/Uuid.h, .cpp:  Clean up constructors, inline.

        * src/qpid/framing/Buffer.h: Put/get byte*, size_T.

        * src/qpid/cluster/SessionManager.cpp, .h:
         - Maintain the session map.
         - Handle frames from cluster, dispatch to proper channels.
         - Implement HandlerUpdater for new channels and maintains

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.cpp   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Thu Jul  5 09:08:29 2007
@@ -13,7 +13,9 @@
   qpid/cluster/Dispatchable.h \
   qpid/cluster/ClusterPluginProvider.cpp \
   qpid/cluster/ClassifierHandler.h \
-  qpid/cluster/ClassifierHandler.cpp
+  qpid/cluster/ClassifierHandler.cpp \
+  qpid/cluster/SessionFrame.h \
+  qpid/cluster/SessionFrame.cpp
 
 libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Jul  5 
09:08:29 2007
@@ -19,6 +19,7 @@
 #include "Cluster.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterNotifyBody.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
 #include <algorithm>
@@ -45,24 +46,11 @@
     return out;
 }
 
-namespace {
-
-/** We mark the high bit of a frame's channel number to know if it's
- * an incoming or outgoing frame when frames arrive via multicast.
- */ 
-bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; }
-bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); }
-void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; }
-void markIncoming(AMQFrame&) { /*noop*/ }
-void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; }
-
-}
-
 struct Cluster::IncomingHandler : public FrameHandler {
     IncomingHandler(Cluster& c) : cluster(c) {}
     void handle(AMQFrame& frame) {
-        markIncoming(frame);
-        cluster.mcast(frame);
+        SessionFrame sf(Uuid(true), frame, SessionFrame::IN);
+        cluster.mcast(sf);
     }
     Cluster& cluster;
 };
@@ -70,18 +58,18 @@
 struct Cluster::OutgoingHandler : public FrameHandler {
     OutgoingHandler(Cluster& c) : cluster(c) {}
     void handle(AMQFrame& frame) {
-        markOutgoing(frame);
-        cluster.mcast(frame);
+        SessionFrame sf(Uuid(true), frame, SessionFrame::OUT);
+        cluster.mcast(sf);
     }
     Cluster& cluster;
 };
 
-
 // TODO aconway 2007-06-28: Right now everything is backed up via
 // multicast.  When we have point-to-point backups the
 // Incoming/Outgoing handlers must determine where each frame should
 // be sent: to multicast or only to specific backup(s) via AMQP.
 
+
 Cluster::Cluster(const std::string& name_, const std::string& url_) :
     cpg(new Cpg(*this)),
     name(name_),
@@ -114,7 +102,7 @@
     }
 }
 
-void Cluster::mcast(AMQFrame& frame) {
+void Cluster::mcast(SessionFrame& frame) {
     QPID_LOG(trace, *this << " SEND: " << frame);
     Buffer buf(frame.size());
     frame.encode(buf);
@@ -124,11 +112,9 @@
 }
 
 void Cluster::notify() {
-    // TODO aconway 2007-06-25: Use proxy here.
-    ProtocolVersion version;
-    AMQFrame frame(version, 0,
-                   make_shared_ptr(new ClusterNotifyBody(version, url)));
-    mcast(frame);
+    SessionFrame sf;
+    sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), 
url)));
+    mcast(sf);
 }
 
 size_t Cluster::size() const {
@@ -136,12 +122,13 @@
     return members.size();
 }
 
-void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) {
+void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) {
     Mutex::ScopedLock l(lock);
-    fromChains = chains;
+    receivedChain = chain;
 }
 
 Cluster::MemberList Cluster::getMembers() const {
+    // TODO aconway 2007-07-04: use read/write lock?
     Mutex::ScopedLock l(lock);
     MemberList result(members.size());
     std::transform(members.begin(), members.end(), result.begin(),
@@ -159,15 +146,13 @@
 {
     Id from(nodeid, pid);
     Buffer buf(static_cast<char*>(msg), msg_len);
-    AMQFrame frame;
+    SessionFrame frame;
     frame.decode(buf);
     QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
-    if (!handleClusterFrame(from, frame)) {
-        FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : 
fromChains.out;
-        unMark(frame);
-        if (chain)
-            chain->handle(frame);
-    }
+    if (frame.uuid.isNull())
+        handleClusterFrame(from, frame.frame);
+    else
+        receivedChain->handle(frame);
 }
 
 bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -179,7 +164,8 @@
         ;
     return (predicate(*this));
 }
-        
+
+// Handle cluster control frame from the null session.
 bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
     // TODO aconway 2007-06-20: use visitor pattern here.
     ClusterNotifyBody* notifyIn=

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Jul  5 
09:08:29 2007
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/SessionFrame.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/shared_ptr.h"
 #include "qpid/sys/Monitor.h"
@@ -69,13 +70,13 @@
 
     bool empty() const { return size() == 0; }
     
-    /** Get handler chains to send frames to the cluster */ 
-    framing::FrameHandler::Chains getToChains() {
+    /** Get handler chains to send incoming/outgoing frames to the cluster */ 
+    framing::FrameHandler::Chains getSendChains() {
         return toChains;
     }
 
-    /** Set handler chains for frames received from the cluster */
-    void setFromChains(const framing::FrameHandler::Chains& chains);
+    /** Set handler for frames received from the cluster */
+    void setReceivedChain(const SessionFrameHandler::Chain& chain);
 
     /** Wait for predicate(*this) to be true, up to timeout.
      [EMAIL PROTECTED] True if predicate became true, false if timed out.
@@ -91,7 +92,7 @@
     typedef std::map<
         framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
     
-    void mcast(framing::AMQFrame&); ///< send frame by multicast.
+    void mcast(SessionFrame&);  ///< send frame by multicast.
     void notify();              ///< Notify cluster of my details.
 
     void deliver(
@@ -123,7 +124,7 @@
     sys::Thread dispatcher;
     boost::function<void()> callback;
     framing::FrameHandler::Chains toChains;
-    framing::FrameHandler::Chains fromChains;
+    SessionFrameHandler::Chain receivedChain;
 
     struct IncomingHandler;
     struct OutgoingHandler;

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.cpp?view=auto&rev=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.cpp Thu Jul  5 
09:08:29 2007
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SessionFrame.h"
+
+#include "qpid/QpidError.h"
+
+namespace qpid {
+namespace cluster {
+
+void SessionFrame::encode(framing::Buffer& buf) {
+    uuid.encode(buf);
+    frame.encode(buf);
+    buf.putOctet(isIncoming);
+}
+
+void SessionFrame::decode(framing::Buffer& buf) {
+    uuid.decode(buf);
+    if (!frame.decode(buf))
+        THROW_QPID_ERROR(FRAMING_ERROR, "Incomplete frame");
+    isIncoming = buf.getOctet();
+}
+
+size_t SessionFrame::size() const {
+    return uuid.size() + frame.size() + 1 /*isIncoming*/;
+}
+
+std::ostream& operator<<(std::ostream& out, const SessionFrame& frame) {
+    return out << "[session=" << frame.uuid
+               << (frame.isIncoming ? ",in: ":",out: ")
+               << frame.frame << "]";
+}
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.h?view=auto&rev=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.h Thu Jul  5 
09:08:29 2007
@@ -0,0 +1,71 @@
+#ifndef QPID_CLUSTER_SESSIONFRAME_H
+#define QPID_CLUSTER_SESSIONFRAME_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Handler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Uuid.h"
+
+#include <ostream>
+
+namespace qpid {
+
+namespace framing {
+class AMQFrame;
+class Buffer;
+}
+
+namespace cluster {
+
+/**
+ * An AMQFrame with a UUID and direction.
+ */
+struct SessionFrame
+{
+    SessionFrame() : isIncoming(false) {}
+    
+    SessionFrame(const framing::Uuid& id, const framing::AMQFrame& f, bool 
incoming)
+        : uuid(id), frame(f), isIncoming(incoming) {}
+    
+    void encode(framing::Buffer&);
+
+    void decode(framing::Buffer&);
+
+    size_t size() const;
+    
+    static const bool IN = true;
+    static const bool OUT = false;
+
+    framing::Uuid uuid;
+    framing::AMQFrame frame;
+    bool isIncoming;
+};
+
+typedef framing::Handler<SessionFrame&> SessionFrameHandler;
+
+std::ostream& operator<<(std::ostream&, const SessionFrame&);
+
+}} // namespace qpid::cluster
+
+
+
+#endif  /*!QPID_CLUSTER_SESSIONFRAME_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Thu Jul  5 
09:08:29 2007
@@ -47,10 +47,6 @@
 
 AMQFrame::~AMQFrame() {}
 
-AMQBody::shared_ptr AMQFrame::getBody(){
-    return body;
-}
-
 void AMQFrame::encode(Buffer& buffer)
 {
     buffer.putOctet(body->type());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Thu Jul  5 
09:08:29 2007
@@ -32,7 +32,8 @@
 #include "AMQHeartbeatBody.h"
 #include "qpid/framing/AMQP_MethodVersionMap.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
-#include "Buffer.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/shared_ptr.h"
 
 namespace qpid {
 namespace framing {
@@ -49,7 +50,9 @@
     virtual bool decode(Buffer& buffer); 
     virtual uint32_t size() const;
     uint16_t getChannel() const { return channel; }
-    AMQBody::shared_ptr getBody();
+
+    shared_ptr<AMQBody> getBody() { return body; }
+    void setBody(const shared_ptr<AMQBody>& b) { body = b; }
 
     /** Convenience template to cast the body to an expected type */
     template <class T> boost::shared_ptr<T> castBody() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h Thu Jul  5 09:08:29 
2007
@@ -81,6 +81,8 @@
     void putRawData(const uint8_t* data, size_t size);
     void getRawData(uint8_t* data, size_t size);
 
+    template <class T> void put(const T& data) { data.encode(*this); }
+    template <class T> void get(T& data) { data.decode(*this); }
 };
 
 }} // namespace qpid::framing

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp Thu Jul  5 09:08:29 
2007
@@ -21,20 +21,14 @@
 #include "qpid/QpidError.h"
 #include "qpid/framing/Buffer.h"
 
-#include <uuid/uuid.h>
-
 namespace qpid {
 namespace framing {
 
 using namespace std;
 
-Uuid::Uuid() { uuid_generate(c_array()); }
-
-Uuid::Uuid(uint8_t* uu) { uuid_copy(c_array(),uu); }
-
 static const size_t UNPARSED_SIZE=36; 
 
-void Uuid::encode(Buffer& buf) {
+void Uuid::encode(Buffer& buf) const {
     buf.putRawData(data(), size());
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.h?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.h Thu Jul  5 09:08:29 
2007
@@ -20,9 +20,12 @@
  */
 
 #include <boost/array.hpp>
+
 #include <ostream>
 #include <istream>
 
+#include <uuid/uuid.h>
+
 namespace qpid {
 namespace framing {
 
@@ -35,16 +38,29 @@
  * boost::array so Uuid can be the key type in a map etc.
  */
 struct Uuid : public boost::array<uint8_t, 16> {
-    /** Geneate universally  unique identifier */
-    Uuid();
+    /** If unique is true, generate a unique ID else a null ID. */
+    Uuid(bool unique=false) { if (unique) generate(); else clear(); }
+
+    /** Copy from 16 bytes of data */
+    Uuid(const uint8_t* data) { assign(data); }
 
-    /** Initialize from 16 bytes of data */
-    Uuid(uint8_t* data);
+    /** Copy from 16 bytes of data */
+    void assign(const uint8_t* data) { uuid_copy(c_array(), data); }
+    
+    /** Set to a new unique identifier */
+    void generate() { uuid_generate(c_array()); }
+
+    /** Set to all zeros */
+    void clear() { uuid_clear(c_array()); }
+    
+    /** Test for null (all zeros) */
+    bool isNull() const { return uuid_is_null(data()); }
 
     // Default op= and copy ctor are fine.
     // boost::array gives us ==, < etc.
 
-    void encode(framing::Buffer& buf);
+    void encode(framing::Buffer& buf) const;
+    
     void decode(framing::Buffer& buf);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Thu Jul  5 09:08:29 2007
@@ -36,10 +36,13 @@
 BOOST_AUTO_TEST_CASE(testClusterOne) {
     TestCluster cluster("clusterOne", "amqp:one:1");
     AMQFrame frame(VER, 1, new ChannelPingBody(VER));
-    cluster.getToChains().in->handle(frame);
-    BOOST_REQUIRE(cluster.in.waitFor(1));
+    cluster.getSendChains().in->handle(frame);
+    BOOST_REQUIRE(cluster.received.waitFor(1));
 
-    BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody());
+    SessionFrame& sf=cluster.received[0];
+    BOOST_CHECK(sf.isIncoming);
+    BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
+    
     BOOST_CHECK_EQUAL(1u, cluster.size());
     Cluster::MemberList members = cluster.getMembers();
     BOOST_CHECK_EQUAL(1u, members.size());
@@ -57,11 +60,13 @@
 
         // Exchange frames with child.
         AMQFrame frame(VER, 1, new ChannelPingBody(VER));
-        cluster.getToChains().in->handle(frame);
-        BOOST_REQUIRE(cluster.in.waitFor(1));
-        BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody());
-        BOOST_REQUIRE(cluster.out.waitFor(1));
-        BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody());
+        cluster.getSendChains().in->handle(frame);
+        BOOST_REQUIRE(cluster.received.waitFor(1));
+        SessionFrame& sf=cluster.received[0];
+        BOOST_CHECK(sf.isIncoming);
+        BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
+        BOOST_REQUIRE(cluster.received.waitFor(2));
+        BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, 
*cluster.received[1].frame.getBody());
 
         // Wait for child to exit.
         int status;
@@ -99,3 +104,4 @@
     BOOST_CHECK_EQUAL(1u, other->count);
 }
     
+

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h Thu Jul  5 09:08:29 2007
@@ -44,10 +44,10 @@
 
 void null_deleter(void*) {}
 
-struct TestFrameHandler :
-    public FrameHandler, public vector<AMQFrame>, public Monitor
+template <class T>
+struct TestHandler : public Handler<T&>, public vector<T>, public Monitor
 {
-    void handle(AMQFrame& frame) {
+    void handle(T& frame) {
         Mutex::ScopedLock l(*this);
         push_back(frame);
         notifyAll();
@@ -56,23 +56,22 @@
     bool waitFor(size_t n) {
         Mutex::ScopedLock l(*this);
         AbsTime deadline(now(), 5*TIME_SEC);
-        while (size() != n && wait(deadline))
+        while (vector<T>::size() != n && wait(deadline))
             ;
-        return size() == n;
+        return vector<T>::size() == n;
     }
 };
 
+typedef TestHandler<AMQFrame> TestFrameHandler;
+typedef TestHandler<SessionFrame> TestSessionFrameHandler;
+
 void nullDeleter(void*) {}
 
 struct TestCluster : public Cluster
 {
     TestCluster(string name, string url) : Cluster(name, url)
     {
-        setFromChains(
-            FrameHandler::Chains(
-                make_shared_ptr(&in, nullDeleter),
-                make_shared_ptr(&out, nullDeleter)
-            ));
+        setReceivedChain(make_shared_ptr(&received, nullDeleter));
     }
 
     /** Wait for cluster to be of size n. */
@@ -80,7 +79,7 @@
         return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), 
n));
     }
 
-    TestFrameHandler in, out;
+    TestSessionFrameHandler received;
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp Thu Jul  5 
09:08:29 2007
@@ -33,13 +33,16 @@
 /** Chlid part of Cluster::clusterTwo test */
 void clusterTwo() {
     TestCluster cluster("clusterTwo", "amqp::2");
-    BOOST_REQUIRE(cluster.in.waitFor(1)); // Frame from parent.
-    BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody());
+    BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent.
+    BOOST_CHECK(cluster.received[0].isIncoming);
+    BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, 
*cluster.received[0].frame.getBody());
     BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
+
     AMQFrame frame(VER, 1, new ChannelOkBody(VER));
-    cluster.getToChains().out->handle(frame);
-    BOOST_REQUIRE(cluster.out.waitFor(1));
-    BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody());
+    cluster.getSendChains().out->handle(frame);
+    BOOST_REQUIRE(cluster.received.waitFor(2));
+    BOOST_CHECK(!cluster.received[1].isIncoming);
+    BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, 
*cluster.received[1].frame.getBody());
 } 
 
 int test_main(int, char**) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp?view=diff&rev=553543&r1=553542&r2=553543
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp Thu Jul  5 09:08:29 2007
@@ -37,6 +37,7 @@
 BOOST_AUTO_TEST_CASE(testUuidCtor) {
     // Uniqueness
     boost::array<Uuid,1000> uuids;
+    for_each(uuids.begin(), uuids.end(), mem_fun_ref(&Uuid::generate));
     UniqueSet unique;
     for_each(uuids.begin(), uuids.end(), unique);
 }
@@ -62,10 +63,11 @@
 
 BOOST_AUTO_TEST_CASE(testUuidEncodeDecode) {
     Buffer buf(Uuid::size());
-    Uuid uuid;
+    Uuid uuid(sample.c_array());
     uuid.encode(buf);
     buf.flip();
     Uuid decoded;
     decoded.decode(buf);
-    BOOST_CHECK(uuid==decoded);
+    BOOST_CHECK_EQUAL(string(sample.begin(), sample.end()),
+                      string(decoded.begin(), decoded.end()));
 }


Reply via email to