Author: aconway
Date: Wed Sep 10 11:15:25 2008
New Revision: 693918

URL: http://svn.apache.org/viewvc?rev=693918&view=rev
Log:

Cluster support for copying shared broker state to new members.

cluster/DumpClient: Copies broker shared state to a new broker via AMQP.
broker/*Registry, Queue, QueueBindings: Added iteration functions for DumpClient
broker/SemanticState.cpp: Allow DumpClient to sidestep setting of 
delivery-properties.exchange.
client/Connection.h: Added Connection::open(Url) overload.
client/SessionImpl: Added send(AMQBody, FrameSet) overload for forwarding 
broker messages.
tests/cluster_test.cpp: Added test for DumpClient copying shared state between 
brokers.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Wed Sep 10 11:15:25 2008
@@ -27,9 +27,11 @@
   qpid/cluster/OutputInterceptor.cpp \
   qpid/cluster/ProxyInputHandler.h \
   qpid/cluster/Event.h \
-  qpid/cluster/Event.cpp
+  qpid/cluster/Event.cpp \
+  qpid/cluster/DumpClient.h \
+  qpid/cluster/DumpClient.cpp
 
-libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
+libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
 
 else
 # Empty stub library to satisfy rpm spec file.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h Wed Sep 10 11:15:25 2008
@@ -45,7 +45,11 @@
 std::ostream& operator<<(std::ostream& os, const TcpAddress& a);
 
 /** Address is a variant of all address types, more coming in future. */
-typedef boost::variant<TcpAddress> Address;
+struct Address : public boost::variant<TcpAddress> {
+    template <class T> Address(const T& t) : boost::variant<TcpAddress>(t) {}
+    template <class T> T* get() { return boost::get<T>(this); }
+    template <class T> const T* get() const { return boost::get<T>(this); }
+};
 
 /** An AMQP URL contains a list of addresses */
 struct Url : public std::vector<Address> {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Wed Sep 10 
11:15:25 2008
@@ -22,51 +22,64 @@
  *
  */
 
-#include <map>
-#include <boost/function.hpp>
 #include "Exchange.h"
 #include "MessageStore.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/management/Manageable.h"
 
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+
+#include <algorithm>
+#include <map>
+
 namespace qpid {
 namespace broker {
-    struct UnknownExchangeTypeException{};
 
-    class ExchangeRegistry{
-     public:
-        typedef boost::function4<Exchange::shared_ptr, const std::string&, 
-                                 bool, const qpid::framing::FieldTable&, 
qpid::management::Manageable*> FactoryFunction;
-
-        ExchangeRegistry () : parent(0) {}
-        std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, 
const std::string& type)
-            throw(UnknownExchangeTypeException);
-        std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, 
const std::string& type, 
-                                                      bool durable, const 
qpid::framing::FieldTable& args = framing::FieldTable())
-            throw(UnknownExchangeTypeException);
-        void destroy(const std::string& name);
-        Exchange::shared_ptr get(const std::string& name);
-        Exchange::shared_ptr getDefault();
-
-        /**
-         * Register the manageable parent for declared exchanges
-         */
-        void setParent (management::Manageable* _parent) { parent = _parent; }
-
-        void registerType(const std::string& type, FactoryFunction);
-      private:
-        typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
-        typedef std::map<std::string, FactoryFunction > FunctionMap;
-
-        ExchangeMap exchanges;
-        FunctionMap factory;
-        qpid::sys::RWlock lock;
-        management::Manageable* parent;
-
-    };
-}
-}
+struct UnknownExchangeTypeException{};
+
+class ExchangeRegistry{
+  public:
+    typedef boost::function4<Exchange::shared_ptr, const std::string&, 
+                             bool, const qpid::framing::FieldTable&, 
qpid::management::Manageable*> FactoryFunction;
+
+    ExchangeRegistry () : parent(0) {}
+    std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, 
const std::string& type)
+        throw(UnknownExchangeTypeException);
+    std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, 
const std::string& type, 
+                                                  bool durable, const 
qpid::framing::FieldTable& args = framing::FieldTable())
+        throw(UnknownExchangeTypeException);
+    void destroy(const std::string& name);
+    Exchange::shared_ptr get(const std::string& name);
+    Exchange::shared_ptr getDefault();
+
+    /**
+     * Register the manageable parent for declared exchanges
+     */
+    void setParent (management::Manageable* _parent) { parent = _parent; }
+
+    void registerType(const std::string& type, FactoryFunction);
+
+    /** Call f for each exchange in the registry. */
+    template <class F> void eachExchange(const F& f) const {
+        qpid::sys::RWlock::ScopedWlock l(lock);
+        std::for_each(exchanges.begin(), exchanges.end(),
+                      boost::bind(f, 
boost::bind(&ExchangeMap::value_type::second, _1)));
+    }
+        
+  private:
+    typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
+    typedef std::map<std::string, FactoryFunction > FunctionMap;
+
+    ExchangeMap exchanges;
+    FunctionMap factory;
+    mutable qpid::sys::RWlock lock;
+    management::Manageable* parent;
+
+};
+
+}} // namespace qpid::broker
 
 
 #endif  /*!_broker_ExchangeRegistry_h*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Wed Sep 10 11:15:25 
