Author: aconway
Date: Tue Aug 5 09:24:25 2008
New Revision: 682774
URL: http://svn.apache.org/viewvc?rev=682774&view=rev
Log:
Fix sporadic shutdown hang in clustered broker.
Add start|stop_cluster test scripts
Added:
incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
- copied, changed from r671968,
incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster
- copied, changed from r671968,
incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=682774&r1=682773&r2=682774&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Aug 5 09:24:25 2008
@@ -292,11 +292,11 @@
// call any function that is not async-signal safe.
// Any unsafe shutdown actions should be done in the destructor.
poller->shutdown();
- finalize(); // Finalize any plugins.
}
Broker::~Broker() {
shutdown();
+ finalize(); // Finalize any plugins.
delete store;
if (config.auth) {
#if HAVE_SASL
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=682774&r1=682773&r2=682774&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Aug 5 09:24:25 2008
@@ -56,8 +56,8 @@
}
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b)
:
- cpg(*this),
broker(&b),
+ cpg(*this),
name(name_),
url(url_),
self(cpg.self())
@@ -75,10 +75,7 @@
}
}
-Cluster::~Cluster() {
- cpg.shutdown();
- dispatcher.join();
-}
+Cluster::~Cluster() {}
// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
@@ -88,16 +85,16 @@
}
void Cluster::leave() {
- if (!broker.get()) return; // Already left
- QPID_LOG(info, QPID_MSG("Leaving cluster " << *this));
- // Must not be called in the dispatch thread.
- assert(Thread::current().id() != dispatcher.id());
+ Mutex::ScopedLock l(lock);
+ if (!broker) return; // Already left.
+ assert(Thread::current().id() != dispatcher.id()); // Must not be called in the dispatch thread.
+ QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
- // Wait till final config-change is delivered and broker is released.
- {
- Mutex::ScopedLock l(lock);
- while(broker.get()) lock.wait();
- }
+ // The dispatch thread sets broker=0 when the final config-change
+ // is delivered.
+ while(broker) lock.wait();
+ cpg.shutdown();
+ dispatcher.join();
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -115,7 +112,6 @@
// FIXME aconway 2008-07-03: More efficient buffer management.
// Cache coded form of decoded frames for re-encoding?
Buffer buf(buffer);
- assert(frame.size() + 64 < sizeof(buffer));
frame.encode(buf);
encodePtr(buf, connection);
iovec iov = { buffer, buf.getPosition() };
@@ -145,6 +141,7 @@
if (i == shadowConnectionMap.end()) { // A new shadow connection.
std::ostringstream os;
os << name << ":" << member << ":" << remotePtr;
+ assert(broker);
broker::Connection* c = new broker::Connection(&shadowOut, *broker,
os.str());
ShadowConnectionMap::value_type value(id, new
ConnectionInterceptor(*c, *this, id));
i = shadowConnectionMap.insert(value).first;
@@ -169,8 +166,8 @@
decodePtr(buf, connection);
QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " <<
frame);
- if (!broker.get()) {
- QPID_LOG(warning, "Ignoring late DLVR, already left the cluster.");
+ if (!broker) {
+ QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
return;
}
@@ -232,11 +229,8 @@
QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left,
" << nJoined << " joined):"
<< members);
assert(members.size() == size_t(nCurrent));
- if (members.find(self) == members.end()) {
- QPID_LOG(debug, "Left cluster " << *this);
- broker = 0; // Release broker reference.
- }
-
+ if (members.find(self) == members.end())
+ broker = 0; // We have left the group, this is the final config change.
lock.notifyAll(); // Threads waiting for membership changes.
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=682774&r1=682773&r2=682774&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Aug 5 09:24:25 2008
@@ -122,8 +122,8 @@
ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*);
mutable sys::Monitor lock; // Protect access to members.
+ broker::Broker* broker;
Cpg cpg;
- boost::intrusive_ptr<broker::Broker> broker;
Cpg::Name name;
Url url;
MemberMap members;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=682774&r1=682773&r2=682774&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Tue Aug 5 09:24:25 2008
@@ -12,7 +12,7 @@
# ais_check checks pre-requisites for cluster tests and runs them if ok.
TESTS+=ais_check
-EXTRA_DIST+=ais_check
+EXTRA_DIST+=ais_check start_cluster stop_cluster
check_PROGRAMS+=cluster_test
cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
Copied: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster (from r671968,
incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster)
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster?p2=incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster&p1=incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster&r1=671968&r2=682774&rev=682774&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster Tue Aug 5 09:24:25 2008
@@ -1,7 +1,7 @@
#!/bin/sh
-# Start a cluster of brokers on local host.
-# Print the cluster's URL.
+# Start a cluster of brokers on local host, put the list of ports for cluster members in cluster.ports
#
+echo $1 | grep '^[0-9][0-9]*$' > /dev/null || { echo "Usage: $0 cluster-size [options]"; exit 1; }
# Execute command with the ais group set.
with_ais_group() {
@@ -10,17 +10,14 @@
}
test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
-test -z "$*" && { echo "Usage: $0 cluster-size [options]"; exit 1; }
-
-rm -f cluster*.log cluster.ports
-SIZE=$1
-shift
-CLUSTER=`whoami` # Cluster name=user name, avoid clashes.
-OPTS="--load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
+rm -f cluster*.log
+SIZE=$1; shift
+CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
for (( i=0; i<SIZE; ++i )); do
+ OPTS="--load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
PORT=`with_ais_group ../qpidd $OPTS` || exit 1
echo $PORT >> cluster.ports
done
-
+
Copied: incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster (from r671968,
incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster)
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster?p2=incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster&p1=incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster&r1=671968&r2=682774&rev=682774&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/stop_cluster Tue Aug 5 09:24:25 2008
@@ -1,7 +1,6 @@
#!/bin/sh
# Stop brokers on ports listed in cluster.ports
-
PORTS=`cat cluster.ports`
for PORT in $PORTS ; do
../qpidd -qp $PORT || ERROR="$ERROR $PORT"