Author: aconway
Date: Thu Sep 11 06:26:10 2008
New Revision: 694243
URL: http://svn.apache.org/viewvc?rev=694243&view=rev
Log:
Moved PollableCondition, PollableQueue and to sys. Fixed cluster shutdown
issues.
sys/PollableCondition: is a generic mechansim to poll for non-IO
events in the Poller.
sys/PollableQueue: is a thread-safe queue template that can be
dispatched from the Poller when there are items on the queue. It uses
PollableCondition.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp
- copied, changed from r693865,
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h
- copied, changed from r693865,
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
- copied, changed from r693865,
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Sep 11 06:26:10 2008
@@ -264,6 +264,9 @@
qpid/sys/AggregateOutput.cpp \
qpid/sys/AsynchIOHandler.cpp \
qpid/sys/Dispatcher.cpp \
+ qpid/sys/PollableCondition.h \
+ qpid/sys/PollableCondition.cpp \
+ qpid/sys/PollableQueue.h \
qpid/sys/Runnable.cpp \
qpid/sys/SystemInfo.cpp \
qpid/sys/Shlib.cpp \
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Thu Sep 11 06:26:10 2008
@@ -18,9 +18,6 @@
qpid/cluster/Connection.h \
qpid/cluster/Connection.cpp \
qpid/cluster/NoOpConnectionOutputHandler.h \
- qpid/cluster/PollableCondition.h \
- qpid/cluster/PollableCondition.cpp \
- qpid/cluster/PollableQueue.h \
qpid/cluster/WriteEstimate.h \
qpid/cluster/WriteEstimate.cpp \
qpid/cluster/OutputInterceptor.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Sep 11
06:26:10 2008
@@ -70,8 +70,6 @@
}
SessionState::~SessionState() {
- // Remove ID from active session list.
- broker.getSessionManager().forget(getId());
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Thu Sep 11
06:26:10 2008
@@ -649,7 +649,7 @@
{
check();
if (state != ATTACHED) {
- throw NotAttachedException("Session isn't attached");
+ throw NotAttachedException(QPID_MSG("Session " << getId() << " isn't
attached"));
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Sep 11
06:26:10 2008
@@ -61,7 +61,7 @@
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b)
:
- broker(&b),
+ broker(b),
poller(b.getPoller()),
cpg(*this),
name(name_),
@@ -74,15 +74,17 @@
),
deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this,
_1)))
{
- broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
+ QPID_LOG(notice, "Cluster member " << self << " joining cluster " <<
name.str());
+ broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
cpg.join(name);
deliverQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+ QPID_LOG(debug, "~Cluster()");
+}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Mutex::ScopedLock l(lock);
@@ -94,20 +96,13 @@
connections.erase(id);
}
+// FIXME aconway 2008-09-10: leave is currently not called,
+// It should be called if we are shut down by a cluster admin command.
+// Any other type of exit is caught in disconnect().
+//
void Cluster::leave() {
- Mutex::ScopedLock l(lock);
- if (!broker) return; // Already left.
- // Leave is called by from Broker destructor after the poller has
- // been shut down. No dispatches can occur.
-
- QPID_LOG(notice, "Leaving cluster " << name.str());
+ QPID_LOG(notice, "Cluster member " << self << " leaving cluster " <<
name.str());
cpg.leave(name);
- // broker= is set to 0 when the final config-change is delivered.
- while(broker) {
- Mutex::ScopedUnlock u(lock);
- cpg.dispatchAll();
- }
- cpg.shutdown();
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -177,6 +172,7 @@
{
try {
MemberId from(nodeid, pid);
+ QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self);
// FIXME aconway 2008-09-10:
deliverQueue.push(Event::delivered(from, msg, msg_len));
}
catch (const std::exception& e) {
@@ -238,7 +234,7 @@
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined)
{
- QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current,
nCurrent) << ".\n Changes: "
+ QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current,
nCurrent) << ".\n Changes: "
<< AddrList(joined, nJoined) << AddrList(left, nLeft));
if (nJoined) // Notfiy new members of my URL.
@@ -246,13 +242,14 @@
AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(),
url.str())),
ConnectionId(self,0));
-
+ if (find(left, left+nLeft, self) != left+nLeft) {
+ // We have left the group, this is the final config change.
+ QPID_LOG(notice, "Cluster member " << self << " left cluster " <<
name.str());
+ broker.shutdown();
+ }
Mutex::ScopedLock l(lock);
for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
// Add new members when their URL notice arraives.
-
- if (find(left, left+nLeft, self) != left+nLeft)
- broker = 0; // We have left the group, this is the final config
change.
lock.notifyAll(); // Threads waiting for membership changes.
}
@@ -261,22 +258,35 @@
h.rewatch();
}
-void Cluster::disconnect(sys::DispatchHandle& h) {
- h.stopWatch();
- QPID_LOG(critical, "Disconnected from cluster, shutting down");
- broker->shutdown();
+void Cluster::disconnect(sys::DispatchHandle& ) {
+ // FIXME aconway 2008-09-11: this should be logged as critical,
+ // when we provide admin option to shut down cluster and let
+ // members leave cleanly.
+ QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster
" << name.str());
+ broker.shutdown();
}
void Cluster::joining(const MemberId& m, const string& url) {
- QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
+ QPID_LOG(info, "Cluster member " << m << " has URL " << url);
urls.insert(UrlMap::value_type(m,Url(url)));
}
void Cluster::ready(const MemberId& ) {
// FIXME aconway 2008-09-08: TODO
}
-
-}} // namespace qpid::cluster
+// Called from Broker::~Broker when broker is shut down. At this
+// point we know the poller has stopped so no poller callbacks will be
+// invoked. We must ensure that CPG has also shut down so no CPG
+// callbacks will be invoked.
+//
+void Cluster::shutdown() {
+ QPID_LOG(notice, "Cluster member " << self << " shutting down.");
+ try { cpg.shutdown(); }
+ catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: "
<< e.what()); }
+ delete this;
+}
+broker::Broker& Cluster::getBroker(){ return broker; }
+}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Sep 11
06:26:10 2008
@@ -21,7 +21,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/Event.h"
-#include "qpid/cluster/PollableQueue.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/cluster/NoOpConnectionOutputHandler.h"
#include "qpid/broker/Broker.h"
@@ -43,7 +43,7 @@
* Connection to the cluster.
* Keeps cluster membership data.
*/
-class Cluster : public RefCounted, private Cpg::Handler
+class Cluster : private Cpg::Handler
{
public:
@@ -78,17 +78,16 @@
void joining(const MemberId&, const std::string& url);
void ready(const MemberId&);
- broker::Broker& getBroker() { assert(broker); return *broker; }
-
MemberId getSelf() const { return self; }
+ void shutdown();
+
+ broker::Broker& getBroker();
+
private:
typedef std::map<MemberId, Url> UrlMap;
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> >
ConnectionMap;
-
- /** Message sent over the cluster. */
- typedef std::pair<framing::AMQFrame, ConnectionId> Message;
- typedef PollableQueue<Event> EventQueue;
+ typedef sys::PollableQueue<Event> EventQueue;
boost::function<void()> shutdownNext;
@@ -127,7 +126,7 @@
boost::intrusive_ptr<cluster::Connection> getConnection(const
ConnectionId&);
mutable sys::Monitor lock; // Protect access to members.
- broker::Broker* broker;
+ broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
Cpg::Name name;
@@ -137,7 +136,7 @@
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- PollableQueue<Event> deliverQueue;
+ EventQueue deliverQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Sep 11
06:26:10 2008
@@ -66,10 +66,10 @@
ClusterValues values;
ClusterOptions options;
- boost::intrusive_ptr<Cluster> cluster;
+ Cluster* cluster;
boost::scoped_ptr<ConnectionCodec::Factory> factory;
- ClusterPlugin() : options(values) {}
+ ClusterPlugin() : options(values), cluster(0) {}
Options* getOptions() { return &options; }
@@ -78,20 +78,17 @@
if (!broker || values.name.empty()) return; // Only if --cluster-name
option was specified.
QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of
cluster plugin.");
cluster = new Cluster(values.name, values.getUrl(broker->getPort()),
*broker);
- broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(),
*cluster)));
}
void earlyInitialize(Plugin::Target&) {}
-
- void shutdown() { cluster = 0; }
};
static ClusterPlugin instance; // Static initialization.
// For test purposes.
-boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; }
+Cluster& getGlobalCluster() { assert(instance.cluster); return
*instance.cluster; }
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Sep 11
06:26:10 2008
@@ -106,5 +106,21 @@
deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for
delivery in separate thread.
}
+
+void Connection::sessionState(const SequenceNumber& /*replayStart*/,
+ const SequenceSet& /*sentIncomplete*/,
+ const SequenceNumber& /*expected*/,
+ const SequenceNumber& /*received*/,
+ const SequenceSet& /*unknownCompleted*/,
+ const SequenceSet& /*receivedIncomplete*/)
+{
+ // FIXME aconway 2008-09-10: TODO
+}
+
+void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/)
+{
+ // FIXME aconway 2008-09-10: TODO
+}
+
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Sep 11
06:26:10 2008
@@ -40,9 +40,7 @@
namespace cluster {
-/**
- * Plug-in associated with broker::Connections, both local and shadow.
- */
+/** Intercept broker::Connection calls for shadow and local cluster
connections. */
class Connection :
public RefCounted,
public sys::ConnectionInputHandler,
@@ -90,16 +88,13 @@
sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out,
const std::string& id, bool isClient);
// State dump methods.
- virtual void sessionState(const framing::SequenceNumber& /*replayId*/,
- const framing::SequenceNumber& /*sendId*/,
- const framing::SequenceSet& /*sentIncomplete*/,
- const framing::SequenceNumber& /*expectedId*/,
- const framing::SequenceNumber& /*receivedId*/,
- const framing::SequenceSet& /*unknownCompleted*/,
- const framing::SequenceSet&
/*receivedIncomplete*/) {}
+ virtual void sessionState(const SequenceNumber& replayStart,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted, const
SequenceSet& receivedIncomplete);
- virtual void shadowReady(uint64_t /*clusterId*/,
- const std::string& /*userId*/) {}
+ virtual void shadowReady(uint64_t memberId, uint64_t connectionId);
private:
void sendDoOutput();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Thu Sep
11 06:26:10 2008
@@ -30,16 +30,16 @@
sys::ConnectionCodec*
ConnectionCodec::Factory::create(framing::ProtocolVersion v,
sys::OutputControl& out, const std::string& id) {
- if (v == framing::ProtocolVersion(0, 10))
+ if (v == framing::ProtocolVersion(0, 10))
return new ConnectionCodec(out, id, cluster);
return 0;
}
+// FIXME aconway 2008-08-27: outbound connections need to be made
+// with proper qpid::client code for failover, get rid of this
+// broker-side hack.
sys::ConnectionCodec*
ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string&
id) {
- // FIXME aconway 2008-08-27: outbound connections need to be made
- // with proper qpid::client code for failover, get rid of this
- // broker-side hack.
return next->create(out, id);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Thu Sep 11
06:26:10 2008
@@ -50,7 +50,8 @@
struct Factory : public sys::ConnectionCodec::Factory {
boost::shared_ptr<sys::ConnectionCodec::Factory> next;
Cluster& cluster;
- Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster&
c) : next(f), cluster(c) {}
+ Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c)
+ : next(f), cluster(c) {}
sys::ConnectionCodec* create(framing::ProtocolVersion,
sys::OutputControl&, const std::string& id);
sys::ConnectionCodec* create(sys::OutputControl&, const std::string&
id);
};
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp (from
r693865, incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp)
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp&r1=693865&r2=694243&rev=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp Thu Sep 11
06:26:10 2008
@@ -27,14 +27,14 @@
//
#include "qpid/sys/posix/PrivatePosix.h"
-#include "qpid/cluster/PollableCondition.h"
+#include "qpid/sys/PollableCondition.h"
#include "qpid/Exception.h"
#include <unistd.h>
#include <fcntl.h>
namespace qpid {
-namespace cluster {
+namespace sys {
PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
int fds[2];
@@ -67,13 +67,13 @@
#if 0
// FIXME aconway 2008-08-12: More efficient Linux implementation using
-// eventfd system call. Do a configure.ac test to enable this when
-// eventfd is available.
+// eventfd system call. Move to separate file & do configure.ac test
+// to enable this when ::eventfd() is available.
#include <sys/eventfd.h>
namespace qpid {
-namespace cluster {
+namespace sys {
PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
impl->fd = ::eventfd(0, 0);
@@ -95,6 +95,6 @@
#endif
-}} // namespace qpid::cluster
+}} // namespace qpid::sys
#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h (from
r693865, incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h)
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h&r1=693865&r2=694243&rev=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h Thu Sep 11
06:26:10 2008
@@ -29,7 +29,7 @@
//
namespace qpid {
-namespace cluster {
+namespace sys {
/**
* A pollable condition to integrate in-process conditions with IO
@@ -55,6 +55,6 @@
private:
int writeFd;
};
-}} // namespace qpid::cluster
+}} // namespace qpid::sys
#endif /*!QPID_SYS_POLLABLECONDITION_H*/
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (from
r693865, incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h)
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h&r1=693865&r2=694243&rev=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Thu Sep 11
06:26:10 2008
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
-#define QPID_CLUSTER_POLLABLEQUEUE_H
+#ifndef QPID_SYS_POLLABLEQUEUE_H
+#define QPID_SYS_POLLABLEQUEUE_H
/*
*
@@ -22,7 +22,7 @@
*
*/
-#include "qpid/cluster/PollableCondition.h"
+#include "qpid/sys/PollableCondition.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
@@ -34,7 +34,7 @@
namespace sys { class Poller; }
-namespace cluster {
+namespace sys {
// FIXME aconway 2008-08-11: this could be of more general interest,
// move to common lib.
@@ -108,6 +108,6 @@
batch.clear();
}
-}} // namespace qpid::cluster
+}} // namespace qpid::sys
-#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/
+#endif /*!QPID_SYS_POLLABLEQUEUE_H*/
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Thu Sep 11 06:26:10
2008
@@ -92,7 +92,8 @@
SessionType session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
- ClientT(uint16_t port) : connection(port),
session(connection.newSession()), subs(session) {}
+ ClientT(uint16_t port, const std::string& name=std::string())
+ : connection(port), session(connection.newSession(name)),
subs(session) {}
~ClientT() { connection.close(); }
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Sep 11
06:26:10 2008
@@ -21,13 +21,14 @@
#include "ForkedBroker.h"
#include "BrokerFixture.h"
-#include "qpid/cluster/Cpg.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/DumpClient.h"
#include "qpid/framing/AMQBody.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
@@ -41,7 +42,7 @@
namespace qpid {
namespace cluster {
-boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in
qpid/cluster/ClusterPlugin.cpp
+Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
}} // namespace qpid::cluster
@@ -81,11 +82,11 @@
add(n);
// Wait for all n members to join the cluster
int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean
this up.
- while (retry && getGlobalCluster()->size() != n) {
+ while (retry && getGlobalCluster().size() != n) {
::sleep(1);
--retry;
}
- BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size());
+ BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
}
void ClusterFixture::add() {
@@ -135,7 +136,37 @@
return o;
}
-QPID_AUTO_TEST_CASE(testDumpClient) {
+#if 0 // FIXME aconway 2008-09-10: finish & enable
+QPID_AUTO_TEST_CASE(testDumpConsumers) {
+ ClusterFixture cluster(1);
+ Client a(cluster[0]);
+ a.session.queueDeclare("q");
+ a.subs.subscribe(a.lq, "q");
+
+ cluster.add();
+ Client b(cluster[1]);
+ try {
+ b.connection.newSession(a.session.getId().getName());
+ BOOST_FAIL("Expected SessionBusyException for " <<
a.session.getId().getName());
+ } catch (const SessionBusyException&) {}
+
+ // Transfer some messages to the subscription by client a.
+ Message m;
+ a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa",
"q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
+
+ b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb",
"q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+
+ // Verify that the queue has been drained on both brokers.
+ // This proves that the consumer was replicated when the second broker
joined.
+ BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
+}
+#endif
+
+QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
BrokerFixture donor, receiver;
{
Client c(donor.getPort());
@@ -146,13 +177,13 @@
c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct",
arg::arguments=args);
c.session.exchangeBind(arg::exchange="exd", arg::queue="qa",
arg::bindingKey="foo");
- c.session.messageTransfer(arg::destination="exd",
arg::content=TransferContent("one", "foo"));
+ c.session.messageTransfer(arg::destination="exd",
arg::content=Message("one", "foo"));
c.session.exchangeDeclare("ext", arg::type="topic");
c.session.exchangeBind(arg::exchange="ext", arg::queue="qb",
arg::bindingKey="bar");
c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
- c.session.messageTransfer(arg::destination="ext",
arg::content=TransferContent("one", "bar"));
- c.session.messageTransfer(arg::destination="ext",
arg::content=TransferContent("two", "bar"));
+ c.session.messageTransfer(arg::destination="ext",
arg::content=Message("one", "bar"));
+ c.session.messageTransfer(arg::destination="ext",
arg::content=Message("two", "bar"));
c.session.close();
c.connection.close();
@@ -202,11 +233,11 @@
BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
// Verify bindings
- r.session.messageTransfer(arg::destination="exd",
arg::content=TransferContent("xxx", "foo"));
+ r.session.messageTransfer(arg::destination="exd",
arg::content=Message("xxx", "foo"));
BOOST_CHECK(r.subs.get(m, "qa"));
BOOST_CHECK_EQUAL(m.getData(), "xxx");
- r.session.messageTransfer(arg::destination="ext",
arg::content=TransferContent("yyy", "bar"));
+ r.session.messageTransfer(arg::destination="ext",
arg::content=Message("yyy", "bar"));
BOOST_CHECK(r.subs.get(m, "qb"));
BOOST_CHECK_EQUAL(m.getData(), "yyy");
@@ -254,8 +285,8 @@
ClusterFixture cluster(2);
Client c0(cluster[0]);
c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
- c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c0.session.messageTransfer(arg::content=Message("foo", "q"));
+ c0.session.messageTransfer(arg::content=Message("bar", "q"));
c0.session.close();
Client c1(cluster[1]);
Message msg;
@@ -268,19 +299,19 @@
QPID_AUTO_TEST_CASE(testMessageDequeue) {
// Enqueue on one broker, dequeue on two others.
ClusterFixture cluster (3);
- Client c0(cluster[0]);
+ Client c0(cluster[0], "c0");
c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
- c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c0.session.messageTransfer(arg::content=Message("foo", "q"));
+ c0.session.messageTransfer(arg::content=Message("bar", "q"));
Message msg;
// Dequeue on 2 others, ensure correct order.
- Client c1(cluster[1]);
+ Client c1(cluster[1], "c1");
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("foo", msg.getData());
- Client c2(cluster[2]);
+ Client c2(cluster[2], "c2");
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("bar", msg.getData());
@@ -298,8 +329,8 @@
c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
// Now send messages
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=TransferContent("foo", "q"));
- c1.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c1.session.messageTransfer(arg::content=Message("foo", "q"));
+ c1.session.messageTransfer(arg::content=Message("bar", "q"));
// Check they arrived
Message m;
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Sep 11 06:26:10 2008
@@ -69,19 +69,20 @@
-->
<control name="session-state" code="0x4" label="Set session state during a
brain dump.">
<!-- Target session deduced from channel number. -->
- <field name="replay-id" type="sequence-no"/>
- <field name="send-id" type="sequence-no"/>
- <field name="sent-incomplete" type="sequence-set"/>
- <field name="expected-id" type="sequence-no"/>
- <field name="received-id" type="sequence-no"/>
- <field name="unknown-completed" type="sequence-set"/>
- <field name="received-incomplete" type="sequence-set"/>
+ <field name="replay-start" type="sequence-no"/> <!-- Replay
frames will start from this point.-->
+ <field name="sent-incomplete" type="sequence-set"/> <!-- Commands
sent and incomplete. -->
+
+ <field name="expected" type="sequence-no"/> <!-- Idempotence
barrier -->
+ <field name="received" type="sequence-no"/> <!-- Received up
to here > expected-->
+ <field name="unknown-completed" type="sequence-set"/> <!-- Completed
but not known to peer. -->
+ <field name="received-incomplete" type="sequence-set"/> <!-- Received
and incomplete -->
</control>
<control name="shadow-ready" code="0x5" label="End of shadow connection
dump.">
- <field name="cluster-id" type="uint64"/>
- <field name="user-id" type="vbin16"/>
+ <field name="member-id" type="uint64"/>
+ <field name="connection-id" type="uint64"/>
</control>
+
</class>
</amqp>