Author: aconway
Date: Tue Oct  7 10:27:06 2008
New Revision: 702552

URL: http://svn.apache.org/viewvc?rev=702552&view=rev
Log:

broker: Fixed incorrect pass-by-reference of Queue::shared_ptr in several files.
cluster: added FailoverExchange - send cluster membership to clients.
client: added FailoverListener - receive cluster updates from failover exchange.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h   (with 
props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Oct  7 10:27:06 2008
@@ -373,9 +373,11 @@
   qpid/client/ConnectionHandler.cpp            \
   qpid/client/ConnectionImpl.cpp \
   qpid/client/ConnectionSettings.cpp           \
-  qpid/client/Connector.cpp    \
+  qpid/client/Connector.cpp                    \
   qpid/client/Demux.cpp                                \
   qpid/client/Dispatcher.cpp                   \
+  qpid/client/FailoverListener.h               \
+  qpid/client/FailoverListener.cpp             \
   qpid/client/Future.cpp                       \
   qpid/client/FutureCompletion.cpp             \
   qpid/client/FutureResult.cpp                 \
@@ -383,7 +385,7 @@
   qpid/client/LocalQueue.cpp                   \
   qpid/client/Message.cpp                      \
   qpid/client/MessageListener.cpp              \
-  qpid/client/QueueOptions.cpp         \
+  qpid/client/QueueOptions.cpp                 \
   qpid/client/Results.cpp                      \
   qpid/client/SessionBase_0_10.cpp             \
   qpid/client/SessionBase_0_10.h               \

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Tue Oct  7 10:27:06 2008
@@ -28,7 +28,9 @@
   qpid/cluster/DumpClient.h \
   qpid/cluster/DumpClient.cpp \
   qpid/cluster/ClusterMap.h \
-  qpid/cluster/ClusterMap.cpp
+  qpid/cluster/ClusterMap.cpp \
+  qpid/cluster/FailoverExchange.h \
+  qpid/cluster/FailoverExchange.cpp
 
 cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h Tue Oct  7 
10:27:06 2008
@@ -33,7 +33,7 @@
 
            virtual Message& getMessage() = 0;
            
-            virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+            virtual void deliverTo(const boost::shared_ptr<Queue>& queue) = 0;
             virtual uint64_t contentSize() { return 0; }
             virtual ~Deliverable(){}
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp Tue 
Oct  7 10:27:06 2008
@@ -22,11 +22,11 @@
 
 using namespace qpid::broker;
 
-DeliverableMessage::DeliverableMessage(boost::intrusive_ptr<Message>& _msg) : 
msg(_msg)
+DeliverableMessage::DeliverableMessage(const boost::intrusive_ptr<Message>& 
_msg) : msg(_msg)
 {
 }
 
-void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
+void DeliverableMessage::deliverTo(const boost::shared_ptr<Queue>& queue)
 {
     queue->deliver(msg);    
     delivered = true;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h Tue Oct  
7 10:27:06 2008
@@ -32,8 +32,8 @@
         class DeliverableMessage : public Deliverable{
             boost::intrusive_ptr<Message> msg;
         public:
-            DeliverableMessage(boost::intrusive_ptr<Message>& msg);
-            virtual void deliverTo(Queue::shared_ptr& queue);
+            DeliverableMessage(const boost::intrusive_ptr<Message>& msg);
+            virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
             Message& getMessage();
             uint64_t contentSize();
             virtual ~DeliverableMessage(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue Oct  
7 10:27:06 2008
@@ -97,6 +97,10 @@
     return i->second;
 }
 
+bool ExchangeRegistry::registerExchange(const Exchange::shared_ptr& ex) {
+    return exchanges.insert(ExchangeMap::value_type(ex->getName(), ex)).second;
+}
+
 void ExchangeRegistry::registerType(const std::string& type, FactoryFunction f)
 {
     factory[type] = f;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Tue Oct  7 
10:27:06 2008
@@ -59,6 +59,11 @@
      */
     void setParent (management::Manageable* _parent) { parent = _parent; }
 
+    /** Register an exchange instance.
+     *@return true if registered, false if exchange with same name is 
already  registered.
+     */
+    bool registerExchange(const Exchange::shared_ptr&);
+
     void registerType(const std::string& type, FactoryFunction);
 
     /** Call f for each exchange in the registry. */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue 
Oct  7 10:27:06 2008
@@ -59,7 +59,7 @@
 {
     Queue::shared_ptr queue;
 public:
-    RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {}
+    RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : 
queue(_queue) {}
     ~RecoverableQueueImpl() {};
     void setPersistenceId(uint64_t id);    
        uint64_t getPersistenceId() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Tue Oct  7 
10:27:06 2008
@@ -45,7 +45,7 @@
 void TxPublish::rollback() throw(){
 }
 
-void TxPublish::deliverTo(Queue::shared_ptr& queue){
+void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
     if (!queue->isLocal(msg)) {
         queues.push_back(queue);
         delivered = true;
@@ -57,7 +57,7 @@
 TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& 
_msg) 
     : ctxt(_ctxt), msg(_msg){}
 
-void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
+void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){
     if (!queue->enqueue(ctxt, msg)){
         /**
        * if not store then mark message for ack and deleivery once
@@ -70,7 +70,7 @@
 
 TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}
 
-void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
+void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){
     queue->process(msg);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Tue Oct  7 
10:27:06 2008
@@ -51,14 +51,14 @@
                 boost::intrusive_ptr<Message>& msg;
             public:
                 Prepare(TransactionContext* ctxt, 
boost::intrusive_ptr<Message>& msg);
-                void operator()(Queue::shared_ptr& queue);            
+                void operator()(const boost::shared_ptr<Queue>& queue);        
    
             };
 
             class Commit{
                 boost::intrusive_ptr<Message>& msg;
             public:
                 Commit(boost::intrusive_ptr<Message>& msg);
-                void operator()(Queue::shared_ptr& queue);            
+                void operator()(const boost::shared_ptr<Queue>& queue);        
    
             };
 
             boost::intrusive_ptr<Message> msg;
@@ -72,7 +72,7 @@
 
            virtual Message& getMessage() { return *msg; };
             
-            virtual void deliverTo(Queue::shared_ptr& queue);
+            virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
 
             virtual ~TxPublish(){}
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=702552&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Tue Oct  
7 10:27:06 2008
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverListener.h"
+
+namespace qpid {
+namespace client {
+
+static const std::string AMQ_FAILOVER("amq.failover");
+
+FailoverListener::FailoverListener(Connection c)
+    : connection(c), session(c.newSession()), subscriptions(session)
+{
+    std::string qname=AMQ_FAILOVER + "." + session.getId().getName();
+    if (session.exchangeQuery(arg::exchange=AMQ_FAILOVER).getType().empty())
+        return;                 // Failover exchange not implemented.
+    session.queueDeclare(arg::queue=qname, arg::exclusive=true, 
arg::autoDelete=true);
+    session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
+    subscriptions.subscribe(*this, qname, FlowControl::unlimited());
+    thread = sys::Thread(subscriptions);
+}
+
+FailoverListener::~FailoverListener() {
+    subscriptions.stop();
+    if (thread.id()) thread.join();
+}
+
+void FailoverListener::received(Message& msg) {
+    sys::Mutex::ScopedLock l(lock);
+    knowBrokers.clear();
+    framing::Array urlArray;
+    msg.getHeaders().getArray("amq.failover", urlArray);
+    for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < 
urlArray.end(); ++i ) 
+        knowBrokers.push_back(Url((*i)->get<std::string>()));
+}
+
+std::vector<Url> FailoverListener::getKnownBrokers() const {
+    sys::Mutex::ScopedLock l(lock);
+    return knowBrokers;
+}
+
+}} // namespace qpid::client

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h?rev=702552&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h Tue Oct  7 
10:27:06 2008
@@ -0,0 +1,58 @@
+#ifndef QPID_CLIENT_FAILOVERLISTENER_H
+#define QPID_CLIENT_FAILOVERLISTENER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Thread.h"
+#include <vector>
+
+namespace qpid {
+namespace client {
+
+/**
+ * @internal Listen for failover updates from the amq.failover exchange.
+ */
+class FailoverListener : public MessageListener
+{
+  public:
+    FailoverListener(Connection);
+    ~FailoverListener();
+    std::vector<Url> getKnownBrokers() const;
+    void received(Message& msg);
+    
+  private:
+    mutable sys::Mutex lock;
+    Connection connection;
+    Session session;
+    SubscriptionManager subscriptions;
+    sys::Thread thread;
+    std::vector<Url> knowBrokers;
+};
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_FAILOVERLISTENER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Oct  7 
10:27:06 2008
@@ -19,6 +19,7 @@
 #include "Cluster.h"
 #include "Connection.h"
 #include "DumpClient.h"
+#include "FailoverExchange.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/SessionState.h"
@@ -109,6 +110,7 @@
         // FIXME aconway 2008-09-24: 
         // if first cluster up set new UUID to set_clusterID() else set UUID 
of cluster being joined.
     }
+    failoverExchange.reset(new FailoverExchange(this));
     broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
     cpgDispatchHandle.startWatch(poller);
     cpg.join(name);
@@ -331,15 +333,15 @@
     Mutex::ScopedLock l(lock);
     QPID_LOG(debug, *this << " configuration change: " << AddrList(current, 
nCurrent) 
              << AddrList(left, nLeft, "( ", ")"));
-    map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
-    updateMemberStats(l);
+    bool changed = map.configChange(current, nCurrent, left, nLeft, joined, 
nJoined);
     if (state == LEFT) return;
     if (!map.isAlive(memberId)) { leave(l); return; } 
-
+    
     if(state == INIT) {    // First configChange
         if (map.aliveCount() == 1) { 
             QPID_LOG(info, *this << " first in cluster at " << myUrl);
             map = ClusterMap(memberId, myUrl, true);
+            memberUpdate(l);
             unstall(l);
         }
         else {                  // Joining established group.
@@ -348,6 +350,8 @@
             QPID_LOG(debug, *this << " send dump-request " << myUrl);
         }
     }
+    else if (state >= READY && changed) 
+        memberUpdate(l);
 }
 
 void Cluster::dumpInDone(const ClusterMap& m) {
@@ -403,8 +407,9 @@
     tryMakeOffer(id, l);
 }
 
-void Cluster::ready(const MemberId& id, const std::string& url, Lock&) {
-    map.ready(id, Url(url));
+void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
+    if (map.ready(id, Url(url)))
+        memberUpdate(l);
 }
 
 void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
@@ -454,8 +459,8 @@
     if (state == DUMPEE && dumpedMap) {
         map = *dumpedMap;
         QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
-        unstall(l);
         mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
+        unstall(l);
     }
 }
 
@@ -488,28 +493,31 @@
     Lock l(lock);
     QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]");
     switch (methodId) {
-      case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
-      case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
+      case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
+      case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
       default: return Manageable::STATUS_UNKNOWN_METHOD;
     }
     return Manageable::STATUS_OK;
 }    
 
-void Cluster::stopClusterNode() {
+void Cluster::stopClusterNode(Lock&) {
     QPID_LOG(notice, *this << " stopped by admin");
     leave();
 }
 
-void Cluster::stopFullCluster() {
-    Lock l(lock);
+void Cluster::stopFullCluster(Lock& l) {
     QPID_LOG(notice, *this << " shutting down cluster " << name.str());
     mcastControl(ClusterShutdownBody(), 0, l);
 }
 
-void Cluster::updateMemberStats(Lock& l) {
+void Cluster::memberUpdate(Lock& l) {
+    std::vector<Url> vectUrl = getUrls(l);
+    size_t size = vectUrl.size();
+
+    failoverExchange->setUrls(vectUrl);
+
     if (mgmtObject) {
-        std::vector<Url> vectUrl = getUrls(l);
-        size_t size = vectUrl.size();
+
         if (lastSize != size && size == 1){
             QPID_LOG(info, *this << " last node standing, updating queue 
policies.");
             broker.getQueues().updateQueueClusterState(true);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Oct  7 
10:27:06 2008
@@ -23,6 +23,7 @@
 #include "Event.h"
 #include "NoOpConnectionOutputHandler.h"
 #include "ClusterMap.h"
+#include "FailoverExchange.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/sys/PollableQueue.h"
@@ -74,6 +75,7 @@
 
     // URLs of current cluster members.
     std::vector<Url> getUrls() const;
+    boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return 
failoverExchange; }
 
     // Leave the cluster
     void leave();
@@ -93,6 +95,11 @@
     typedef sys::PollableQueue<Event> PollableEventQueue;
     typedef std::deque<Event> PlainEventQueue;
 
+    // NB: The final Lock& parameter on functions below is used to mark 
functions
+    // that should only be called by a function that already holds the lock.
+    // The parameter makes it hard to forget since you have to have an 
instance of
+    // a Lock to call the unlocked functions.
+
     // Unlocked versions of public functions
     void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, 
Lock&);
     void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, 
Lock&);
@@ -145,9 +152,10 @@
 
     virtual qpid::management::ManagementObject* GetManagementObject() const;
     virtual management::Manageable::status_t ManagementMethod (uint32_t 
methodId, management::Args& args, std::string& text);
-    void stopClusterNode();
-    void stopFullCluster();
-    void updateMemberStats(Lock&);
+
+    void stopClusterNode(Lock&);
+    void stopFullCluster(Lock&);
+    void memberUpdate(Lock&);
 
     // Called in connection IO threads .
     void checkDumpIn(Lock&);
@@ -181,6 +189,7 @@
     boost::optional<ClusterMap> dumpedMap;
     
     size_t lastSize;
+    boost::shared_ptr<FailoverExchange> failoverExchange;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Tue Oct  7 
10:27:06 2008
@@ -72,18 +72,20 @@
     std::for_each(members.begin(), members.end(), boost::bind(&insertSet, 
boost::ref(alive), _1));
 }
 
-void ClusterMap::configChange(
+bool ClusterMap::configChange(
     cpg_address *current, int nCurrent,
     cpg_address *left, int nLeft,
     cpg_address */*joined*/, int /*nJoined*/)
 {
     cpg_address* a;
+    bool memberChange=false;
     for (a = left; a != left+nLeft; ++a) {
-        members.erase(*a);
+        memberChange = members.erase(*a);
         newbies.erase(*a);
     }
     alive.clear();
     std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
+    return memberChange;
 }
 
 Url ClusterMap::getUrl(const Map& map, const  MemberId& id) {
@@ -133,8 +135,8 @@
     return false;
 }
 
-void ClusterMap::ready(const MemberId& id, const Url& url) {
-    if (isAlive(id)) members[id] = url;
+bool ClusterMap::ready(const MemberId& id, const Url& url) {
+    return isAlive(id) &&  members.insert(Map::value_type(id,url)).second;
 }
 
 boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const 
MemberId& to) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Tue Oct  7 
10:27:06 2008
@@ -50,8 +50,10 @@
     ClusterMap(const MemberId& id, const Url& url, bool isReady);
     ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& 
states);
 
-    /** Update from config change. */
-    void configChange(
+    /** Update from config change.
+     *@return true if member set changed.
+     */
+    bool configChange(
         cpg_address *current, int nCurrent,
         cpg_address *left, int nLeft,
         cpg_address *joined, int nJoined);
@@ -76,7 +78,9 @@
     bool dumpRequest(const MemberId& id, const std::string& url);       
     /** Return non-empty Url if accepted */
     boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to);
-    void ready(const MemberId& id, const Url&);
+
+    /**@return true If this is a new member */ 
+    bool ready(const MemberId& id, const Url&);
 
   private:
     Url getUrl(const Map& map, const  MemberId& id);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Oct  7 
10:27:06 2008
@@ -34,6 +34,7 @@
 namespace cluster {
 
 using namespace std;
+using broker::Broker;
 
 struct ClusterValues {
     string name;
@@ -74,12 +75,14 @@
     Options* getOptions() { return &options; }
 
     void initialize(Plugin::Target& target) {
-        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
-        if (!broker || values.name.empty()) return;  // Only if --cluster-name 
option was specified.
+        if (values.name.empty()) return; // Only if --cluster-name option was 
specified.
+        Broker* broker = dynamic_cast<Broker*>(&target);
+        if (!broker) return;
         cluster = new Cluster(values.name, values.getUrl(broker->getPort()), 
*broker);
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), 
*cluster)));
+        
broker->getExchanges().registerExchange(cluster->getFailoverExchange());
     }
 
     void earlyInitialize(Plugin::Target&) {}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp?rev=702552&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp Tue Oct 
 7 10:27:06 2008
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverExchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/Array.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+using namespace std;
+
+using namespace broker;
+using namespace framing;
+
+const string FailoverExchange::TYPE_NAME("amq.failover");
+
+FailoverExchange::FailoverExchange(management::Manageable* parent) : 
Exchange(TYPE_NAME, parent) {
+    if (mgmtExchange != 0)
+        mgmtExchange->set_type(TYPE_NAME);
+}
+
+
+void FailoverExchange::setUrls(const vector<Url>& u) {
+    Lock l(lock);
+    urls=u;
+    if (urls.empty()) return;
+    std::for_each(queues.begin(), queues.end(),
+                  boost::bind(&FailoverExchange::sendUpdate, this, _1));
+}
+
+string FailoverExchange::getType() const { return TYPE_NAME; }
+
+bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const 
framing::FieldTable*) {
+    Lock l(lock);
+    sendUpdate(queue);
+    return queues.insert(queue).second;
+}
+
+bool FailoverExchange::unbind(Queue::shared_ptr queue, const string&, const 
framing::FieldTable*) {
+    Lock l(lock);
+    return queues.erase(queue);
+}
+
+/**
+ * Test whether a queue is in the bound-queue set.
+ * The routing key and arguments are ignored.
+ */
+bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, const framing::FieldTable*) {
+    Lock l(lock);
+    const bool bound = queues.find(queue) != queues.end();
+    return bound;
+}
+
+/**
+ * Nothing is routed through this exchange: the message, routing key and
+ * arguments are all ignored and only a warning is logged.
+ */
+void FailoverExchange::route(Deliverable&, const string&, const framing::FieldTable*) {
+    QPID_LOG(warning, "Message received by exchange " << TYPE_NAME << " ignoring");
+}
+
+/**
+ * Deliver the current URL list to a single queue.
+ *
+ * Builds a message with zero content length whose application headers
+ * hold the URLs (as strings) in an array stored under the key TYPE_NAME,
+ * then delivers it to the queue. Does nothing while the URL list is empty.
+ *
+ * @param queue the queue that receives the update message.
+ */
+void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
+    // Called with lock held.
+    if (urls.empty()) return;
+
+    // FIXME aconway 2008-10-06: Array is unusable like this. Need type constants or better mapping.
+    // 0x95 has to equal the type of the Str16Value elements added below,
+    // otherwise Array::add() throws IllegalArgumentException.
+    framing::Array array(0x95);
+    for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
+        array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
+
+    const ProtocolVersion v;
+    boost::intrusive_ptr<Message> msg(new Message);
+
+    // First frame: the transfer command, flagged as not being the last segment.
+    // NOTE(review): presumably TYPE_NAME is the destination here - confirm the
+    // meaning of the trailing (1, 0) arguments against MessageTransferBody.
+    AMQFrame command(MessageTransferBody(v, TYPE_NAME, 1, 0));
+    command.setLastSegment(false);
+    msg->getFrames().append(command);
+
+    // Second frame: the header carrying the URL array, flagged as not being
+    // the first segment.
+    AMQHeaderBody header;
+    header.get<MessageProperties>(true)->setContentLength(0);
+    header.get<MessageProperties>(true)->getApplicationHeaders().setArray(TYPE_NAME, array);
+    AMQFrame headerFrame(header);
+    headerFrame.setFirstSegment(false);
+    msg->getFrames().append(headerFrame);
+
+    DeliverableMessage(msg).deliverTo(queue);
+}
+
+}} // namespace cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h?rev=702552&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h Tue Oct  
7 10:27:06 2008
@@ -0,0 +1,68 @@
+#ifndef QPID_CLUSTER_FAILOVEREXCHANGE_H
+#define QPID_CLUSTER_FAILOVEREXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/Url.h"
+
+#include <vector>
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Failover exchange provides failover host list, as specified in AMQP 0-10.
+ *
+ * Keeps a list of URLs together with the set of queues bound to the
+ * exchange; both are guarded by a mutex.
+ */
+class FailoverExchange : public broker::Exchange
+{
+  public:
+    /** Type string reported by getType(). */
+    static const std::string TYPE_NAME;
+
+    FailoverExchange(management::Manageable* parent);
+
+    /** Replace the stored URL list. */
+    void setUrls(const std::vector<Url>&);
+
+    // Exchange overrides
+    std::string getType() const;
+    bool bind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args);
+    bool unbind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args);
+    bool isBound(broker::Queue::shared_ptr queue, const std::string* const routingKey, const framing::FieldTable* const args);
+    void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+
+  private:
+    typedef sys::Mutex::ScopedLock Lock;
+    typedef std::vector<Url> Urls;
+    typedef std::set<broker::Queue::shared_ptr> Queues;
+
+    /** Send the current URL list to one queue. */
+    void sendUpdate(const broker::Queue::shared_ptr&);
+
+    sys::Mutex lock;   // Guards urls and queues.
+    Urls urls;
+    Queues queues;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_FAILOVEREXCHANGE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp Tue Oct  7 
10:27:06 2008
@@ -117,7 +117,7 @@
 void Array::add(ValuePtr value)
 {
     if (typeOctet != value->getType()) {
-        throw IllegalArgumentException(QPID_MSG("Wrong type of value, expected 
" << typeOctet));
+        throw IllegalArgumentException(QPID_MSG("Wrong type of value in Array, 
expected " << typeOctet << " but found " << value->getType()));
     }
     values.push_back(value);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h Tue Oct  7 10:27:06 
2008
@@ -61,13 +61,13 @@
         }
     }
     
+    ValueVector::const_iterator begin() const { return values.begin(); }
+    ValueVector::const_iterator end() const { return values.end(); }
+
   private:
     uint8_t typeOctet;
     ValueVector values;
 
-    ValueVector::const_iterator begin() const { return values.begin(); }
-    ValueVector::const_iterator end() const { return values.end(); }
-
     friend std::ostream& operator<<(std::ostream& out, const Array& body);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Tue Oct  7 
10:27:06 2008
@@ -33,7 +33,7 @@
 void FrameSet::append(const AMQFrame& part)
 {
     parts.push_back(part);
-       recalculateSize = true;
+    recalculateSize = true;
 }
 
 bool FrameSet::isComplete() const

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Oct  7 10:27:06 
2008
@@ -57,7 +57,7 @@
 {
     Message msg;
 public:
-    void deliverTo(Queue::shared_ptr& queue)
+    void deliverTo(const boost::shared_ptr<Queue>& queue)
     {
         throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=702552&r1=702551&r2=702552&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Oct  7 
10:27:06 2008
@@ -23,6 +23,7 @@
 
 #include "qpid/client/Connection.h"
 #include "qpid/client/Session.h"
+#include "qpid/client/FailoverListener.h"
 #include "qpid/cluster/Cluster.h"
 #include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/DumpClient.h"
@@ -38,7 +39,9 @@
 #include <iostream>
 #include <iterator>
 #include <vector>
+#include <set>
 #include <algorithm>
+#include <iterator>
 
 namespace qpid {
 namespace cluster {
@@ -46,6 +49,12 @@
 }} // namespace qpid::cluster
 
 
+// ostream operators in std:: namespace
+namespace std {
+/** Print a std::set by delegating to seqPrint. */
+template <class T>
+ostream& operator<<(ostream& out, const std::set<T>& items) {
+    return seqPrint(out, items);
+}
+} // namespace std
+
+
 QPID_AUTO_TEST_SUITE(cluster)
 
 using namespace std;
@@ -88,11 +97,8 @@
     }
 
     void waitFor(size_t n) {
-        size_t retry=1000;            // TODO aconway 2008-07-16: nasty 
sleeps, clean this up.
-        while (retry && getGlobalCluster().getUrls().size() != n) {
+        for (size_t retry = 1000; retry && getGlobalCluster().getUrls().size() 
!= n; --retry)
             ::usleep(1000);
-            --retry;
-        }
     }
 };
 
@@ -164,6 +170,75 @@
     return o;
 }
 
+/** Collect every element of c (anything with begin()/end()) into a set of uint16_t. */
+template <class C> set<uint16_t> makeSet(const C& c) {
+    set<uint16_t> result;
+    result.insert(c.begin(), c.end());
+    return result;
+}
+
+/**
+ * Parse each array element as a Url string and collect the port of the
+ * TcpAddress held in the Url's first entry. A Url with no entries, or whose
+ * first entry is not a TcpAddress, fails a BOOST_REQUIRE.
+ */
+std::set<uint16_t> portsFromFailoverArray(const framing::Array& urlArray) {
+    std::set<uint16_t> ports;
+    framing::Array::ValueVector::const_iterator i = urlArray.begin();
+    const framing::Array::ValueVector::const_iterator last = urlArray.end();
+    while (i != last) {
+        Url url((*i)->get<std::string>());
+        BOOST_REQUIRE(url.size() > 0);
+        BOOST_REQUIRE(url[0].get<TcpAddress>());
+        ports.insert(url[0].get<TcpAddress>()->port);
+        ++i;
+    }
+    return ports;
+}
+
+/**
+ * Read the array stored under the "amq.failover" header of a message and
+ * return the ports found in it via portsFromFailoverArray().
+ */
+std::set<uint16_t> portsFromFailoverMessage(const Message& m) {
+    framing::Array failoverUrls;
+    m.getHeaders().getArray("amq.failover", failoverUrls);
+    const std::set<uint16_t> ports = portsFromFailoverArray(failoverUrls);
+    return ports;
+}
+
+// Binding a queue to "amq.failover" must yield one message listing the
+// cluster's ports, and another one after the cluster changes.
+QPID_AUTO_TEST_CASE(FailoverExchange) {
+    // NOTE(review): presumably starts a 2-broker cluster and acts as a
+    // container of the brokers' ports (it is fed to makeSet below) - confirm.
+    ClusterFixture cluster(2);
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare("q");
+    c0.session.exchangeBind(arg::queue="q", arg::exchange="amq.failover");
+
+    // The bind itself should put exactly one update message on the queue.
+    Message m;
+    BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverMessage(m));
+
+    // After cluster.add() a fresh update with the new port set is expected.
+    cluster.add();
+    BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(makeSet(cluster),portsFromFailoverMessage(m));
+}
+
+/**
+ * Poll the listener (up to 1000 times, sleeping 1000 microseconds between
+ * polls) until it reports n known brokers, then return the port of the
+ * TcpAddress in each URL's first entry. Whatever list was fetched last is
+ * used even if its size never reached n.
+ */
+std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t n) {
+    vector<Url> known = fl.getKnownBrokers();
+    size_t attemptsLeft = 1000;
+    while (known.size() != n && attemptsLeft != 0) {
+        ::usleep(1000);
+        known = fl.getKnownBrokers();
+        --attemptsLeft;
+    }
+
+    set<uint16_t> ports;
+    for (vector<Url>::const_iterator i = known.begin(); i != known.end(); ++i) {
+        BOOST_MESSAGE("Failover URL: " << *i);
+        BOOST_CHECK(i->size() >= 1);
+        BOOST_CHECK((*i)[0].get<TcpAddress>());
+        ports.insert((*i)[0].get<TcpAddress>()->port);
+    }
+    return ports;
+}
+
+// A FailoverListener attached to a client connection must track the
+// cluster's ports as the cluster grows and shrinks.
+QPID_AUTO_TEST_CASE(testFailoverListener) {
+    ClusterFixture cluster(1);
+    Client c0(cluster[0], "c0");
+    FailoverListener fl(c0.connection);
+
+    // Ports of the initial cluster, kept for the final comparison.
+    set<uint16_t> set0=makeSet(cluster);
+
+    BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+    // After cluster.add() the listener should report two brokers.
+    cluster.add();
+    BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverListener(fl, 2));
+    // After cluster.kill(1) the listener should be back to the initial set.
+    cluster.kill(1);
+    BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+}
+
 QPID_AUTO_TEST_CASE(DumpConsumers) {
     ClusterFixture cluster(1); 
     Client c0(cluster[0], "c0"); 


Reply via email to