Author: aconway
Date: Mon Oct 20 06:58:23 2008
New Revision: 706293

URL: http://svn.apache.org/viewvc?rev=706293&view=rev
Log:
cluster: DumpClient updates consumer notifyEnabled and blocked.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Oct 20 
06:58:23 2008
@@ -616,6 +616,11 @@
     notifyEnabled = false;
 }
 
+bool SemanticState::ConsumerImpl::isNotifyEnabld() {
+    Mutex::ScopedLock l(lock);
+    return notifyEnabled;
+}
+
 void SemanticState::ConsumerImpl::notify()
 {
     //TODO: alter this, don't want to hold locks across external

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Oct 20 
06:58:23 2008
@@ -100,6 +100,7 @@
         void disableNotify();
         void enableNotify();
         void notify();
+        bool isNotifyEnabld();
 
         void setWindowMode();
         void setCreditMode();
@@ -109,7 +110,8 @@
         void stop();
         void complete(DeliveryRecord&);    
         Queue::shared_ptr getQueue() { return queue; }
-        bool isBlocked() const { return blocked; }        
+        bool isBlocked() const { return blocked; }
+        bool setBlocked(bool set) { std::swap(set, blocked); return set; }
 
         bool hasOutput();
         bool doOutput();
@@ -150,7 +152,7 @@
     void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
     void record(const DeliveryRecord& delivery);
     void checkDtxTimeout();
-    ConsumerImpl& find(const std::string& destination);
+
     void complete(DeliveryRecord&);
     AckRange findRange(DeliveryId first, DeliveryId last);
     void requestDispatch();
@@ -162,6 +164,8 @@
     ~SemanticState();
 
     SessionContext& getSession() { return session; }
+
+    ConsumerImpl& find(const std::string& destination);
     
     /**
      * Get named queue, never returns 0.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Oct 20 
06:58:23 2008
@@ -101,6 +101,7 @@
     void readyToSend();
 
     template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); 
}
+    SemanticState::ConsumerImpl& getConsumer(const string& dest) { return 
semanticState.find(dest); } 
     
   private:
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Oct 20 
06:58:23 2008
@@ -22,6 +22,7 @@
 #include "Cluster.h"
 
 #include "qpid/broker/SessionState.h"
+#include "qpid/broker/SemanticState.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
@@ -159,6 +160,13 @@
         delivered(mcastDecoder.frame);
 }
 
+void Connection::consumerState(const string& name, bool blocked, bool 
notifyEnabled) {
+    broker::SessionHandler& h = connection.getChannel(currentChannel);
+    broker::SessionState* s = h.getSession();
+    broker::SemanticState::ConsumerImpl& c = s->getConsumer(name);
+    c.setBlocked(blocked);
+    if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+}
 
 void Connection::sessionState(
     const SequenceNumber& replayStart,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Oct 20 
06:58:23 2008
@@ -89,6 +89,8 @@
     void deliverBuffer(framing::Buffer&);
     void delivered(framing::AMQFrame&);
 
+    void consumerState(const std::string& name, bool blocked, bool 
notifyEnabled);
+    
     // ==== Used in catch-up mode to build initial state.
     // 
     // State dump methods.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Mon Oct 20 
06:58:23 2008
@@ -35,6 +35,8 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/ClusterConnectionMembershipBody.h"
 #include "qpid/framing/ClusterConnectionShadowReadyBody.h"
+#include "qpid/framing/ClusterConnectionSessionStateBody.h"
+#include "qpid/framing/ClusterConnectionConsumerStateBody.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/log/Statement.h"
@@ -227,7 +229,13 @@
     shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? 
FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
     shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, 
ci->getMsgCredit());
     shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, 
ci->getByteCredit());
-    // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and 
notifyEnabled?
+    ClusterConnectionConsumerStateBody state(
+        ProtocolVersion(),
+        ci->getName(),
+        ci->isBlocked(),
+        ci->isNotifyEnabld()
+    );
+    client::SessionBase_0_10Access(shadowSession).get()->send(state);
     QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " 
<< shadowSession.getId());
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Oct 20 
06:58:23 2008
@@ -239,8 +239,11 @@
 
     cluster.add();
     Client c1(cluster[1], "c1"); 
+    c1.session.queueDeclare("p");
     c1.session.queueDeclare("q");
     c1.subs.subscribe(c1.lq, "q", FlowControl::zero());
+    LocalQueue lp;
+    c1.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
     c1.session.sync();
 
     // Start new members
@@ -249,22 +252,34 @@
     cluster.add();
     Client c2(cluster[2], "c2"); 
 
-    // Transfer a message, verify all members see it.
+    // Transfer messages
     c1.session.messageTransfer(arg::content=Message("aaa", "q"));
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
 
+    c1.session.messageTransfer(arg::content=Message("bbb", "p"));
+    c1.session.messageTransfer(arg::content=Message("ccc", "p"));
+    
     // Activate the subscription, ensure message removed on all queues. 
     c1.subs.setFlowControl("q", FlowControl::unlimited());
     Message m;
     BOOST_CHECK(c1.lq.get(m, TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "aaa");
-
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
 
+    // Check second subscription's flow control: getsnn first message, not 
second.
+    BOOST_CHECK(lp.get(m, TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "bbb");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
+
+    BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "ccc");
+    
     // Kill the subscribing member, ensure further messages are not removed.
     cluster.killWithSilencer(1,c1.connection,9);
     cluster.waitFor(2);

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Mon Oct 20 06:58:23 2008
@@ -75,9 +75,17 @@
         - send shadow-ready to mark end of shadow dump.
         - send dump-complete when entire dump is complete.
     -->
-    <control name="session-state" code="0x4" label="Set session state during a 
brain dump.">
-      <!-- Target session deduced from channel number.  -->
+    
+    <!-- Consumer state that cannot be set by standard AMQP controls. -->
+    <control name="consumer-state" code="0x10">
+      <field name="name" type="str8"/>
+      <field name="blocked" type="bit"/>
+      <field name="notifyEnabled" type="bit"/>
+    </control>
 
+    <!-- Complete a session state dump. -->
+    <control name="session-state" code="0x11" label="Set session state during 
a brain dump.">
+      <!-- Target session deduced from channel number.  -->
       <field name="replay-start" type="sequence-no"/>         <!-- Replay 
frames will start from this point.-->
       <field name="command-point" type="sequence-no"/>        <!-- Id of next 
command sent -->
       <field name="sent-incomplete" type="sequence-set"/>      <!-- Commands 
sent and incomplete. -->
@@ -88,12 +96,14 @@
       <field name="received-incomplete" type="sequence-set"/>  <!-- Received 
and incomplete -->
     </control>
 
-    <control name="shadow-ready" code="0x5" label="End of shadow connection 
dump.">
+    <!-- Complete a shadow connection dump. -->
+    <control name="shadow-ready" code="0x12" label="End of shadow connection 
dump.">
       <field name="member-id" type="uint64"/>
       <field name="connection-id" type="uint64"/>
     </control>
 
-    <control name="membership" code="0x6" label="Cluster membership details.">
+    <!-- Complete a cluster state dump. -->
+    <control name="membership" code="0x13" label="Cluster membership details.">
       <field name="newbies" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
     </control>


Reply via email to