Author: aconway
Date: Wed Nov 19 05:52:51 2008
New Revision: 718961
URL: http://svn.apache.org/viewvc?rev=718961&view=rev
Log:
Fix --cluster-cman option to enable cman integration.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=718961&r1=718960&r2=718961&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Nov 19 05:52:51 2008
@@ -85,8 +85,7 @@
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
- isQuorate(isQuorateImpl),
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool useQuorum) :
broker(b),
poller(b.getPoller()),
cpg(*this),
@@ -117,8 +116,9 @@
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
+ QPID_LOG(notice, *this << " joining cluster " << name.str());
+ if (useQuorum) quorum.init();
cpg.join(name);
- QPID_LOG(notice, *this << " will join cluster " << name.str());
}
Cluster::~Cluster() {
@@ -592,11 +592,8 @@
return broker; // Immutable, no need to lock.
}
-/** Default implementation for isQuorateImpl when there is no quorum service. */
-bool Cluster::isQuorateImpl() { return true; }
-
void Cluster::checkQuorum() {
- if (!isQuorate()) {
+ if (!quorum.isQuorate()) {
QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down");
leave();
throw Exception(QPID_MSG(*this << " disconnected from cluster quorum."));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=718961&r1=718960&r2=718961&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Nov 19 05:52:51 2008
@@ -24,6 +24,7 @@
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
#include "FailoverExchange.h"
+#include "Quorum.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
@@ -66,7 +67,7 @@
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
*/
- Cluster(const std::string& name, const Url& url, broker::Broker&);
+ Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum);
virtual ~Cluster();
@@ -176,7 +177,6 @@
void dumpOutDone(Lock&);
void setClusterId(const framing::Uuid&);
- static bool isQuorateImpl();
mutable sys::Monitor lock;
@@ -215,6 +215,8 @@
size_t lastSize;
boost::shared_ptr<FailoverExchange> failoverExchange;
+ Quorum quorum;
+
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=718961&r1=718960&r2=718961&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Nov 19 05:52:51 2008
@@ -39,7 +39,10 @@
struct ClusterValues {
string name;
string url;
+ bool quorum;
+ ClusterValues() : quorum(false) {}
+
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
return Url(url);
@@ -59,6 +62,9 @@
("cluster-url", optValue(values.url,"URL"),
"URL of this broker, advertized to the cluster.\n"
"Defaults to a URL listing all the local IP addresses\n")
+#if HAVE_LIBCMAN
+ ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
+#endif
;
}
};
@@ -78,7 +84,7 @@
if (values.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker);
+ cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=718961&r1=718960&r2=718961&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Wed Nov 19 05:52:51 2008
@@ -18,7 +18,7 @@
* under the License.
*
*/
-#include "Quorum.h"
+#include "Quorum_cman.h"
#include "qpid/log/Statement.h"
#include "qpid/Options.h"
#include "qpid/sys/Time.h"
@@ -30,24 +30,20 @@
Quorum::~Quorum() { if (cman) cman_finish(cman); }
-void Quorum::addOption(Options& opts) {
- opts.addOptions()("cluster-cman", optValue(enable), "Enable integration with CMAN Cluster Manager");
-}
-
void Quorum::init() {
- if (enable) {
- cman = cman_init(0);
- if (cman == 0) throw ErrnoException("Can't connect to cman service");
- // FIXME aconway 2008-11-13: configure max wait.
- for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) {
- QPID_LOG(notice, "Waiting for cluster quorum: " << sys::strError(errno));
- sys::sleep(1);
- }
- if (!cman_is_quorate(cman))
- throw ErrnoException("Timed out waiting for cluster quorum");
+ QPID_LOG(info, "Waiting for cluster quorum");
+ enable = true;
+ cman = cman_init(0);
+ if (cman == 0) throw ErrnoException("Can't connect to cman service");
+ // FIXME aconway 2008-11-13: configure max wait.
+ for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) {
+ QPID_LOG(info, "Waiting for cluster quorum: " << sys::strError(errno));
+ sys::sleep(1);
}
+ if (!cman_is_quorate(cman))
+ throw ErrnoException("Timed out waiting for cluster quorum.");
}
-bool Quorum::isQuorate() { return cman_is_quorate(cman); }
+bool Quorum::isQuorate() { return enable ? cman_is_quorate(cman) : true; }
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h?rev=718961&r1=718960&r2=718961&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h Wed Nov 19 05:52:51 2008
@@ -36,7 +36,6 @@
public:
Quorum();
~Quorum();
- void addOption(Options& opts);
void init();
bool isQuorate();
@@ -48,6 +47,4 @@
}} // namespace qpid::cluster
- // namespace qpid::cluster
-
#endif /*!QPID_CLUSTER_QUORUM_CMAN_H*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h?rev=718961&r1=718960&r2=718961&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h Wed Nov 19 05:52:51 2008
@@ -28,12 +28,10 @@
class Quorum {
public:
- void init();
+ void init() {}
bool isQuorate() { return true; }
- void addOption(Options& opts) {}
};
-#endif
-
-
#endif /*!QPID_CLUSTER_QUORUM_NULL_H*/
+
+}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp?rev=718961&r1=718960&r2=718961&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp Wed Nov 19 05:52:51 2008
@@ -22,10 +22,6 @@
*
*/
-// FIXME aconway 2008-08-11: this could be of more general interest,
-// move to common lib.
-//
-
#include "PollableCondition.h"
#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/Exception.h"