2008
@@ -84,6 +84,11 @@
         return p->get<T>(true);
     }
 
+    template <class T> const T* hasProperties() const {
+        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        return p->get<T>();
+    }
+
     template <class T> const T* getMethod() const {
         return frames.as<T>();
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Sep 10 11:15:25 
2008
@@ -34,14 +34,15 @@
 #include "qpid/management/Queue.h"
 #include "qpid/framing/amqp_types.h"
 
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/intrusive_ptr.hpp>
+
 #include <list>
 #include <vector>
 #include <memory>
 #include <deque>
-
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/intrusive_ptr.hpp>
+#include <algorithm>
 
 namespace qpid {
     namespace broker {
@@ -172,6 +173,8 @@
             inline const framing::FieldTable& getSettings() const { return 
settings; }
             inline bool isAutoDelete() const { return autodelete; }
             bool canAutoDelete() const;
+            const QueueBindings& getBindings() const { return bindings; }
+
 
             bool enqueue(TransactionContext* ctxt, 
boost::intrusive_ptr<Message> msg);
             /**
@@ -205,6 +208,17 @@
             management::ManagementObject* GetManagementObject (void) const;
             management::Manageable::status_t
             ManagementMethod (uint32_t methodId, management::Args& args);
+
+            /** Apply f to each Message on the queue. */
+            template <class F> void eachMessage(const F& f) const {
+                sys::Mutex::ScopedLock l(messageLock);
+                std::for_each(messages.begin(), messages.end(), f);
+            }
+
+            /** Apply f to each QueueBinding on the queue */
+            template <class F> void eachBinding(const F& f) {
+                bindings.eachBinding(f);
+            }
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp Wed Sep 10 
11:15:25 2008
@@ -29,7 +29,7 @@
 
 void QueueBindings::add(const string& exchange, const string& key, const 
FieldTable& args)
 {
-    bindings.push_back(new Binding(exchange, key, args));
+    bindings.push_back(QueueBinding(exchange, key, args));
 }
 
 void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr 
queue)
@@ -37,11 +37,10 @@
     for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
         try {
             exchanges.get(i->exchange)->unbind(queue, i->key, &(i->args));
-        } catch (const NotFoundException&) {
-        }
+        } catch (const NotFoundException&) {}
     }
 }
 
