Author: aconway
Date: Mon Dec 1 07:40:14 2008
New Revision: 722101
URL: http://svn.apache.org/viewvc?rev=722101&view=rev
Log:
cluster: Queue outgoing multicast events.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=722101&r1=722100&r2=722101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Dec 1 07:40:14 2008
@@ -99,6 +99,7 @@
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
+ mcastQueue(boost::bind(&Event::mcast, _1, boost::cref(name), boost::ref(cpg)), poller),
mcastId(0),
mgmtObject(0),
state(INIT),
@@ -116,6 +117,7 @@
failoverExchange.reset(new FailoverExchange(this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
+ mcastQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
if (useQuorum) quorum.init();
cpg.join(name);
@@ -176,10 +178,10 @@
if (state <= CATCHUP && e.isConnection()) {
// Stall outgoing connection events until we are fully READY
QPID_LOG(trace, *this << " MCAST deferred: " << e );
- mcastQueue.push_back(e);
+ mcastStallQueue.push_back(e);
}
else
- e.mcast(name, cpg);
+ mcastQueue.push(e);
}
std::vector<Url> Cluster::getUrls() const {
@@ -432,8 +434,8 @@
state = READY;
QPID_LOG(notice, *this << " caught up, active cluster member");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
- mcastQueue.clear();
+ for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+ mcastStallQueue.clear();
}
}
@@ -545,7 +547,7 @@
}
void Cluster::memberUpdate(Lock& l) {
- QPID_LOG(info, *this << " member update, map=" << map);
+ QPID_LOG(info, *this << map.memberCount() << " members: " << map);
std::vector<Url> urls = getUrls(l);
size_t size = urls.size();
failoverExchange->setUrls(urls);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=722101&r1=722100&r2=722101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Dec 1 07:40:14 2008
@@ -190,8 +190,8 @@
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- PollableEventQueue deliverQueue;
- PlainEventQueue mcastQueue;
+ PollableEventQueue deliverQueue, mcastQueue;
+ PlainEventQueue mcastStallQueue;
uint32_t mcastId;
framing::Uuid clusterId;