Author: aconway
Date: Thu Oct 16 10:07:26 2008
New Revision: 705287
URL: http://svn.apache.org/viewvc?rev=705287&view=rev
Log:
Fix race in cluster causing incorrect known-broker lists to be sent to clients.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Oct 16
10:07:26 2008
@@ -32,17 +32,18 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
-using namespace qpid::client;
+
+namespace qpid {
+namespace client {
+
using namespace qpid::framing;
using namespace qpid::framing::connection;
using namespace qpid::sys;
-
using namespace qpid::framing::connection;//for connection error codes
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const
ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
- failover(new FailoverListener()),
version(v),
nextChannel(1)
{
@@ -51,7 +52,6 @@
handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this,
CLOSE_CODE_NORMAL, std::string());
-
//only set error handler once open
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
}
@@ -69,7 +69,8 @@
Mutex::ScopedLock l(lock);
session->setChannel(channel ? channel : nextChannel++);
boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
- if (s.lock()) throw SessionBusyException();
+ boost::shared_ptr<SessionImpl> ss = s.lock();
+ if (ss) throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel()
<< " attachd to " << ss->getId()));
s = session;
}
@@ -110,7 +111,7 @@
connector->init();
handler.waitForOpen();
- if (failover.get()) failover->start(shared_from_this());
+ failover.reset(new FailoverListener(shared_from_this(),
handler.knownBrokersUrls));
}
void ConnectionImpl::idleIn()
@@ -176,7 +177,6 @@
}
std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
- // FIXME aconway 2008-10-08: ensure we never return empty list, always
include self Url.
return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
}
@@ -187,4 +187,6 @@
return simpl;
}
-void ConnectionImpl::stopFailoverListener() { failover.reset(); }
+// Stop the failover listener if one exists. The listener is no longer built
+// in the constructor; it is only created after handler.waitForOpen() returns,
+// so `failover` may still be null here (getKnownBrokers makes the same check).
+void ConnectionImpl::stopFailoverListener() { if (failover.get()) failover->stop(); }
+
+}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Thu Oct 16
10:07:26 2008
@@ -89,6 +89,8 @@
std::vector<Url> getKnownBrokers();
void registerFailureCallback ( boost::function<void ()> fn ) {
failureCallback = fn; }
void stopFailoverListener();
+
+ framing::ProtocolVersion getVersion() { return version; }
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Thu Oct 16
10:07:26 2008
@@ -72,7 +72,7 @@
boost::state_saver<bool> reset(running); // Reset to false on exit.
running = true;
try {
- while (!queue->isClosed()) {
+ while (true) {
Mutex::ScopedUnlock u(lock);
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
@@ -92,12 +92,14 @@
}
}
}
- session.sync(); // Make sure all our acks are received before
returning.
}
- catch (const ClosedException& e)
- {
- QPID_LOG(debug, "Ignored exception in client dispatch thread: " <<
e.what());
- } //ignore it and return
+ catch (const ClosedException& e) {
+ QPID_LOG(debug, "Dispatch thread exiting, session closed: " <<
session.getId());
+ try {
+ session.sync(); // Make sure all our acks are received before
returning.
+ }
+ catch(...) {}
+ }
catch (const std::exception& e) {
QPID_LOG(error, "Exception in client dispatch thread: " << e.what());
if ( failoverHandler )
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Thu Oct
16 10:07:26 2008
@@ -21,6 +21,7 @@
#include "FailoverListener.h"
#include "SessionBase_0_10Access.h"
#include "qpid/client/SubscriptionManager.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/log/Helpers.h"
@@ -40,40 +41,58 @@
return s;
}
-FailoverListener::FailoverListener() {}
+FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c,
const std::vector<Url>& initUrls)
+ : knownBrokers(initUrls)
+ {
+ // Special versions used to mark cluster catch-up connections
+ // which do not need a FailoverListener
+ if (c->getVersion().getMajor() >= 0x80) {
+ QPID_LOG(debug, "No failover listener for catch-up connection.");
+ return;
+ }
-void FailoverListener::start(const boost::shared_ptr<ConnectionImpl>& c) {
- Session session = makeSession(c->newSession(std::string(), 0));
+ Session session =
makeSession(c->newSession(AMQ_FAILOVER+framing::Uuid(true).str(), 0));
if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) {
session.close();
return;
}
subscriptions.reset(new SubscriptionManager(session));
- std::string qname=AMQ_FAILOVER + "." + session.getId().getName();
+ std::string qname=session.getId().getName();
session.queueDeclare(arg::queue=qname, arg::exclusive=true,
arg::autoDelete=true);
session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
subscriptions->subscribe(*this, qname, FlowControl::unlimited());
thread = sys::Thread(*subscriptions);
}
-void FailoverListener::stop() {
- if (subscriptions.get()) subscriptions->stop();
- if (thread.id()) thread.join();
- if (subscriptions.get()) subscriptions->getSession().close();
- thread=sys::Thread();
- subscriptions.reset();
-}
FailoverListener::~FailoverListener() {
try { stop(); }
catch (const std::exception& e) {}
}
+void FailoverListener::stop() {
+ if (subscriptions.get())
+ subscriptions->stop();
+
+ if (thread.id() == sys::Thread::current().id()) {
+ // FIXME aconway 2008-10-16: this can happen if ConnectionImpl
+ // dtor runs when my session drops its weak pointer lock.
+ // For now, leak subscriptions to prevent a core if we delete
+ // without joining.
+ subscriptions.release();
+ }
+ else if (thread.id()) {
+ thread.join();
+ thread=sys::Thread();
+ subscriptions.reset(); // Safe to delete after join.
+ }
+}
+
void FailoverListener::received(Message& msg) {
sys::Mutex::ScopedLock l(lock);
knownBrokers.clear();
framing::Array urlArray;
msg.getHeaders().getArray("amq.failover", urlArray);
- for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i <
urlArray.end(); ++i )
+ for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i
!= urlArray.end(); ++i )
knownBrokers.push_back(Url((*i)->get<std::string>()));
QPID_LOG(info, "Known-brokers update: " << log::formatList(knownBrokers));
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h Thu Oct 16
10:07:26 2008
@@ -38,11 +38,10 @@
*/
class FailoverListener : public MessageListener {
public:
- FailoverListener();
+ FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const
std::vector<Url>& initUrls);
~FailoverListener();
- void start(const boost::shared_ptr<ConnectionImpl>&);
void stop();
-
+
std::vector<Url> getKnownBrokers() const;
void received(Message& msg);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Oct 16
10:07:26 2008
@@ -30,6 +30,7 @@
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterDumpOfferBody.h"
#include "qpid/framing/ClusterDumpStartBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
@@ -76,6 +77,7 @@
void dumpRequest(const std::string& url) { cluster.dumpRequest(member,
url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
+ void configChange(const std::string& addresses) {
cluster.configChange(member, addresses, l); }
void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); }
void dumpStart(uint64_t dumpee, const std::string& url) {
cluster.dumpStart(member, dumpee, url, l); }
void shutdown() { cluster.shutdown(member, l); }
@@ -89,14 +91,14 @@
cpg(*this),
name(name_),
myUrl(url_),
- memberId(cpg.self()),
+ myId(cpg.self()),
cpgDispatchHandle(
cpg,
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- deliverQueue(boost::bind(&Cluster::process, this, _1), poller),
+ deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
mcastId(0),
mgmtObject(0),
state(INIT),
@@ -115,20 +117,20 @@
failoverExchange.reset(new FailoverExchange(this));
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
+ deliverQueue.start();
cpg.join(name);
- QPID_LOG(notice, *this << " joining cluster " << name.str());
+ QPID_LOG(notice, *this << " will join cluster " << name.str());
}
Cluster::~Cluster() {
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Lock l(lock);
- // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if
not in map?
- // esp shadow connections? See race comment in getConnection.
- assert(!c->isCatchUp());
- connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+ bool result =
connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second;
+ assert(result);
+ return result;
}
void Cluster::erase(ConnectionId id) {
@@ -136,14 +138,19 @@
connections.erase(id);
}
-void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
+void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId&
id, uint32_t seq) {
Lock l(lock);
- mcastControl(body, cptr, l);
+ mcastControl(body, id, seq, l);
}
-void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr,
Lock&) {
- Lock l(lock);
- Event e(Event::control(body, ConnectionId(memberId, cptr), ++mcastId));
+void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId&
id, uint32_t seq, Lock& l) {
+ Event e(Event::control(body, id, seq));
+ QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
+ mcast(e, l);
+}
+
+void Cluster::mcastControl(const framing::AMQBody& body, Lock& l) {
+ Event e(Event::control(body, ConnectionId(myId,0), ++mcastId));
QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
mcast(e, l);
}
@@ -166,8 +173,8 @@
void Cluster::mcast(const Event& e, Lock&) {
if (state == LEFT)
return;
- if (state < READY && e.isConnection()) {
- // Stall outgoing connection events.
+ if (state <= CATCHUP && e.isConnection()) {
+ // Stall outgoing connection events until we are fully READY
QPID_LOG(trace, *this << " MCAST deferred: " << e );
mcastQueue.push_back(e);
}
@@ -192,10 +199,10 @@
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
+ if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
QPID_LOG(notice, *this << " leaving cluster " << name.str());
if (!deliverQueue.isStopped()) deliverQueue.stop();
- if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
try { cpg.leave(name); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " <<
e.what());
@@ -211,14 +218,15 @@
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId&
connectionId, Lock&) {
ConnectionMap::iterator i = connections.find(connectionId);
if (i == connections.end()) {
- if (connectionId.getMember() == memberId) { // Closed local connection
+ if (connectionId.getMember() == myId) { // Closed local connection
QPID_LOG(warning, *this << " attempt to use closed connection " <<
connectionId);
return boost::intrusive_ptr<Connection>();
}
else { // New shadow connection
std::ostringstream mgmtId;
mgmtId << name.str() << ":" << connectionId;
- ConnectionMap::value_type value(connectionId, new
Connection(*this, shadowOut, mgmtId.str(), connectionId));
+ ConnectionMap::value_type value(connectionId,
+ new Connection(*this, shadowOut,
mgmtId.str(), connectionId));
i = connections.insert(value).first;
}
}
@@ -242,50 +250,54 @@
{
Mutex::ScopedLock l(lock);
MemberId from(nodeid, pid);
- Event e = Event::delivered(from, msg, msg_len);
+ deliver(Event::delivered(from, msg, msg_len), l);
+}
+
+void Cluster::deliver(const Event& e, Lock&) {
if (state == LEFT) return;
- QPID_LOG(trace, *this << " DLVR: " << e);
- if (e.isCluster() && state != DUMPEE) // Process cluster controls
immediately unless in DUMPEE state.
- process(e, l);
- else if (state != NEWBIE) // Newbie discards events up to the dump offer.
- deliverQueue.push(e);
+ QPID_LOG(trace, *this << " PUSH: " << e);
+ deliverQueue.push(e); // Otherwise enqueue for processing.
}
-void Cluster::process(const Event& e) {
+void Cluster::delivered(const Event& e) {
Lock l(lock);
- process(e,l);
+ delivered(e,l);
}
-void Cluster::process(const Event& e, Lock& l) {
+void Cluster::delivered(const Event& e, Lock& l) {
try {
Buffer buf(e);
AMQFrame frame;
if (e.isCluster()) {
while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+ QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
ClusterDispatcher dispatch(*this, e.getMemberId(), l);
if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
}
}
else { // e.isConnection()
- boost::intrusive_ptr<Connection> connection =
getConnection(e.getConnectionId(), l);
- if (connection) { // Ignore if no connection.
- if (e.getType() == DATA) {
- QPID_LOG(trace, *this << " PROC: " << e);
- connection->deliverBuffer(buf);
- }
- else { // control
+ if (state == NEWBIE) {
+ QPID_LOG(trace, *this << " DROP: " << e);
+ }
+ else {
+ boost::intrusive_ptr<Connection> connection =
getConnection(e.getConnectionId(), l);
+ if (!connection) return;
+ if (e.getType() == CONTROL) {
while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " PROC: " << e << " " <<
frame);
+ QPID_LOG(trace, *this << " DLVR: " << e << " " <<
frame);
connection->delivered(frame);
}
}
+ else {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ connection->deliverBuffer(buf);
+ }
}
}
}
catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster process: " << e.what());
+ QPID_LOG(critical, *this << " error in cluster delivered: " <<
e.what());
leave(l);
}
}
@@ -304,11 +316,11 @@
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " joined "; break;
- case CPG_REASON_LEAVE: reasonString = " left "; break;
- case CPG_REASON_NODEDOWN: reasonString = " node-down "; break;
- case CPG_REASON_NODEUP: reasonString = " node-up "; break;
- case CPG_REASON_PROCDOWN: reasonString = " process-down "; break;
+ case CPG_REASON_JOIN: reasonString = " (joined) "; break;
+ case CPG_REASON_LEAVE: reasonString = " (left) "; break;
+ case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break;
+ case CPG_REASON_NODEUP: reasonString = " (node-up) "; break;
+ case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
@@ -338,61 +350,52 @@
cpg_name */*group*/,
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined)
+ cpg_address */*joined*/, int /*nJoined*/)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, *this << " configuration change: " << AddrList(current,
nCurrent)
+ QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current,
nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
- map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
+ std::string addresses;
+ for (cpg_address* p = current; p < current+nCurrent; ++p)
+ addresses.append(MemberId(*p).str());
+ deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(),
addresses), myId), l);
+}
+
+void Cluster::configChange(const MemberId&, const std::string& addresses,
Lock& l) {
+ bool memberChange = map.configChange(addresses);
if (state == LEFT) return;
- if (!map.isAlive(memberId)) { leave(l); return; }
- if(state == INIT) { // First configChange
- if (map.aliveCount() == 1) {
+ if (!map.isAlive(myId)) { // Final config change.
+ leave(l);
+ return;
+ }
+
+ if (state == INIT) { // First configChange
+ if (map.aliveCount() == 1) {
QPID_LOG(info, *this << " first in cluster at " << myUrl);
- map = ClusterMap(memberId, myUrl, true);
+ state = READY;
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ map = ClusterMap(myId, myUrl, true);
memberUpdate(l);
- unstall(l);
}
else { // Joining established group.
state = NEWBIE;
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(),
myUrl.str()), 0, l);
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(),
myUrl.str()), l);
QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
- else if (state >= READY)
+ else if (state >= READY && memberChange)
memberUpdate(l);
}
-void Cluster::dumpInDone(const ClusterMap& m) {
- Lock l(lock);
- dumpedMap = m;
- checkDumpIn(l);
-}
+
+
void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
if (state == READY && map.isNewbie(id)) {
state = OFFER;
QPID_LOG(debug, *this << " send dump-offer to " << id);
- mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), 0, l);
- }
-}
-
-void Cluster::unstall(Lock& l) {
- // Called with lock held
- switch (state) {
- case INIT: case DUMPEE: case DUMPER: case READY:
- QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size()
- << " mcast=" << mcastQueue.size());
- deliverQueue.start();
- state = READY;
- for_each(mcastQueue.begin(), mcastQueue.end(),
boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
- mcastQueue.clear();
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- break;
- case LEFT: break;
- case NEWBIE: case OFFER:
- assert(0);
+ mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), l);
}
}
@@ -418,23 +421,25 @@
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
- map.ready(id, Url(url));
- if (id == memberId)
- unstall(l);
- memberUpdate(l);
+ if (map.ready(id, Url(url)))
+ memberUpdate(l);
+ if (state == CATCHUP && id == myId) {
+ QPID_LOG(debug, *this << " caught-up, going to ready mode.");
+ state = READY;
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ for_each(mcastQueue.begin(), mcastQueue.end(),
boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+ mcastQueue.clear();
+ }
}
void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
if (state == LEFT) return;
MemberId dumpee(dumpeeInt);
boost::optional<Url> url = map.dumpOffer(dumper, dumpee);
- if (dumper == memberId) {
+ if (dumper == myId) {
assert(state == OFFER);
if (url) { // My offer was first.
- QPID_LOG(debug, *this << " mark dump point for dump to " <<
dumpee);
- // Put dump-start on my own deliver queue to mark the stall point.
- // We will stall when it is processed.
-
deliverQueue.push(Event::control(ClusterDumpStartBody(ProtocolVersion(),
dumpee, url->str()), memberId));
+ dumpStart(myId, dumpee, url->str(), l);
}
else { // Another offer was first.
QPID_LOG(debug, *this << " cancel dump offer to " << dumpee);
@@ -442,38 +447,47 @@
tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
}
}
- else if (dumpee == memberId && url) {
+ else if (dumpee == myId && url) {
assert(state == NEWBIE);
QPID_LOG(debug, *this << " accepted dump-offer from " << dumper);
state = DUMPEE;
+ deliverQueue.stop();
checkDumpIn(l);
}
}
+// FIXME aconway 2008-10-15: no longer need a separate control now
+// that the dump control is in the deliver queue.
void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const
std::string& urlStr, Lock& l) {
if (state == LEFT) return;
MemberId dumpee(dumpeeInt);
Url url(urlStr);
assert(state == OFFER);
+ state = DUMPER;
deliverQueue.stop();
QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " <<
urlStr);
- state = DUMPER;
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
- new DumpClient(memberId, dumpee, url, broker, map, getConnections(l),
+ new DumpClient(myId, dumpee, url, broker, map, getConnections(l),
boost::bind(&Cluster::dumpOutDone, this),
boost::bind(&Cluster::dumpOutError, this, _1)));
}
+void Cluster::dumpInDone(const ClusterMap& m) {
+ Lock l(lock);
+ dumpedMap = m;
+ checkDumpIn(l);
+}
+
void Cluster::checkDumpIn(Lock& l) {
if (state == LEFT) return;
- assert(state == DUMPEE || state == NEWBIE);
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
- QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
- mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
- state = READY;
- // unstall when ready control is self-delivered.
+ QPID_LOG(debug, *this << " incoming dump complete, start catchup.
map=" << map);
+ mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
+ // Don't flush the mcast queue till we are READY, on self-deliver.
+ state = CATCHUP;
+ deliverQueue.start();
}
}
@@ -485,7 +499,8 @@
void Cluster::dumpOutDone(Lock& l) {
QPID_LOG(debug, *this << " finished sending dump.");
assert(state == DUMPER);
- unstall(l);
+ state = READY;
+ deliverQueue.start();
tryMakeOffer(map.firstNewbie(), l); // Try another offer
}
@@ -504,7 +519,7 @@
Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&,
string&) {
Lock l(lock);
- QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]");
+ QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
@@ -520,10 +535,11 @@
void Cluster::stopFullCluster(Lock& l) {
QPID_LOG(notice, *this << " shutting down cluster " << name.str());
- mcastControl(ClusterShutdownBody(), 0, l);
+ mcastControl(ClusterShutdownBody(), l);
}
void Cluster::memberUpdate(Lock& l) {
+ QPID_LOG(debug, *this << " member update, map=" << map);
std::vector<Url> vectUrl = getUrls(l);
size_t size = vectUrl.size();
@@ -552,12 +568,12 @@
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "READY",
"OFFER", "DUMPER", "LEFT" };
- return o << cluster.memberId << "(" << STATE[cluster.state] << ")";
+ static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP",
"READY", "OFFER", "DUMPER", "LEFT" };
+ return o << cluster.myId << "(" << STATE[cluster.state] << ")";
}
MemberId Cluster::getId() const {
- return memberId; // Immutable, no need to lock.
+ return myId; // Immutable, no need to lock.
}
broker::Broker& Cluster::getBroker() const {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Oct 16
10:07:26 2008
@@ -65,11 +65,11 @@
virtual ~Cluster();
// Connection map
- void insert(const ConnectionPtr&);
+ bool insert(const ConnectionPtr&);
void erase(ConnectionId);
// Send to the cluster
- void mcastControl(const framing::AMQBody& controlBody, Connection* cptr=0);
+ void mcastControl(const framing::AMQBody& controlBody, const
ConnectionId&, uint32_t id);
void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
void mcast(const Event& e);
@@ -101,7 +101,8 @@
// a Lock to call the unlocked functions.
// Unlocked versions of public functions
- void mcastControl(const framing::AMQBody& controlBody, Connection* cptr,
Lock&);
+ void mcastControl(const framing::AMQBody& controlBody, const
ConnectionId&, uint32_t, Lock&);
+ void mcastControl(const framing::AMQBody& controlBody, Lock&);
void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id,
Lock&);
void mcast(const Event& e, Lock&);
void leave(Lock&);
@@ -110,9 +111,6 @@
// Called via CPG, deliverQueue or DumpClient threads.
void tryMakeOffer(const MemberId&, Lock&);
- // Called in CPG, connection IO and DumpClient threads.
- void unstall(Lock&);
-
// Called in main thread in ~Broker.
void brokerShutdown();
@@ -123,9 +121,10 @@
void dumpOffer(const MemberId& dumper, uint64_t dumpee, Lock&);
void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const
std::string& urlStr, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
+ void configChange(const MemberId&, const std::string& addresses, Lock& l);
void shutdown(const MemberId&, Lock&);
- void process(const Event&); // deliverQueue callback
- void process(const Event&, Lock&); // unlocked version
+ void delivered(const Event&); // deliverQueue callback
+ void delivered(const Event&, Lock&); // unlocked version
// CPG callbacks, called in CPG IO thread.
void dispatch(sys::DispatchHandle&); // Dispatch CPG events.
@@ -139,6 +138,8 @@
void* /*msg*/,
int /*msg_len*/);
+ void deliver(const Event& e, Lock&);
+
void configChange( // CPG config change callback.
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
@@ -172,7 +173,7 @@
Cpg cpg;
const Cpg::Name name;
const Url myUrl;
- const MemberId memberId;
+ const MemberId myId;
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
@@ -183,7 +184,17 @@
qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns
lifecycle
- enum { INIT, NEWBIE, DUMPEE, READY, OFFER, DUMPER, LEFT } state;
+ enum {
+ INIT, ///< Initial state, no CPG messages received.
+ NEWBIE, ///< Sent dump request, waiting for dump offer.
+ DUMPEE, ///< Stalled receive queue at dump offer,
waiting for dump to complete.
+ CATCHUP, ///< Dump complete, unstalled but has not yet
seen own "ready" event.
+ READY, ///< Fully operational
+ OFFER, ///< Sent an offer, waiting for accept/reject.
+ DUMPER, ///< Offer accepted, sending a state dump.
+ LEFT ///< Final state, left the cluster.
+ } state;
+
ClusterMap map;
sys::Thread dumpThread;
boost::optional<ClusterMap> dumpedMap;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Thu Oct 16
10:07:26 2008
@@ -34,25 +34,24 @@
namespace cluster {
namespace {
-void insertSet(ClusterMap::Set& set, const ClusterMap::Map::value_type& v) {
set.insert(v.first); }
-void insertMap(ClusterMap::Map& map, FieldTable::ValueMap::value_type vt) {
- map.insert(ClusterMap::Map::value_type(vt.first,
Url(vt.second->get<std::string>())));
+void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map&
map, ClusterMap::Set& set) {
+ MemberId id(vt.first);
+ set.insert(id);
+ std::string url = vt.second->get<std::string>();
+ if (!url.empty())
+ map.insert(ClusterMap::Map::value_type(id, Url(url)));
}
-void assignMap(ClusterMap::Map& map, const FieldTable& ft) {
- map.clear();
- std::for_each(ft.begin(), ft.end(), boost::bind(&insertMap,
boost::ref(map), _1));
-}
-
-void insertFieldTable(FieldTable& ft, const ClusterMap::Map::value_type& vt) {
- return ft.setString(vt.first.str(), vt.second.str());
+void insertFieldTableFromMapValue(FieldTable& ft, const
ClusterMap::Map::value_type& vt) {
+ ft.setString(vt.first.str(), vt.second.str());
}
void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
ft.clear();
- std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTable,
boost::ref(ft), _1));
+ std::for_each(map.begin(), map.end(),
boost::bind(&insertFieldTableFromMapValue, boost::ref(ft), _1));
}
+
}
ClusterMap::ClusterMap() {}
@@ -66,10 +65,21 @@
}
ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable&
membersFt) {
- assignMap(newbies, newbiesFt);
- assignMap(members, membersFt);
- std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertSet,
boost::ref(alive), _1));
- std::for_each(members.begin(), members.end(), boost::bind(&insertSet,
boost::ref(alive), _1));
+ std::for_each(newbiesFt.begin(), newbiesFt.end(),
boost::bind(&addFieldTableValue, _1, boost::ref(newbies), boost::ref(alive)));
+ std::for_each(membersFt.begin(), membersFt.end(),
boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
+}
+
+ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
+ framing::ClusterConnectionMembershipBody b;
+ b.getNewbies().clear();
+ std::for_each(newbies.begin(), newbies.end(),
boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getNewbies()), _1));
+ for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) {
+ if (!isMember(*i) && !isNewbie(*i))
+ b.getNewbies().setString(i->str(), std::string());
+ }
+ b.getMembers().clear();
+ std::for_each(members.begin(), members.end(),
boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
+ return b;
}
bool ClusterMap::configChange(
@@ -80,7 +90,7 @@
cpg_address* a;
bool memberChange=false;
for (a = left; a != left+nLeft; ++a) {
- memberChange = members.erase(*a);
+ memberChange = memberChange || members.erase(*a);
newbies.erase(*a);
}
alive.clear();
@@ -97,13 +107,6 @@
return newbies.empty() ? MemberId() : newbies.begin()->first;
}
-ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
- framing::ClusterConnectionMembershipBody b;
- assignFieldTable(b.getNewbies(), newbies);
- assignFieldTable(b.getMembers(), members);
- return b;
-}
-
std::vector<Url> ClusterMap::memberUrls() const {
std::vector<Url> urls(members.size());
std::transform(members.begin(), members.end(), urls.begin(),
@@ -121,7 +124,8 @@
for (ClusterMap::Set::const_iterator i = m.alive.begin(); i !=
m.alive.end(); ++i) {
o << *i;
if (m.isMember(*i)) o << "(member)";
- if (m.isNewbie(*i)) o << "(newbie)";
+ else if (m.isNewbie(*i)) o << "(newbie)";
+ else o << "(unknown)";
o << " ";
}
return o;
@@ -139,6 +143,23 @@
return isAlive(id) && members.insert(Map::value_type(id,url)).second;
}
+bool ClusterMap::configChange(const std::string& addresses) {
+    // Replace the alive set with the ids packed into `addresses` (a
+    // concatenation of fixed-size, 8-byte member ids) and forget every id
+    // that was alive before but is absent from the update.
+    //
+    // Returns true if at least one departed id was an established member.
+    const std::string::size_type idSize = 8;
+
+    // Walk by offset rather than by iterator so that a buffer whose length
+    // is not a multiple of idSize can never be read past its end; a trailing
+    // partial id is ignored.
+    Set update;
+    for (std::string::size_type pos = 0; pos + idSize <= addresses.size(); pos += idSize)
+        update.insert(MemberId(addresses.substr(pos, idSize)));
+
+    Set removed;
+    std::set_difference(alive.begin(), alive.end(),
+                        update.begin(), update.end(),
+                        std::inserter(removed, removed.begin()));
+    alive = update;
+
+    bool memberChange = false;
+    for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) {
+        // The erase must run for every departed id. Writing it as
+        // `memberChange || members.erase(*i)` would short-circuit after the
+        // first hit and leave later departed members in the map.
+        if (members.erase(*i) != 0)
+            memberChange = true;
+        newbies.erase(*i);
+    }
+    return memberChange;
+}
+
boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const
MemberId& to) {
Map::iterator i = newbies.find(to);
if (isAlive(from) && i != newbies.end()) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Thu Oct 16
10:07:26 2008
@@ -58,6 +58,8 @@
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined);
+ bool configChange(const std::string& addresses);
+
bool isNewbie(const MemberId& id) const { return newbies.find(id) !=
newbies.end(); }
bool isMember(const MemberId& id) const { return members.find(id) !=
members.end(); }
bool isAlive(const MemberId& id) const { return alive.find(id) !=
alive.end(); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Oct 16
10:07:26 2008
@@ -44,7 +44,7 @@
: cluster(c), self(myId), catchUp(false), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId)
{
- QPID_LOG(debug, "New connection: " << *this);
+ QPID_LOG(debug, cluster << " new connection: " << *this);
}
// Local connections
@@ -53,11 +53,11 @@
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId)
{
- QPID_LOG(debug, "New connection: " << *this);
+ QPID_LOG(debug, cluster << " new connection: " << *this);
}
Connection::~Connection() {
- QPID_LOG(debug, "Deleted connection: " << *this);
+ QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
bool Connection::doOutput() {
@@ -72,32 +72,36 @@
output.deliverDoOutput(requested);
}
+// FIXME aconway 2008-10-15: changes here, dubious.
+
// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
- QPID_LOG(trace, "RECV " << *this << ": " << f);
- if (isShadow()) {
- // Intercept the close that completes catch-up for shadow a connection.
- if (isShadow() && f.getMethod() &&
f.getMethod()->isA<ConnectionCloseBody>()) {
- catchUp = false;
- cluster.insert(boost::intrusive_ptr<Connection>(this));
+ QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
+ if (isLocal()) {
+ currentChannel = f.getChannel();
+ if (!framing::invoke(*this, *f.getBody()).wasHandled())
+ connection.received(f);
+ }
+ else { // Shadow or dumped ex catch-up connection.
+ if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ if (isShadow()) {
+ QPID_LOG(debug, cluster << " inserting connection " << *this);
+ cluster.insert(boost::intrusive_ptr<Connection>(this));
+ }
AMQFrame ok(in_place<ConnectionCloseOkBody>());
connection.getOutput().send(ok);
output.setOutputHandler(discardHandler);
+ catchUp = false;
}
else
- QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
- }
- else {
- currentChannel = f.getChannel();
- if (!framing::invoke(*this, *f.getBody()).wasHandled())
- connection.received(f);
+ QPID_LOG(warning, cluster << " ignoring unexpected frame " <<
*this << ": " << f);
}
}
// Delivered from cluster.
void Connection::delivered(framing::AMQFrame& f) {
- QPID_LOG(trace, "DLVR " << *this << ": " << f);
- assert(!isCatchUp());
+ QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
+ assert(!catchUp);
// Handle connection controls, deliver other frames to connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -106,24 +110,25 @@
void Connection::closed() {
try {
- QPID_LOG(debug, "Connection closed " << *this);
if (catchUp) {
- QPID_LOG(critical, cluster << " error on catch-up connection " <<
*this);
+ QPID_LOG(critical, cluster << " catch-up connection closed
prematurely " << *this);
cluster.leave();
}
- else if (isDump())
+ else if (isDumped()) {
+ QPID_LOG(debug, cluster << " closed dump connection " << *this);
connection.closed();
+ }
else if (isLocal()) {
+ QPID_LOG(debug, cluster << " local close of replicated connection
" << *this);
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
output.setOutputHandler(discardHandler);
- cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
- ++mcastSeq;
+ cluster.mcastControl(ClusterConnectionDeliverCloseBody(), self,
++mcastSeq);
}
}
catch (const std::exception& e) {
- QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
+ QPID_LOG(error, cluster << " error closing connection " << *this << ":
" << e.what());
}
}
@@ -135,7 +140,7 @@
// Decode data from local clients.
size_t Connection::decode(const char* buffer, size_t size) {
- if (catchUp || isDump()) { // Handle catch-up locally.
+ if (catchUp) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
received(localDecoder.frame);
@@ -174,26 +179,39 @@
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, "Received session state dump for " << s->getId());
+ QPID_LOG(debug, cluster << " received session state dump for " <<
s->getId());
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
ConnectionId shadow = ConnectionId(memberId, connectionId);
- QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " <<
shadow);
+ QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes
shadow " << shadow);
self = shadow;
}
-void Connection::membership(const FieldTable& urls, const FieldTable& states) {
- cluster.dumpInDone(ClusterMap(urls,states));
- catchUp = false;
- self.second = 0; // Mark this as completed dump connection.
+void Connection::membership(const FieldTable& newbies, const FieldTable&
members) {
+ QPID_LOG(debug, cluster << " incoming dump complete on connection " <<
*this);
+ cluster.dumpInDone(ClusterMap(newbies, members));
+ self.second = 0; // Mark this as completed dump connection.
}
-bool Connection::isLocal() const { return self.first == cluster.getId() &&
self.second == this; }
+bool Connection::isLocal() const {
+    // Local means: owned by this broker's member id AND the id's pointer
+    // part refers to this very object.
+    if (!(self.first == cluster.getId()))
+        return false;
+    return self.second == this;
+}
+
+bool Connection::isShadow() const {
+    // A shadow is any connection whose id carries a member id other than
+    // this broker's own.
+    const bool ownedElsewhere = self.first != cluster.getId();
+    return ownedElsewhere;
+}
+
+bool Connection::isDumped() const {
+    // A completed dump connection carries this broker's member id with the
+    // pointer part of its id zeroed (membership() sets self.second = 0).
+    if (!(self.first == cluster.getId()))
+        return false;
+    return self.second == 0;
+}
std::ostream& operator<<(std::ostream& o, const Connection& c) {
- return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow")
- << (c.isCatchUp() ? ",catchup" : "") << ")";
+ const char* type="unknown";
+ if (c.isLocal()) type = "local";
+ else if (c.isShadow()) type = "shadow";
+ else if (c.isDumped()) type = "dumped";
+ return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "")
<< ")";
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Oct 16
10:07:26 2008
@@ -64,12 +64,13 @@
bool isLocal() const;
/** True for connections that are shadowing remote broker connections */
- bool isShadow() const { return !isLocal(); }
+ bool isShadow() const;
/** True if the connection is in "catch-up" mode: building initial broker
state. */
bool isCatchUp() const { return catchUp; }
- bool isDump() const { return self.getPointer() == 0; }
+ /** True if the connection is a completed shared dump connection */
+ bool isDumped() const;
Cluster& getCluster() { return cluster; }
@@ -103,6 +104,7 @@
void membership(const framing::FieldTable&, const framing::FieldTable&);
private:
+ bool catcUp;
void deliverClose();
void deliverDoOutput(uint32_t requested);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Oct 16
10:07:26 2008
@@ -168,9 +168,6 @@
// authentication etc. See ConnectionSettings.
shadowConnection.open(dumpeeUrl, bc.getUserId());
- // Stop the failover listener as its session will conflict with
re-creating-sessions
-
client::ConnectionAccess::getImpl(shadowConnection)->stopFailoverListener();
-
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession,
this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
dumpConnection->getId().getMember(),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu
Oct 16 10:07:26 2008
@@ -105,8 +105,9 @@
// Note we may send 0 size request if there's more than 2*estimate in the
buffer.
// Send it anyway to keep the doOutput chain going until we are sure
there's no more output
// (in deliverDoOutput)
- //
-
parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(),
request), &parent);
+ //
+ // FIXME aconway 2008-10-16: use ++parent.mcastSeq as sequence no,not 0
+
parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(),
request), parent.getId(), 0);
QPID_LOG(trace, &parent << "Send doOutput request for " << request);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Oct 16
10:07:26 2008
@@ -177,46 +177,52 @@
return s;
}
-template <class T> std::set<uint16_t> knownBrokerPorts(T& source, size_t n) {
+template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
vector<Url> urls = source.getKnownBrokers();
- for (size_t retry=1000; urls.size() != n && retry != 0; --retry) {
- ::usleep(1000);
- urls = source.getKnownBrokers();
+ BOOST_MESSAGE("knownBrokerPorts " << n << ": " << urls);
+ if (n >= 0) {
+ for (size_t retry=10; urls.size() != unsigned(n) && retry != 0;
--retry) {
+ ::usleep(100000);
+ urls = source.getKnownBrokers();
+ BOOST_MESSAGE("knownBrokerPorts retry: " << urls);
+ }
}
set<uint16_t> s;
- for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
- BOOST_MESSAGE("Failover URL: " << *i);
- BOOST_CHECK(i->size() >= 1);
- BOOST_CHECK((*i)[0].get<TcpAddress>());
+ for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
s.insert((*i)[0].get<TcpAddress>()->port);
- }
return s;
}
-QPID_AUTO_TEST_CASE(testFailoverListener) {
- ClusterFixture cluster(2);
- Client c0(cluster[0], "c0");
- FailoverListener fl;
- fl.start(ConnectionAccess::getImpl(c0.connection));
- set<uint16_t> set0=makeSet(cluster);
-
- BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
- cluster.add();
- BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(fl, 3));
- cluster.kill(2);
- BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
-}
-
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
- ClusterFixture cluster(2);
+ ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
- set<uint16_t> set0=makeSet(cluster);
+ set<uint16_t> kb0 = knownBrokerPorts(c0.connection);
+ BOOST_CHECK_EQUAL(kb0.size(), 1);
+ BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
+
+ cluster.add();
+ Client c1(cluster[1], "c1");
+ set<uint16_t> kb1 = knownBrokerPorts(c1.connection);
+ kb0 = knownBrokerPorts(c0.connection, 2);
+ BOOST_CHECK_EQUAL(kb1.size(), 2);
+ BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
+ BOOST_CHECK_EQUAL(kb1,kb0);
- BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
cluster.add();
- BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(c0.connection, 3));
- cluster.kill(2);
- BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
+ Client c2(cluster[2], "c2");
+ set<uint16_t> kb2 = knownBrokerPorts(c2.connection);
+ kb1 = knownBrokerPorts(c1.connection, 3);
+ kb0 = knownBrokerPorts(c0.connection, 3);
+ BOOST_CHECK_EQUAL(kb2.size(), 3);
+ BOOST_CHECK_EQUAL(kb2, makeSet(cluster));
+ BOOST_CHECK_EQUAL(kb2,kb0);
+ BOOST_CHECK_EQUAL(kb2,kb1);
+
+ cluster.kill(1);
+ kb0 = knownBrokerPorts(c0.connection, 2);
+ kb2 = knownBrokerPorts(c2.connection, 2);
+ BOOST_CHECK_EQUAL(kb0.size(), 2);
+ BOOST_CHECK_EQUAL(kb0, kb2);
}
QPID_AUTO_TEST_CASE(DumpConsumers) {
@@ -238,6 +244,7 @@
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
+
// Activate the subscription, ensure message removed on all queues.
c0.subs.setFlowControl("q", FlowControl::messageCredit(1));
Message m;
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Oct 16 10:07:26 2008
@@ -44,6 +44,10 @@
<control name="ready" code="0x10" label="New member is ready.">
<field name="url" type="str16"/>
</control>
+
+ <control name="config-change" code="0x11" label="Raw cluster membership.">
+ <field name="current" type="vbin16"/> <!-- packed member-id array -->
+ </control>
<control name="shutdown" code="0x20" label="Shut down entire cluster"/>
</class>