-QueueBindings::Binding::Binding(const string& _exchange, const string& _key, 
const FieldTable& _args)
+QueueBinding::QueueBinding(const string& _exchange, const string& _key, const 
FieldTable& _args)
     : exchange(_exchange), key(_key), args(_args)
 {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h Wed Sep 10 
11:15:25 2008
@@ -24,32 +24,38 @@
 #include "qpid/framing/FieldTable.h"
 #include <boost/ptr_container/ptr_list.hpp>
 #include <boost/shared_ptr.hpp>
+#include <algorithm>
 
 namespace qpid {
 namespace broker {
 
 class ExchangeRegistry;
 class Queue;
+
+struct QueueBinding{
+    std::string exchange;
+    std::string key;
+    qpid::framing::FieldTable args;
+    QueueBinding(const std::string& exchange, const std::string& key, const 
qpid::framing::FieldTable& args);
+};
+
 class QueueBindings
 {
-    struct Binding{
-        const std::string exchange;
-        const std::string key;
-        const qpid::framing::FieldTable args;
-        Binding(const std::string& exchange, const std::string& key, const 
qpid::framing::FieldTable& args);
-    };
-    
-    typedef boost::ptr_list<Binding> Bindings;
-    Bindings bindings;
+  public:
     
-public:
+    /** Apply f to each QueueBinding. */
+    template <class F> void eachBinding(const F& f) const { 
std::for_each(bindings.begin(), bindings.end(), f); }
+
     void add(const std::string& exchange, const std::string& key, const 
qpid::framing::FieldTable& args);
     void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue);
+
+  private:
+    typedef std::vector<QueueBinding> Bindings;
+    Bindings bindings;
 };
 
 
-}
-}
+}} // namespace qpid::broker
 
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Wed Sep 10 
11:15:25 2008
@@ -21,10 +21,12 @@
 #ifndef _QueueRegistry_
 #define _QueueRegistry_
 
-#include <map>
-#include "qpid/sys/Mutex.h"
 #include "Queue.h"
+#include "qpid/sys/Mutex.h"
 #include "qpid/management/Manageable.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+#include <map>
 
 namespace qpid {
 namespace broker {
@@ -98,11 +100,18 @@
      * Register the manageable parent for declared queues
      */
     void setParent (management::Manageable* _parent) { parent = _parent; }
+
+    /** Call f for each queue in the registry. */
+    template <class F> void eachQueue(const F& f) const {
+        qpid::sys::RWlock::ScopedWlock l(lock);
+        std::for_each(queues.begin(), queues.end(),
+                      boost::bind(f, 
boost::bind(&QueueMap::value_type::second, _1)));
+    }
     
 private:
     typedef std::map<string, Queue::shared_ptr> QueueMap;
     QueueMap queues;
-    qpid::sys::RWlock lock;
+    mutable qpid::sys::RWlock lock;
     int counter;
     MessageStore* store;
     management::Manageable* parent;
@@ -112,8 +121,7 @@
 };
 
     
-}
-}
+}} // namespace qpid::broker
 
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Sep 10 
11:15:25 2008
@@ -353,8 +353,14 @@
 void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
     std::string exchangeName = msg->getExchangeName();
     //TODO: the following should be hidden behind message (using 
