Author: aconway
Date: Mon Oct 20 06:58:23 2008
New Revision: 706293
URL: http://svn.apache.org/viewvc?rev=706293&view=rev
Log:
cluster: DumpClient updates consumer notifyEnabled and blocked.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Oct 20
06:58:23 2008
@@ -616,6 +616,11 @@
notifyEnabled = false;
}
+bool SemanticState::ConsumerImpl::isNotifyEnabld() {
+ Mutex::ScopedLock l(lock);
+ return notifyEnabled;
+}
+
void SemanticState::ConsumerImpl::notify()
{
//TODO: alter this, don't want to hold locks across external
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Oct 20
06:58:23 2008
@@ -100,6 +100,7 @@
void disableNotify();
void enableNotify();
void notify();
+ bool isNotifyEnabld();
void setWindowMode();
void setCreditMode();
@@ -109,7 +110,8 @@
void stop();
void complete(DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
- bool isBlocked() const { return blocked; }
+ bool isBlocked() const { return blocked; }
+ bool setBlocked(bool set) { std::swap(set, blocked); return set; }
bool hasOutput();
bool doOutput();
@@ -150,7 +152,7 @@
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void record(const DeliveryRecord& delivery);
void checkDtxTimeout();
- ConsumerImpl& find(const std::string& destination);
+
void complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
@@ -162,6 +164,8 @@
~SemanticState();
SessionContext& getSession() { return session; }
+
+ ConsumerImpl& find(const std::string& destination);
/**
* Get named queue, never returns 0.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Oct 20
06:58:23 2008
@@ -101,6 +101,7 @@
void readyToSend();
template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f);
}
+ SemanticState::ConsumerImpl& getConsumer(const string& dest) { return
semanticState.find(dest); }
private:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Oct 20
06:58:23 2008
@@ -22,6 +22,7 @@
#include "Cluster.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/SemanticState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
@@ -159,6 +160,13 @@
delivered(mcastDecoder.frame);
}
+void Connection::consumerState(const string& name, bool blocked, bool
notifyEnabled) {
+ broker::SessionHandler& h = connection.getChannel(currentChannel);
+ broker::SessionState* s = h.getSession();
+ broker::SemanticState::ConsumerImpl& c = s->getConsumer(name);
+ c.setBlocked(blocked);
+ if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+}
void Connection::sessionState(
const SequenceNumber& replayStart,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Oct 20
06:58:23 2008
@@ -89,6 +89,8 @@
void deliverBuffer(framing::Buffer&);
void delivered(framing::AMQFrame&);
+ void consumerState(const std::string& name, bool blocked, bool
notifyEnabled);
+
// ==== Used in catch-up mode to build initial state.
//
// State dump methods.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Mon Oct 20
06:58:23 2008
@@ -35,6 +35,8 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
+#include "qpid/framing/ClusterConnectionSessionStateBody.h"
+#include "qpid/framing/ClusterConnectionConsumerStateBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
@@ -227,7 +229,13 @@
shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ?
FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE,
ci->getMsgCredit());
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE,
ci->getByteCredit());
- // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and
notifyEnabled?
+ ClusterConnectionConsumerStateBody state(
+ ProtocolVersion(),
+ ci->getName(),
+ ci->isBlocked(),
+ ci->isNotifyEnabld()
+ );
+ client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on "
<< shadowSession.getId());
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Oct 20
06:58:23 2008
@@ -239,8 +239,11 @@
cluster.add();
Client c1(cluster[1], "c1");
+ c1.session.queueDeclare("p");
c1.session.queueDeclare("q");
c1.subs.subscribe(c1.lq, "q", FlowControl::zero());
+ LocalQueue lp;
+ c1.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
c1.session.sync();
// Start new members
@@ -249,22 +252,34 @@
cluster.add();
Client c2(cluster[2], "c2");
- // Transfer a message, verify all members see it.
+ // Transfer messages
c1.session.messageTransfer(arg::content=Message("aaa", "q"));
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
+ c1.session.messageTransfer(arg::content=Message("bbb", "p"));
+ c1.session.messageTransfer(arg::content=Message("ccc", "p"));
+
// Activate the subscription, ensure message removed on all queues.
c1.subs.setFlowControl("q", FlowControl::unlimited());
Message m;
BOOST_CHECK(c1.lq.get(m, TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "aaa");
-
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
+ // Check second subscription's flow control: getsnn first message, not
second.
+ BOOST_CHECK(lp.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
+
+ BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "ccc");
+
// Kill the subscribing member, ensure further messages are not removed.
cluster.killWithSilencer(1,c1.connection,9);
cluster.waitFor(2);
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=706293&r1=706292&r2=706293&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Mon Oct 20 06:58:23 2008
@@ -75,9 +75,17 @@
- send shadow-ready to mark end of shadow dump.
- send dump-complete when entire dump is complete.
-->
- <control name="session-state" code="0x4" label="Set session state during a
brain dump.">
- <!-- Target session deduced from channel number. -->
+
+ <!-- Consumer state that cannot be set by standard AMQP controls. -->
+ <control name="consumer-state" code="0x10">
+ <field name="name" type="str8"/>
+ <field name="blocked" type="bit"/>
+ <field name="notifyEnabled" type="bit"/>
+ </control>
+ <!-- Complete a session state dump. -->
+ <control name="session-state" code="0x11" label="Set session state during
a brain dump.">
+ <!-- Target session deduced from channel number. -->
<field name="replay-start" type="sequence-no"/> <!-- Replay
frames will start from this point.-->
<field name="command-point" type="sequence-no"/> <!-- Id of next
command sent -->
<field name="sent-incomplete" type="sequence-set"/> <!-- Commands
sent and incomplete. -->
@@ -88,12 +96,14 @@
<field name="received-incomplete" type="sequence-set"/> <!-- Received
and incomplete -->
</control>
- <control name="shadow-ready" code="0x5" label="End of shadow connection
dump.">
+ <!-- Complete a shadow connection dump. -->
+ <control name="shadow-ready" code="0x12" label="End of shadow connection
dump.">
<field name="member-id" type="uint64"/>
<field name="connection-id" type="uint64"/>
</control>
- <control name="membership" code="0x6" label="Cluster membership details.">
+ <!-- Complete a cluster state dump. -->
+ <control name="membership" code="0x13" label="Cluster membership details.">
<field name="newbies" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
</control>