Author: aconway
Date: Wed Sep 10 11:15:25 2008
New Revision: 693918
URL: http://svn.apache.org/viewvc?rev=693918&view=rev
Log:
Cluster support for copying shared broker state to new members.
cluster/DumpClient: Copies broker shared state to a new broker via AMQP.
broker/*Registry, Queue, QueueBindings: Added iteration functions for DumpClient
broker/SemanticState.cpp: Allow DumpClient to sidestep setting of
delivery-properties.exchange.
client/Connection.h: Added Connection::open(Url) overload.
client/SessionImpl: Added send(AMQBody, FrameSet) overload for forwarding
broker messages.
tests/cluster_test.cpp: Added test for DumpClient copying shared state between
brokers.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Wed Sep 10 11:15:25 2008
@@ -27,9 +27,11 @@
qpid/cluster/OutputInterceptor.cpp \
qpid/cluster/ProxyInputHandler.h \
qpid/cluster/Event.h \
- qpid/cluster/Event.cpp
+ qpid/cluster/Event.cpp \
+ qpid/cluster/DumpClient.h \
+ qpid/cluster/DumpClient.cpp
-libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
+libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
else
# Empty stub library to satisfy rpm spec file.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h Wed Sep 10 11:15:25 2008
@@ -45,7 +45,11 @@
std::ostream& operator<<(std::ostream& os, const TcpAddress& a);
/** Address is a variant of all address types, more coming in future. */
-typedef boost::variant<TcpAddress> Address;
+struct Address : public boost::variant<TcpAddress> {
+ /** Construct from any value accepted by the underlying boost::variant. */
+ template <class T> Address(const T& t) : boost::variant<TcpAddress>(t) {}
+ /**
+  * Pointer form of boost::get applied to this address.
+  * Callers test the result for null before use, i.e. when the address
+  * does not currently hold a T.
+  */
+ template <class T> T* get() { return boost::get<T>(this); }
+ /** Const overload of get(). */
+ template <class T> const T* get() const { return boost::get<T>(this); }
+};
/** An AMQP URL contains a list of addresses */
struct Url : public std::vector<Address> {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Wed Sep 10
11:15:25 2008
@@ -22,51 +22,64 @@
*
*/
-#include <map>
-#include <boost/function.hpp>
#include "Exchange.h"
#include "MessageStore.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
#include "qpid/management/Manageable.h"
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+
+#include <algorithm>
+#include <map>
+
namespace qpid {
namespace broker {
- struct UnknownExchangeTypeException{};
- class ExchangeRegistry{
- public:
- typedef boost::function4<Exchange::shared_ptr, const std::string&,
- bool, const qpid::framing::FieldTable&,
qpid::management::Manageable*> FactoryFunction;
-
- ExchangeRegistry () : parent(0) {}
- std::pair<Exchange::shared_ptr, bool> declare(const std::string& name,
const std::string& type)
- throw(UnknownExchangeTypeException);
- std::pair<Exchange::shared_ptr, bool> declare(const std::string& name,
const std::string& type,
- bool durable, const
qpid::framing::FieldTable& args = framing::FieldTable())
- throw(UnknownExchangeTypeException);
- void destroy(const std::string& name);
- Exchange::shared_ptr get(const std::string& name);
- Exchange::shared_ptr getDefault();
-
- /**
- * Register the manageable parent for declared exchanges
- */
- void setParent (management::Manageable* _parent) { parent = _parent; }
-
- void registerType(const std::string& type, FactoryFunction);
- private:
- typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
- typedef std::map<std::string, FactoryFunction > FunctionMap;
-
- ExchangeMap exchanges;
- FunctionMap factory;
- qpid::sys::RWlock lock;
- management::Manageable* parent;
-
- };
-}
-}
+struct UnknownExchangeTypeException{};
+
+/**
+ * Registry of exchanges keyed by name, plus the factory functions
+ * registered per exchange type. The exchange map is guarded by a
+ * read/write lock.
+ */
+class ExchangeRegistry{
+  public:
+    /** Signature of the factory functions passed to registerType(). */
+    typedef boost::function4<Exchange::shared_ptr, const std::string&,
+                             bool, const qpid::framing::FieldTable&,
+                             qpid::management::Manageable*> FactoryFunction;
+
+    ExchangeRegistry () : parent(0) {}
+
+    /**
+     * Declare an exchange of the given type.
+     * NOTE(review): the bool in the returned pair presumably reports whether
+     * the exchange was newly created - confirm against the implementation.
+     * @throws UnknownExchangeTypeException per the exception specification.
+     */
+    std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
+        throw(UnknownExchangeTypeException);
+    /** Overload of declare() that also takes a durable flag and arguments. */
+    std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
+                                                  bool durable, const qpid::framing::FieldTable& args = framing::FieldTable())
+        throw(UnknownExchangeTypeException);
+    void destroy(const std::string& name);
+    Exchange::shared_ptr get(const std::string& name);
+    Exchange::shared_ptr getDefault();
+
+    /**
+     * Register the manageable parent for declared exchanges
+     */
+    void setParent (management::Manageable* _parent) { parent = _parent; }
+
+    void registerType(const std::string& type, FactoryFunction);
+
+    /**
+     * Call f for each exchange in the registry.
+     *
+     * f is invoked with the Exchange::shared_ptr stored for each entry.
+     * The traversal never modifies the map, so only the shared (read)
+     * lock is taken; the lock is held while f runs.
+     */
+    template <class F> void eachExchange(const F& f) const {
+        qpid::sys::RWlock::ScopedRlock l(lock);
+        std::for_each(exchanges.begin(), exchanges.end(),
+                      boost::bind(f, boost::bind(&ExchangeMap::value_type::second, _1)));
+    }
+
+  private:
+    typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
+    typedef std::map<std::string, FactoryFunction > FunctionMap;
+
+    ExchangeMap exchanges;
+    FunctionMap factory;
+    // mutable so that const members such as eachExchange() can lock.
+    mutable qpid::sys::RWlock lock;
+    management::Manageable* parent;
+
+};
+
+}} // namespace qpid::broker
#endif /*!_broker_ExchangeRegistry_h*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Wed Sep 10 11:15:25
2008
@@ -84,6 +84,11 @@
return p->get<T>(true);
}
+    /**
+     * Query the message headers for properties of type T.
+     *
+     * Unlike getProperties(), which passes `true` to get<T>(), this calls
+     * get<T>() with no argument (presumably "do not create" - confirm).
+     *
+     * @return the result of get<T>() on the header body, or a null pointer
+     * when the frame set has no header body at all.
+     */
+    template <class T> const T* hasProperties() const {
+        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        // A query must not crash on a message without headers: report "absent".
+        return p ? p->get<T>() : 0;
+    }
+
template <class T> const T* getMethod() const {
return frames.as<T>();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Sep 10 11:15:25
2008
@@ -34,14 +34,15 @@
#include "qpid/management/Queue.h"
#include "qpid/framing/amqp_types.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/intrusive_ptr.hpp>
+
#include <list>
#include <vector>
#include <memory>
#include <deque>
-
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/intrusive_ptr.hpp>
+#include <algorithm>
namespace qpid {
namespace broker {
@@ -172,6 +173,8 @@
inline const framing::FieldTable& getSettings() const { return
settings; }
inline bool isAutoDelete() const { return autodelete; }
bool canAutoDelete() const;
+ const QueueBindings& getBindings() const { return bindings; }
+
bool enqueue(TransactionContext* ctxt,
boost::intrusive_ptr<Message> msg);
/**
@@ -205,6 +208,17 @@
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
+
+ /**
+  * Apply f to each Message on the queue.
+  *
+  * messageLock is held for the whole traversal, so f is invoked with
+  * that lock held. f receives each element of `messages` exactly as
+  * std::for_each passes it.
+  */
+ template <class F> void eachMessage(const F& f) const {
+ sys::Mutex::ScopedLock l(messageLock);
+ std::for_each(messages.begin(), messages.end(), f);
+ }
+
+    /**
+     * Apply f to each QueueBinding on the queue.
+     *
+     * Forwards to QueueBindings::eachBinding, which is itself const, so
+     * this member is const as well and can be used through a const Queue
+     * (matching eachMessage).
+     */
+    template <class F> void eachBinding(const F& f) const {
+        bindings.eachBinding(f);
+    }
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.cpp Wed Sep 10
11:15:25 2008
@@ -29,7 +29,7 @@
void QueueBindings::add(const string& exchange, const string& key, const
FieldTable& args)
{
- bindings.push_back(new Binding(exchange, key, args));
+ bindings.push_back(QueueBinding(exchange, key, args));
}
void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr
queue)
@@ -37,11 +37,10 @@
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
try {
exchanges.get(i->exchange)->unbind(queue, i->key, &(i->args));
- } catch (const NotFoundException&) {
- }
+ } catch (const NotFoundException&) {}
}
}
-QueueBindings::Binding::Binding(const string& _exchange, const string& _key,
const FieldTable& _args)
+QueueBinding::QueueBinding(const string& _exchange, const string& _key, const
FieldTable& _args)
: exchange(_exchange), key(_key), args(_args)
{}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueBindings.h Wed Sep 10
11:15:25 2008
@@ -24,32 +24,38 @@
#include "qpid/framing/FieldTable.h"
#include <boost/ptr_container/ptr_list.hpp>
#include <boost/shared_ptr.hpp>
+#include <algorithm>
namespace qpid {
namespace broker {
class ExchangeRegistry;
class Queue;
+
+/** One binding of a queue: the exchange name, the binding key and the binding arguments. */
+struct QueueBinding{
+ // Name used to look the exchange up in the ExchangeRegistry when unbinding.
+ std::string exchange;
+ // Key passed to Exchange::unbind together with args.
+ std::string key;
+ qpid::framing::FieldTable args;
+ /** Store copies of the three values. */
+ QueueBinding(const std::string& exchange, const std::string& key, const
qpid::framing::FieldTable& args);
+};
+
class QueueBindings
{
- struct Binding{
- const std::string exchange;
- const std::string key;
- const qpid::framing::FieldTable args;
- Binding(const std::string& exchange, const std::string& key, const
qpid::framing::FieldTable& args);
- };
-
- typedef boost::ptr_list<Binding> Bindings;
- Bindings bindings;
+ public:
-public:
+ /** Apply f to each QueueBinding. */
+ template <class F> void eachBinding(const F& f) const {
std::for_each(bindings.begin(), bindings.end(), f); }
+
void add(const std::string& exchange, const std::string& key, const
qpid::framing::FieldTable& args);
void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue);
+
+ private:
+ typedef std::vector<QueueBinding> Bindings;
+ Bindings bindings;
};
-}
-}
+}} // namespace qpid::broker
#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Wed Sep 10
11:15:25 2008
@@ -21,10 +21,12 @@
#ifndef _QueueRegistry_
#define _QueueRegistry_
-#include <map>
-#include "qpid/sys/Mutex.h"
#include "Queue.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+#include <map>
namespace qpid {
namespace broker {
@@ -98,11 +100,18 @@
* Register the manageable parent for declared queues
*/
void setParent (management::Manageable* _parent) { parent = _parent; }
+
+    /**
+     * Call f for each queue in the registry.
+     *
+     * f is invoked with the Queue::shared_ptr stored for each entry.
+     * The traversal never modifies the map, so only the shared (read)
+     * lock is taken; the lock is held while f runs.
+     */
+    template <class F> void eachQueue(const F& f) const {
+        qpid::sys::RWlock::ScopedRlock l(lock);
+        std::for_each(queues.begin(), queues.end(),
+                      boost::bind(f, boost::bind(&QueueMap::value_type::second, _1)));
+    }
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
- qpid::sys::RWlock lock;
+ mutable qpid::sys::RWlock lock;
int counter;
MessageStore* store;
management::Manageable* parent;
@@ -112,8 +121,7 @@
};
-}
-}
+}} // namespace qpid::broker
#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Sep 10
11:15:25 2008
@@ -353,8 +353,14 @@
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
//TODO: the following should be hidden behind message (using
MessageAdapter or similar)
+
+ // Do not replace the delivery-properties.exchange if it is already set.
+ // This is used internally (by the cluster) to force the exchange name on
a message.
+ // The client library ensures this is always empty for messages from
normal clients.
if (msg->isA<MessageTransferBody>()) {
- msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+ if (!msg->hasProperties<DeliveryProperties>() ||
+ msg->getProperties<DeliveryProperties>()->getExchange().empty())
+
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -392,11 +398,6 @@
{
if(c.isBlocked())
outputTasks.activateOutput();
- // TODO aconway 2008-07-16: we could directly call
- // c.doOutput();
- // since we are in the connections thread but for consistency
- // activateOutput() will set it up to be called in the next write idle.
- // Current cluster code depends on this, review cluster code to change.
}
void SemanticState::complete(DeliveryRecord& delivery)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Sep 10
11:15:25 2008
@@ -117,7 +117,6 @@
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
bool ignoring;
- std::string name;
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Wed Sep 10
11:15:25 2008
@@ -23,6 +23,7 @@
#include "Message.h"
#include "SessionImpl.h"
#include "SessionBase_0_10Access.h"
+#include "qpid/Url.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -48,6 +49,37 @@
Connection::~Connection(){ }
void Connection::open(
+    const Url& url,
+    const std::string& uid, const std::string& pwd,
+    const std::string& vhost,
+    uint16_t maxFrameSize)
+{
+    // Try each address of the URL in order and stop at the first one that
+    // connects. A failure is only propagated for the last address.
+    if (url.empty())
+        throw Exception(QPID_MSG("Attempt to open URL with no addresses."));
+
+    for (Url::const_iterator addr = url.begin(); addr != url.end(); ++addr) {
+        const TcpAddress* tcp = addr->get<TcpAddress>();
+        if (!tcp)
+            continue;           // Not a TCP address: nothing to attempt.
+        try {
+            ConnectionSettings settings;
+            settings.host = tcp->host;
+            settings.port = tcp->port;
+            settings.username = uid;
+            settings.password = pwd;
+            settings.virtualhost = vhost;
+            settings.maxFrameSize = maxFrameSize;
+            open(settings);
+            return;             // Connected.
+        }
+        catch (const Exception&) {
+            // Re-throw only if there is no further address left to try.
+            if (addr + 1 == url.end()) throw;
+        }
+    }
+}
+
+void Connection::open(
const std::string& host, int port,
const std::string& uid, const std::string& pwd,
const std::string& vhost,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Wed Sep 10
11:15:25 2008
@@ -1,5 +1,5 @@
-#ifndef _client_Connection_
-#define _client_Connection_
+#ifndef QPID_CLIENT_CONNECTION_H
+#define QPID_CLIENT_CONNECTION_H
/*
*
@@ -26,6 +26,9 @@
#include "qpid/client/Session.h"
namespace qpid {
+
+class Url;
+
namespace client {
class ConnectionSettings;
@@ -77,6 +80,28 @@
const std::string& virtualhost = "/", uint16_t
maxFrameSize=65535);
/**
+ * Opens a connection to a broker using a URL.
+ * If the URL contains multiple addresses, try each in turn
+ * till connection is successful.
+ *
+ * @param url address of the broker to connect to.
+ *
+ * @param uid the userid to connect with.
+ *
+ * @param pwd the password to connect with (currently SASL
+ * PLAIN is the only authentication method supported so this
+ * is sent in clear text).
+ *
+ * @param virtualhost the AMQP virtual host to use (virtual
+ * hosts, where implemented(!), provide namespace partitioning
+ * within a single broker).
+ */
+ void open(const Url& url,
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& virtualhost = "/", uint16_t
maxFrameSize=65535);
+
+ /**
* Opens a connection to a broker.
*
* @param the settings to use (host, port etc) @see ConnectionSettings
@@ -146,4 +171,4 @@
}} // namespace qpid::client
-#endif
+#endif /*!QPID_CLIENT_CONNECTION_H*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp Wed Sep 10
11:15:25 2008
@@ -24,9 +24,7 @@
namespace qpid {
namespace client {
-Message::Message(const std::string& data_,
- const std::string& routingKey,
- const std::string& exchange) : TransferContent(data_,
routingKey, exchange) {}
+Message::Message(const std::string& data, const std::string& routingKey) :
TransferContent(data, routingKey) {}
std::string Message::getDestination() const
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Wed Sep 10 11:15:25
2008
@@ -40,11 +40,9 @@
/** Create a Message.
*@param data Data for the message body.
*@param routingKey Passed to the exchange that routes the
message.
- *@param exchange Name of the exchange that should route the
message.
*/
Message(const std::string& data=std::string(),
- const std::string& routingKey=std::string(),
- const std::string& exchange=std::string());
+ const std::string& routingKey=std::string());
/** The destination of messages sent to the broker is the exchange
* name. The destination of messages received from the broker is
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Wed Sep 10
11:15:25 2008
@@ -28,9 +28,11 @@
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/DeliveryProperties.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -272,6 +274,41 @@
return sendCommand(command, &content);
}
+namespace {
+/**
+ * Functor for FrameSet::map: passes every non-method frame (the header
+ * and content frames) to a FrameHandler and drops method frames.
+ */
+struct SendContentFn {
+    SendContentFn(FrameHandler& h) : handler(h) {}
+
+    void operator()(const AMQFrame& frame) {
+        if (frame.getMethod())
+            return;                 // Method frames are not forwarded.
+        // The handler interface takes a non-const frame reference.
+        handler(const_cast<AMQFrame&>(frame));
+    }
+
+    FrameHandler& handler;
+};
+}
+
+/**
+ * Send a command followed by the non-method frames of an existing frame set.
+ *
+ * @param command the method body to send; sent in a frame with eof cleared.
+ * @param content frame set whose non-method frames are forwarded to `out`.
+ * @return a Future for the command id allocated from nextOut.
+ */
+Future SessionImpl::send(const AMQBody& command, const FrameSet& content) {
+ // sendLock is held for the whole function.
+ Acquire a(sendLock);
+ SequenceNumber id = nextOut++;
+ {
+ // Record the command as incomplete under the state lock.
+ Lock l(state);
+ checkOpen();
+ incompleteOut.add(id);
+ }
+ Future f(id);
+ if (command.getMethod()->resultExpected()) {
+ Lock l(state);
+ //result listener must be set before the command is sent
+ f.setFutureResult(results.listenForResult(id));
+ }
+ AMQFrame frame(command);
+ // eof is cleared on the command frame; the content frames are sent next.
+ frame.setEof(false);
+ handleOut(frame);
+
+ // Forward the frames of content through SendContentFn, which skips method frames.
+ SendContentFn send(out);
+ content.map(send);
+ return f;
+}
+
Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent*
content)
{
Acquire a(sendLock);
@@ -297,9 +334,16 @@
}
return f;
}
+
void SessionImpl::sendContent(const MethodContent& content)
{
AMQFrame header(content.getHeader());
+
+ // Client is not allowed to set the delivery-properties.exchange.
+ AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
+ if (headerp && headerp->get<DeliveryProperties>())
+ headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
+
header.setFirstSegment(false);
uint64_t data_length = content.getData().length();
if(data_length > 0){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Wed Sep 10
11:15:25 2008
@@ -81,6 +81,7 @@
Future send(const framing::AMQBody& command);
Future send(const framing::AMQBody& command, const framing::MethodContent&
content);
+ Future send(const framing::AMQBody& command, const framing::FrameSet&
content);
Demux& getDemux();
void markCompleted(const framing::SequenceNumber& id, bool cumulative,
bool notifyPeer);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Sep 10
11:15:25 2008
@@ -25,7 +25,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUrlNoticeBody.h"
+#include "qpid/framing/ClusterJoiningBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -50,7 +50,13 @@
Cluster& cluster;
MemberId member;
ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id)
{}
- void urlNotice(const std::string& u) { cluster.urlNotice (member, u); }
+ void joining(const std::string& u) { cluster.joining (member, u); }
+ void ready() { cluster.ready(member); }
+
+ void members(const framing::FieldTable& , const framing::FieldTable& ,
const framing::FieldTable& ) {
+ assert(0); // Not passed to cluster, used to start a brain dump over
TCP.
+ }
+
bool invoke(AMQFrame& f) { return framing::invoke(*this,
*f.getBody()).wasHandled(); }
};
@@ -237,7 +243,7 @@
if (nJoined) // Notify new members of my URL.
mcastFrame(
- AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(),
url.str())),
+ AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(),
url.str())),
ConnectionId(self,0));
@@ -261,11 +267,15 @@
broker->shutdown();
}
-void Cluster::urlNotice(const MemberId& m, const string& url) {
+void Cluster::joining(const MemberId& m, const string& url) {
QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
urls.insert(UrlMap::value_type(m,Url(url)));
}
+void Cluster::ready(const MemberId& ) {
+ // FIXME aconway 2008-09-08: TODO
+}
+
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Sep 10
11:15:25 2008
@@ -75,7 +75,8 @@
/** Leave the cluster */
void leave();
- void urlNotice(const MemberId&, const std::string& url);
+ void joining(const MemberId&, const std::string& url);
+ void ready(const MemberId&);
broker::Broker& getBroker() { assert(broker); return *broker; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Sep 10
11:15:25 2008
@@ -26,6 +26,7 @@
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
+#include "qpid/log/Statement.h"
#include <boost/utility/in_place_factory.hpp>
@@ -75,7 +76,7 @@
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (!broker || values.name.empty()) return; // Only if --cluster-name
option was specified.
- if (cluster) throw Exception("Cluster plugin cannot be initialized
twice in one process.");
+ QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of
cluster plugin.");
cluster = new Cluster(values.name, values.getUrl(broker->getPort()),
*broker);
broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
broker->setConnectionFactory(
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Sep 10
11:15:25 2008
@@ -89,6 +89,18 @@
// ConnectionInputHandlerFactory
sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out,
const std::string& id, bool isClient);
+ // State dump methods.
+ /** Currently an empty stub: every argument is ignored. */
+ virtual void sessionState(const framing::SequenceNumber& /*replayId*/,
+ const framing::SequenceNumber& /*sendId*/,
+ const framing::SequenceSet& /*sentIncomplete*/,
+ const framing::SequenceNumber& /*expectedId*/,
+ const framing::SequenceNumber& /*receivedId*/,
+ const framing::SequenceSet& /*unknownCompleted*/,
+ const framing::SequenceSet&
/*receivedIncomplete*/) {}
+
+ /** Currently an empty stub: both arguments are ignored. */
+ virtual void shadowReady(uint64_t /*clusterId*/,
+ const std::string& /*userId*/) {}
+
private:
void sendDoOutput();
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=693918&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Wed Sep 10
11:15:25 2008
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DumpClient.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/enum.h"
+#include "qpid/Url.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace cluster {
+
+using broker::Broker;
+using broker::Exchange;
+using broker::Queue;
+using broker::QueueBinding;
+using broker::Message;
+using namespace framing::message;
+
+using namespace client;
+
+DumpClient::DumpClient(const Url& url) { // Opens a client connection to url and creates the session used by all dump calls.
+ connection.open(url);
+ session = connection.newSession();
+}
+
+DumpClient::~DumpClient() { // Closes the session first, then the connection.
+ session.close(); // NOTE(review): if close() can throw, the exception would escape this destructor - confirm.
+ connection.close();
+}
+
+// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes.
+static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; // The leading octal escape embeds a NUL as the first character.
+static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); // Explicit length keeps the embedded NUL. NOTE(review): sizeof also counts the terminating NUL, so the name ends with an extra NUL character - confirm intended.
+
+void DumpClient::dump(Broker& donor) { // Copies donor's exchanges, then its queues (each with messages and bindings), through the session.
+ // TODO aconway 2008-09-08: Caller must handle exceptions
+ // FIXME aconway 2008-09-08: send cluster map frame first.
+ donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange,
this, _1)); // dumpExchange is invoked once per exchange in the donor's registry.
+ // Catch-up exchange is used to route messages to the proper queue without
modifying routing key.
+ session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout",
arg::autoDelete=true);
+ donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); // dumpQueue is invoked once per queue in the donor's registry.
+ session.sync(); // Presumably blocks until the receiver has processed everything sent above - confirm.
+}
+
+void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) { // Declares an exchange on the receiver with ex's name, type, alternate exchange, durability and arguments.
+ session.exchangeDeclare(
+ ex->getName(), ex->getType(),
+ ex->getAlternate() ? ex->getAlternate()->getName() : std::string(), // Empty name when ex has no alternate exchange.
+ arg::passive=false,
+ arg::durable=ex->isDurable(),
+ arg::autoDelete=false, // Hard-coded: no auto-delete setting is read from ex.
+ arg::arguments=ex->getArgs());
+}
+
+void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { // Declares q on the receiver, then forwards its messages, then recreates its bindings.
+ session.queueDeclare(
+ q->getName(),
+ q->getAlternateExchange() ? q->getAlternateExchange()->getName() :
std::string(),
+ arg::passive=false,
+ arg::durable=q->isDurable(),
+ arg::exclusive=q->hasExclusiveConsumer(), // NOTE(review): the exclusive-consumer flag is used as the queue's exclusive setting - confirm this mapping is intended.
+ arg::autoDelete=q->isAutoDelete(),
+ arg::arguments=q->getSettings());
+
+ session.exchangeBind(q->getName(), CATCH_UP, std::string()); // Bind this queue to the catch-up exchange with an empty key.
+ q->eachMessage(boost::bind(&DumpClient::dumpMessage, this, _1)); // dumpMessage sends each message to the catch-up exchange while the binding is in place.
+ session.exchangeUnbind(q->getName(), CATCH_UP, std::string()); // Remove the catch-up binding again once the messages are sent.
+ q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(),
_1)); // Recreate the queue's own bindings last.
+}
+
+void DumpClient::dumpMessage(const broker::QueuedMessage& message) { // Sends one queued message to the catch-up exchange on the receiver.
+ SessionBase_0_10Access sb(session); // Accessor whose get() yields the object that send() is called on below.
+ framing::MessageTransferBody transfer(framing::ProtocolVersion(),
CATCH_UP, ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED); // The transfer's destination is the catch-up exchange, not the message's original exchange.
+ sb.get()->send(transfer, message.payload->getFrames()); // The frames are taken from the stored message payload.
+}
+
+void DumpClient::dumpBinding(const std::string& queue, const QueueBinding&
binding) { // Recreates one binding on the receiver for the named queue.
+ session.exchangeBind(queue, binding.exchange, binding.key, binding.args); // Exchange, key and arguments come straight from the binding record.
+}
+
+
+}} // namespace qpid::cluster
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=693918&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Wed Sep 10
11:15:25 2008
@@ -0,0 +1,74 @@
+#ifndef QPID_CLUSTER_DUMPCLIENT_H
+#define QPID_CLUSTER_DUMPCLIENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include <boost/shared_ptr.hpp>
+
+
+namespace qpid {
+
+class Url;
+
+namespace broker {
+
+class Broker;
+class Queue;
+class Exchange;
+class QueueBindings;
+class QueueBinding;
+class QueuedMessage;
+} // namespace broker
+
+namespace cluster {
+
+/**
+ * A client that dumps the contents of a local broker to a remote one using
AMQP.
+ */
+class DumpClient {
+ public:
+ DumpClient(const Url& receiver); // receiver: URL of the broker that is to receive the dump.
+ ~DumpClient();
+
+ void dump(broker::Broker& donor); // donor: the local broker whose contents are sent to the receiver.
+
+ private:
+ void dumpQueue(const boost::shared_ptr<broker::Queue>&); // Private per-item helpers, one for each kind of broker object.
+ void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
+ void dumpMessage(const broker::QueuedMessage&);
+ void dumpBinding(const std::string& queue, const broker::QueueBinding&
binding);
+
+ private:
+ client::Connection connection; // Client connection held for the lifetime of this object.
+ client::AsyncSession session; // Client session held alongside the connection.
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_DUMPCLIENT_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Wed Sep
10 11:15:25 2008
@@ -24,15 +24,12 @@
namespace qpid {
namespace framing {
-TransferContent::TransferContent(const std::string& data,
- const std::string& routingKey,
- const std::string& exchange)
-{
+TransferContent::TransferContent(const std::string& data, const std::string&
key) {
setData(data);
- if (routingKey.size()) getDeliveryProperties().setRoutingKey(routingKey);
- if (exchange.size()) getDeliveryProperties().setExchange(exchange);
+ if (!key.empty()) getDeliveryProperties().setRoutingKey(key);
}
+
AMQHeaderBody TransferContent::getHeader() const
{
return header;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h Wed Sep 10
11:15:25 2008
@@ -36,9 +36,7 @@
AMQHeaderBody header;
std::string data;
public:
- TransferContent(const std::string& data = std::string(),
- const std::string& routingKey = std::string(),
- const std::string& exchange = std::string());
+ TransferContent(const std::string& data = std::string(), const
std::string& key=std::string());
///@internal
AMQHeaderBody getHeader() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Sep 10
11:15:25 2008
@@ -23,6 +23,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/DumpClient.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
@@ -51,6 +52,7 @@
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
+using qpid::sys::TIME_SEC;
using qpid::broker::Broker;
using boost::ptr_vector;
using qpid::cluster::Cluster;
@@ -133,6 +135,86 @@
return o;
}
+QPID_AUTO_TEST_CASE(testDumpClient) { // Dumps a populated donor broker into a second broker and checks the copy.
+ BrokerFixture donor, receiver;
+ { // Populate the donor: two queues, two exchanges, two bindings and three messages.
+ Client c(donor.getPort());
+ FieldTable args;
+ args.setString("x", "y");
+ c.session.queueDeclare("qa", arg::arguments=args);
+ c.session.queueDeclare("qb", arg::alternateExchange="amq.direct");
+
+ c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct",
arg::arguments=args);
+ c.session.exchangeBind(arg::exchange="exd", arg::queue="qa",
arg::bindingKey="foo");
+ c.session.messageTransfer(arg::destination="exd",
arg::content=TransferContent("one", "foo"));
+
+ c.session.exchangeDeclare("ext", arg::type="topic");
+ c.session.exchangeBind(arg::exchange="ext", arg::queue="qb",
arg::bindingKey="bar");
+ c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
+ c.session.messageTransfer(arg::destination="ext",
arg::content=TransferContent("one", "bar"));
+ c.session.messageTransfer(arg::destination="ext",
arg::content=TransferContent("two", "bar"));
+
+ c.session.close();
+ c.connection.close();
+ }
+ qpid::cluster::DumpClient dump(Url::getIpAddressesUrl(receiver.getPort()));
+ dump.dump(*donor.broker); // Copy the donor's state to the receiver.
+ { // Everything below is queried from the receiver only.
+ Client r(receiver.getPort());
+ // Verify exchanges
+ ExchangeQueryResult ex=r.session.exchangeQuery("exd");
+ BOOST_CHECK_EQUAL(ex.getType(), "direct");
+ BOOST_CHECK_EQUAL(ex.getDurable(), false);
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+ BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y");
+
+ ex = r.session.exchangeQuery("ext");
+ BOOST_CHECK_EQUAL(ex.getType(), "topic");
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+
+ // Verify queues
+ QueueQueryResult qq = r.session.queueQuery("qa");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qa");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "");
+ BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), 1); // The one message sent to exd with key foo.
+
+ qq = r.session.queueQuery("qb");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qb");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), 2); // The two messages sent to ext with key bar.
+
+ // Verify messages
+ Message m;
+ BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd"); // The original exchange and routing key are expected to survive the dump.
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "two");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ // Verify bindings
+ r.session.messageTransfer(arg::destination="exd",
arg::content=TransferContent("xxx", "foo"));
+ BOOST_CHECK(r.subs.get(m, "qa")); // A new message sent to exd with key foo must reach qa on the receiver.
+ BOOST_CHECK_EQUAL(m.getData(), "xxx");
+
+ r.session.messageTransfer(arg::destination="ext",
arg::content=TransferContent("yyy", "bar"));
+ BOOST_CHECK(r.subs.get(m, "qb")); // A new message sent to ext with key bar must reach qb on the receiver.
+ BOOST_CHECK_EQUAL(m.getData(), "yyy");
+
+ r.session.close();
+ r.connection.close();
+ }
+}
+
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
const char* argv[] = { "", "--auth=no", "--no-data-dir",
"--log-prefix=testForkedBroker" };
@@ -146,8 +228,7 @@
ClusterFixture cluster(1);
Client c(cluster[0]);
BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
- BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
- // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate.
+ BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound());
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -234,5 +315,4 @@
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
-
QPID_AUTO_TEST_SUITE_END()
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=693918&r1=693917&r2=693918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Wed Sep 10 11:15:25 2008
@@ -22,20 +22,66 @@
<amqp major="0" minor="10" port="5672">
+ <!-- Controls sent between cluster nodes. -->
+
<class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
- <control name = "url-notice" code="0x1">
- <field name="url-notice" type="str16" />
+ <!-- Cluster membership -->
+
+ <control name = "joining" code="0x1">
+ <field name="joining" type="str16" label="URL of new member joining
cluster."/>
+ </control>
+
+
+ <control name="ready" code="0x2" label="New member is ready."/>
+
+ <control name="members" code="0x3" label="Cluster map sent to new
members.">
+ <field name="members" type="map"/> <!-- member-id -> URL -->
+ <field name="donors" type="map"/> <!-- member-id -> uint32
(donor-count) -->
+ <field name="newbies" type="map"/> <!-- member-id -> URL -->
</control>
+
+ <!-- Transferring broker state -->
+
</class>
+ <!-- TODO aconway 2008-09-10: support for un-attached connections. -->
+
+ <!-- Controls associated with a specific connection. -->
+
<class name="cluster-connection" code="0x81" label="Qpid clustering
extensions.">
+
<control name="deliver-close" code="0x2">
</control>
<control name="deliver-do-output" code="0x3">
<field name="bytes" type="uint32"/>
</control>
+
+ <!-- Brain-dump controls. Sent to a new broker in joining mode.
+ A connection is dumped as follows:
+ - open as a normal connection.
+ - attach sessions, create consumers, set flow with normal AMQP
commands.
+ - reset session state by sending session-state for each session.
+ - frames following session-state are replay frames.
+ - send shadow-ready to mark end of dump.
+ -->
+ <control name="session-state" code="0x4" label="Set session state during a
brain dump.">
+ <!-- Target session deduced from channel number. -->
+ <field name="replay-id" type="sequence-no"/>
+ <field name="send-id" type="sequence-no"/>
+ <field name="sent-incomplete" type="sequence-set"/>
+
+ <field name="expected-id" type="sequence-no"/>
+ <field name="received-id" type="sequence-no"/>
+ <field name="unknown-completed" type="sequence-set"/>
+ <field name="received-incomplete" type="sequence-set"/>
+ </control>
+
+ <control name="shadow-ready" code="0x5" label="End of shadow connection
dump.">
+ <field name="cluster-id" type="uint64"/>
+ <field name="user-id" type="vbin16"/>
+ </control>
</class>
</amqp>