MessageAdapter or similar)
+
+    // Do not replace the delivery-properties.exchange if it is already set.
+    // This is used internally (by the cluster) to force the exchange name on 
a message.
+    // The client library ensures this is always empty for messages from 
normal clients.
     if (msg->isA<MessageTransferBody>()) {
-        msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+        if (!msg->hasProperties<DeliveryProperties>() ||
+            msg->getProperties<DeliveryProperties>()->getExchange().empty()) 
+            
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
     }
     if (!cacheExchange || cacheExchange->getName() != exchangeName){
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -392,11 +398,6 @@
 {    
     if(c.isBlocked())
         outputTasks.activateOutput();
-    // TODO aconway 2008-07-16:  we could directly call
-    //  c.doOutput();
-    // since we are in the connections thread but for consistency
-    // activateOutput() will set it up to be called in the next write idle.
-    // Current cluster code depends on this, review cluster code to change.
 }
 
 void SemanticState::complete(DeliveryRecord& delivery)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Sep 10 
11:15:25 2008
@@ -117,7 +117,6 @@
     SessionHandler* handler;    
     sys::AbsTime expiry;        // Used by SessionManager.
     bool ignoring;
-    std::string name;
     SemanticState semanticState;
     SessionAdapter adapter;
     MessageBuilder msgBuilder;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Wed Sep 10 
11:15:25 2008
@@ -23,6 +23,7 @@
 #include "Message.h"
 #include "SessionImpl.h"
 #include "SessionBase_0_10Access.h"
+#include "qpid/Url.h"
 #include "qpid/log/Logger.h"
 #include "qpid/log/Options.h"
 #include "qpid/log/Statement.h"
@@ -48,6 +49,37 @@
 Connection::~Connection(){ }
 
 void Connection::open(
+    const Url& url,
+    const std::string& uid, const std::string& pwd, 
+    const std::string& vhost,
+    uint16_t maxFrameSize)
+{
+    if (url.empty())
+        throw Exception(QPID_MSG("Attempt to open URL with no addresses."));
+    Url::const_iterator i = url.begin();
+    do {
+        const TcpAddress* tcp = i->get<TcpAddress>();
+        i++;
+        if (tcp) {
+            try {
+                ConnectionSettings settings;
+                settings.host = tcp->host;
+                settings.port = tcp->port;
+                settings.username = uid;
+                settings.password = pwd;
+                settings.virtualhost = vhost;
+                settings.maxFrameSize = maxFrameSize;
+                open(settings);
+                break;
+            }
+            catch (const Exception& e) {
+                if (i == url.end()) throw;
+            }
+        }
+    } while (i != url.end());
+}
+
+void Connection::open(
     const std::string& host, int port,
     const std::string& uid, const std::string& pwd, 
     const std::string& vhost,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Wed Sep 10 
11:15:25 2008
@@ -1,5 +1,5 @@
-#ifndef _client_Connection_
-#define _client_Connection_
+#ifndef QPID_CLIENT_CONNECTION_H
+#define QPID_CLIENT_CONNECTION_H
 
 /*
  *
@@ -26,6 +26,9 @@
 #include "qpid/client/Session.h"
 
 namespace qpid {
+
+class Url;
+
 namespace client {
 
 class ConnectionSettings;
@@ -77,6 +80,28 @@
               const std::string& virtualhost = "/", uint16_t 
maxFrameSize=65535);
 
     /**
+     * Opens a connection to a broker using a URL.
+     * If the URL contains multiple addresses, try each in turn
+     * till connection is successful.
+     * 
+     * @param url address of the broker to connect to.
+     *
+     * @param uid the userid to connect with.
+     *
+     * @param pwd the password to connect with (currently SASL
+     * PLAIN is the only authentication method supported so this
+     * is sent in clear text).
+     * 
+     * @param virtualhost the AMQP virtual host to use (virtual
+     * hosts, where implemented(!), provide namespace partitioning
+     * within a single broker).
+     */
+    void open(const Url& url,
+              const std::string& uid = "guest",
+              const std::string& pwd = "guest", 
+              const std::string& virtualhost = "/", uint16_t 
maxFrameSize=65535);
+
+    /**
      * Opens a connection to a broker.
      * 
      * @param the settings to use (host, port etc) @see ConnectionSettings
@@ -146,4 +171,4 @@
 }} // namespace qpid::client
 
 
-#endif
+#endif  /*!QPID_CLIENT_CONNECTION_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp Wed Sep 10 
11:15:25 2008
@@ -24,9 +24,7 @@
 namespace qpid {
 namespace client {
 
-Message::Message(const std::string& data_,
-                 const std::string& routingKey,
-                 const std::string& exchange) : TransferContent(data_, 
routingKey, exchange) {}
+Message::Message(const std::string& data, const std::string& routingKey) : 
TransferContent(data, routingKey) {}
 
 std::string Message::getDestination() const 
 { 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Wed Sep 10 11:15:25 
2008
@@ -40,11 +40,9 @@
     /** Create a Message.
     *@param data Data for the message body.
     *@param routingKey Passed to the exchange that routes the 
message.
-     *@param exchange Name of the exchange that should route the 
message.
      */
     Message(const std::string& data=std::string(),
-            const std::string& routingKey=std::string(),
-            const std::string& exchange=std::string());
+            const std::string& routingKey=std::string());
 
     /** The destination of messages sent to the broker is the exchange
      * name.  The destination of messages received from the broker is

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Wed Sep 10 
11:15:25 2008
@@ -28,9 +28,11 @@
 #include "qpid/framing/ClientInvoker.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/FrameSet.h"
+#include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MethodContent.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/DeliveryProperties.h"
 #include "qpid/log/Statement.h"
 
 #include <boost/bind.hpp>
@@ -272,6 +274,41 @@
     return sendCommand(command, &content);
 }
 
+namespace {
+// Functor for FrameSet::map to send header + content frames, but not method 
frames.
+struct SendContentFn {
+    FrameHandler& handler;
+    void operator()(const AMQFrame& f) {
+        if (!f.getMethod()) 
+            handler(const_cast<AMQFrame&>(f));
+    }
+    SendContentFn(FrameHandler& h) : handler(h) {}
+};
+}
+    
+Future SessionImpl::send(const AMQBody& command, const FrameSet& content) {
+    Acquire a(sendLock);
+    SequenceNumber id = nextOut++;
+    {
+        Lock l(state);
+        checkOpen();    
+        incompleteOut.add(id);
+    }
+    Future f(id);
+    if (command.getMethod()->resultExpected()) {        
+        Lock l(state);
+        //result listener must be set before the command is sent
+        f.setFutureResult(results.listenForResult(id));
+    }
+    AMQFrame frame(command);
+    frame.setEof(false);
+    handleOut(frame);
+
+    SendContentFn send(out);
+    content.map(send);
+    return f;
+}
+
 Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* 
content)
 {
     Acquire a(sendLock);
@@ -297,9 +334,16 @@
     }
     return f;
 }
+
 void SessionImpl::sendContent(const MethodContent& content)
 {
     AMQFrame header(content.getHeader());
+
+    // Client is not allowed to set the delivery-properties.exchange.
+    AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
+    if (headerp && headerp->get<DeliveryProperties>())
+        headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
+
     header.setFirstSegment(false);
     uint64_t data_length = content.getData().length();
     if(data_length > 0){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Wed Sep 10 
11:15:25 2008
@@ -81,6 +81,7 @@
 
     Future send(const framing::AMQBody& command);
     Future send(const framing::AMQBody& command, const framing::MethodContent& 
content);
+    Future send(const framing::AMQBody& command, const framing::FrameSet& 
content);
 
     Demux& getDemux();
     void markCompleted(const framing::SequenceNumber& id, bool cumulative, 
bool notifyPeer);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Sep 10 
11:15:25 2008
@@ -25,7 +25,7 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUrlNoticeBody.h"
+#include "qpid/framing/ClusterJoiningBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/log/Statement.h"
@@ -50,7 +50,13 @@
     Cluster& cluster;
     MemberId member;
     ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) 
{}
-    void urlNotice(const std::string& u) { cluster.urlNotice (member, u); }
+    void joining(const std::string& u) { cluster.joining (member, u); }
+    void ready() { cluster.ready(member); }
+
+    void members(const framing::FieldTable& , const framing::FieldTable& , 
const framing::FieldTable& ) {
+        assert(0); // Not passed to cluster, used to start a brain dump over 
TCP.
+    }
+
     bool invoke(AMQFrame& f) { return framing::invoke(*this, 
*f.getBody()).wasHandled(); }
 };
 
@@ -237,7 +243,7 @@
     
     if (nJoined)                // Notfiy new members of my URL.
         mcastFrame(
-            AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), 
url.str())),
+            AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), 
url.str())),
             ConnectionId(self,0));
 
 
@@ -261,11 +267,15 @@
     broker->shutdown();
 }
 
-void Cluster::urlNotice(const MemberId& m, const string& url) {
+void Cluster::joining(const MemberId& m, const string& url) {
     QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
     urls.insert(UrlMap::value_type(m,Url(url)));
 }
 
+void Cluster::ready(const MemberId& ) {
+    // FIXME aconway 2008-09-08: TODO
+}
+    
 }} // namespace qpid::cluster
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Sep 10 
11:15:25 2008
@@ -75,7 +75,8 @@
     /** Leave the cluster */
     void leave();
     
-    void urlNotice(const MemberId&, const std::string& url);
+    void joining(const MemberId&, const std::string& url);
+    void ready(const MemberId&);
 
     broker::Broker& getBroker() { assert(broker); return *broker; }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Sep 10 
11:15:25 2008
@@ -26,6 +26,7 @@
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 #include "qpid/shared_ptr.h"
+#include "qpid/log/Statement.h"
 
 #include <boost/utility/in_place_factory.hpp>
 
@@ -75,7 +76,7 @@
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         if (!broker || values.name.empty()) return;  // Only if --cluster-name 
option was specified.
-        if (cluster) throw Exception("Cluster plugin cannot be initialized 
twice in one process.");
+        QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of 
cluster plugin.");
         cluster = new Cluster(values.name, values.getUrl(broker->getPort()), 
*broker);
         broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
         broker->setConnectionFactory(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Sep 10 
11:15:25 2008
@@ -89,6 +89,18 @@
     // ConnectionInputHandlerFactory
     sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, 
const std::string& id, bool isClient);
 
+    // State dump methods.
+    virtual void sessionState(const framing::SequenceNumber& /*replayId*/,
+                              const framing::SequenceNumber& /*sendId*/,
+                              const framing::SequenceSet& /*sentIncomplete*/,
+                              const framing::SequenceNumber& /*expectedId*/,
+                              const framing::SequenceNumber& /*receivedId*/,
+                              const framing::SequenceSet& /*unknownCompleted*/,
+                              const framing::SequenceSet& 
/*receivedIncomplete*/) {}
+    
+    virtual void shadowReady(uint64_t /*clusterId*/,
+                             const std::string& /*userId*/) {}
+
   private:
     void sendDoOutput();
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=693918&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Wed Sep 10 
11:15:25 2008
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DumpClient.h"
+#include "qpid/client/SessionBase_0_10Access.h" 
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/enum.h"
+#include "qpid/Url.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace cluster {
+
+using broker::Broker;
+using broker::Exchange;
+using broker::Queue;
+using broker::QueueBinding;
+using broker::Message;
+using namespace framing::message;
+
+using namespace client;
+
+DumpClient::DumpClient(const Url& url) {
+    connection.open(url);
+    session = connection.newSession();
+}
+
+DumpClient::~DumpClient() {
+    session.close();
+    connection.close();
+}
+
+// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes.
+static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
+static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); 
+
+void DumpClient::dump(Broker& donor) {
+    // TODO aconway 2008-09-08: Caller must handle exceptions
+    // FIXME aconway 2008-09-08: send cluster map frame first.
+    donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, 
this, _1));
+    // Catch-up exchange is used to route messages to the proper queue without 
modifying routing key.
+    session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", 
arg::autoDelete=true);
+    donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
+    session.sync();
+}
+
+void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
+    session.exchangeDeclare(
+        ex->getName(), ex->getType(),
+        ex->getAlternate() ? ex->getAlternate()->getName() : std::string(),
+        arg::passive=false,
+        arg::durable=ex->isDurable(),
+        arg::autoDelete=false,
+        arg::arguments=ex->getArgs());
+}
+
+void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
+    session.queueDeclare(
+        q->getName(),
+        q->getAlternateExchange() ? q->getAlternateExchange()->getName() : 
std::string(),
+        arg::passive=false,
+        arg::durable=q->isDurable(),
+        arg::exclusive=q->hasExclusiveConsumer(),
+        arg::autoDelete=q->isAutoDelete(),
+        arg::arguments=q->getSettings());
+
+    session.exchangeBind(q->getName(), CATCH_UP, std::string());
+    q->eachMessage(boost::bind(&DumpClient::dumpMessage, this, _1));
+    session.exchangeUnbind(q->getName(), CATCH_UP, std::string());
+    q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), 
_1));
+}
+
+void DumpClient::dumpMessage(const broker::QueuedMessage& message) {
+    SessionBase_0_10Access sb(session);
+    framing::MessageTransferBody transfer(framing::ProtocolVersion(), 
CATCH_UP, ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED);
+    sb.get()->send(transfer, message.payload->getFrames());
+}
+
+void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& 
binding) {
+    session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
+}
+
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=693918&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Wed Sep 10 
11:15:25 2008
@@ -0,0 +1,74 @@
+#ifndef QPID_CLUSTER_DUMPCLIENT_H
+#define QPID_CLUSTER_DUMPCLIENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include <boost/shared_ptr.hpp>
+
+
+namespace qpid {
+
+class Url;
+
+namespace broker {
+
+class Broker;
+class Queue;
+class Exchange;
+class QueueBindings;
+class QueueBinding;
+class QueuedMessage;
+} // namespace broker
+
+namespace cluster {
+
+/**
+ * A client that dumps the contents of a local broker to a remote one using 
AMQP.
+ */
+class DumpClient {
+  public:
+    DumpClient(const Url& receiver);
+    ~DumpClient();
+    
+    void dump(broker::Broker& donor);
+
+  private:
+    void dumpQueue(const boost::shared_ptr<broker::Queue>&);
+    void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
+    void dumpMessage(const broker::QueuedMessage&);
+    void dumpBinding(const std::string& queue, const broker::QueueBinding& 
binding);
+
+  private:
+    client::Connection connection;
+    client::AsyncSession session;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_DUMPCLIENT_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Wed Sep 
10 11:15:25 2008
@@ -24,15 +24,12 @@
 namespace qpid {
 namespace framing {
 
-TransferContent::TransferContent(const std::string& data,
-                                 const std::string& routingKey,
-                                 const std::string& exchange)
-{
+TransferContent::TransferContent(const std::string& data, const std::string& 
key) {
     setData(data);
-    if (routingKey.size()) getDeliveryProperties().setRoutingKey(routingKey);
-    if (exchange.size()) getDeliveryProperties().setExchange(exchange);
+    if (!key.empty()) getDeliveryProperties().setRoutingKey(key);
 }
 
+
 AMQHeaderBody TransferContent::getHeader() const
 {
     return header;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h Wed Sep 10 
11:15:25 2008
@@ -36,9 +36,7 @@
     AMQHeaderBody header;
     std::string data;
 public:
-    TransferContent(const std::string& data = std::string(),
-                    const std::string& routingKey = std::string(),
-                    const std::string& exchange = std::string());
+    TransferContent(const std::string& data = std::string(), const 
std::string& key=std::string());
 
     ///@internal
     AMQHeaderBody getHeader() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Sep 10 
11:15:25 2008
@@ -23,6 +23,7 @@
 
 #include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/DumpClient.h"
 #include "qpid/framing/AMQBody.h"
 #include "qpid/client/Connection.h"
 #include "qpid/client/Session.h"
@@ -51,6 +52,7 @@
 using namespace qpid::cluster;
 using namespace qpid::framing;
 using namespace qpid::client;
+using qpid::sys::TIME_SEC;
 using qpid::broker::Broker;
 using boost::ptr_vector;
 using qpid::cluster::Cluster;
@@ -133,6 +135,86 @@
     return o;
 }
 
+QPID_AUTO_TEST_CASE(testDumpClient) {
+    BrokerFixture donor, receiver;
+    {
+        Client c(donor.getPort());
+        FieldTable args;
+        args.setString("x", "y");
+        c.session.queueDeclare("qa", arg::arguments=args);
+        c.session.queueDeclare("qb", arg::alternateExchange="amq.direct");
+
+        c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", 
arg::arguments=args);
+        c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", 
arg::bindingKey="foo");
+        c.session.messageTransfer(arg::destination="exd", 
arg::content=TransferContent("one", "foo"));
+
+        c.session.exchangeDeclare("ext", arg::type="topic");
+        c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", 
arg::bindingKey="bar");
+        c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
+        c.session.messageTransfer(arg::destination="ext", 
arg::content=TransferContent("one", "bar"));
+        c.session.messageTransfer(arg::destination="ext", 
arg::content=TransferContent("two", "bar"));
+
+        c.session.close();
+        c.connection.close();
+    }
+    qpid::cluster::DumpClient dump(Url::getIpAddressesUrl(receiver.getPort()));
+    dump.dump(*donor.broker);
+    {
+        Client r(receiver.getPort());
+        // Verify exchanges
+        ExchangeQueryResult ex=r.session.exchangeQuery("exd");
+        BOOST_CHECK_EQUAL(ex.getType(), "direct");
+        BOOST_CHECK_EQUAL(ex.getDurable(), false);
+        BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+        BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y");
+
+        ex = r.session.exchangeQuery("ext");
+        BOOST_CHECK_EQUAL(ex.getType(), "topic");
+        BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+
+        // Verify queues
+        QueueQueryResult qq = r.session.queueQuery("qa");
+        BOOST_CHECK_EQUAL(qq.getQueue(), "qa");
+        BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "");
+        BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y");
+        BOOST_CHECK_EQUAL(qq.getMessageCount(), 1);
+
+        qq = r.session.queueQuery("qb");
+        BOOST_CHECK_EQUAL(qq.getQueue(), "qb");
+        BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct");
+        BOOST_CHECK_EQUAL(qq.getMessageCount(), 2);
+
+        // Verify messages
+        Message m;
+        BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC));
+        BOOST_CHECK_EQUAL(m.getData(), "one");
+        BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd");
+        BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo");
+
+        BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+        BOOST_CHECK_EQUAL(m.getData(), "one");
+        BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+        BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+        BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+        BOOST_CHECK_EQUAL(m.getData(), "two");
+        BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+        BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+        // Verify bindings
+        r.session.messageTransfer(arg::destination="exd", 
arg::content=TransferContent("xxx", "foo"));
+        BOOST_CHECK(r.subs.get(m, "qa"));
+        BOOST_CHECK_EQUAL(m.getData(), "xxx");
+        
+        r.session.messageTransfer(arg::destination="ext", 
arg::content=TransferContent("yyy", "bar"));
+        BOOST_CHECK(r.subs.get(m, "qb"));
+        BOOST_CHECK_EQUAL(m.getData(), "yyy");
+
+        r.session.close();
+        r.connection.close();
+    }
+}
+
 QPID_AUTO_TEST_CASE(testForkedBroker) {
     // Verify the ForkedBroker works as expected.
     const char* argv[] = { "", "--auth=no", "--no-data-dir", 
"--log-prefix=testForkedBroker" };
@@ -146,8 +228,7 @@
     ClusterFixture cluster(1);
     Client c(cluster[0]);
     BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
-    BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
-    // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate.
+    BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound());
 }
 
 QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -234,5 +315,4 @@
     BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
 }
 
-
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Wed Sep 10 11:15:25 2008
@@ -22,20 +22,66 @@
 
 <amqp major="0" minor="10" port="5672">
 
+  <!-- Controls sent between cluster nodes. -->
+
   <class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
     <doc>Qpid extension class to allow clustered brokers to communicate.</doc>
 
-    <control name = "url-notice" code="0x1">
-      <field name="url-notice" type="str16" />
+    <!-- Cluster membership -->
+
+    <control name = "joining" code="0x1">
+      <field name="joining" type="str16" label="URL of new member joining 
cluster."/>
+    </control>
+
+
+    <control name="ready" code="0x2" label="New member is ready."/>
+
+    <control name="members" code="0x3" label="Cluster map sent to new 
members.">
+      <field name="members" type="map"/>     <!-- member-id -> URL -->
+      <field name="donors" type="map"/>             <!-- member-id -> uint32 
(donor-count) -->
+      <field name="newbies" type="map"/>     <!-- member-id -> URL -->
     </control>
+
+    <!-- Transferring broker state -->
+
   </class>
 
+  <!-- TODO aconway 2008-09-10: support for un-attached connections. -->
+  
+  <!-- Controls associated with a specific connection. -->
+
   <class name="cluster-connection" code="0x81" label="Qpid clustering 
extensions.">
+
     <control name="deliver-close" code="0x2">
     </control>
 
     <control name="deliver-do-output" code="0x3">
       <field name="bytes" type="uint32"/>
     </control>
+
+    <!-- Brain-dump controls. Sent to a new broker in joining mode.
+        A connection is dumped as follows:
+        - open as a normal connection.
+        - attach sessions, create consumers, set flow with normal AMQP 
commands.
+        - reset session state by sending session-state for each session.
+         - frames following session-state are replay frames.
+        - send shadow-ready to mark end of dump.
+    -->
+    <control name="session-state" code="0x4" label="Set session state during a 
brain dump.">
+      <!-- Target session deduced from channel number.  -->
+      <field name="replay-id" type="sequence-no"/>
+      <field name="send-id" type="sequence-no"/>
+      <field name="sent-incomplete" type="sequence-set"/>
+
+      <field name="expected-id" type="sequence-no"/>
+      <field name="received-id" type="sequence-no"/>
+      <field name="unknown-completed" type="sequence-set"/>
+      <field name="received-incomplete" type="sequence-set"/>
+    </control>
+
+    <control name="shadow-ready" code="0x5" label="End of shadow connection 
dump.">
+      <field name="cluster-id" type="uint64"/>
+      <field name="user-id" type="vbin16"/>
+    </control>
   </class>
 </amqp>


Reply via email to