Author: cctrieloff
Date: Mon Nov 3 13:06:22 2008
New Revision: 710157
URL: http://svn.apache.org/viewvc?rev=710157&view=rev
Log:
correction for Active-Active clustering, allowing late joining nodes in the
cluster to sync counter values for sequenced messages
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=710157&r1=710156&r2=710157&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Nov 3
13:06:22 2008
@@ -152,7 +152,9 @@
buffer.getShortString(type);
buffer.get(args);
- return exchanges.declare(name, type, durable, args).first;
+ Exchange::shared_ptr exch = exchanges.declare(name, type, durable,
args).first;
+ exch->sequenceNo = buffer.getInt64();
+ return exch;
}
void Exchange::encode(Buffer& buffer) const
@@ -161,6 +163,7 @@
buffer.putOctet(durable);
buffer.putShortString(getType());
buffer.put(args);
+ buffer.putInt64(sequenceNo);
}
uint32_t Exchange::encodedSize() const
@@ -168,7 +171,8 @@
return name.size() + 1/*short string size*/
+ 1 /*durable*/
+ getType().size() + 1/*short string size*/
- + args.encodedSize();
+ + args.encodedSize()
+ + 8; /*int64 */
}
ManagementObject* Exchange::GetManagementObject (void) const
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=710157&r1=710156&r2=710157&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Nov 3
13:06:22 2008
@@ -50,7 +50,7 @@
protected:
bool sequence;
mutable qpid::sys::Mutex sequenceLock;
- uint64_t sequenceNo;
+ int64_t sequenceNo;
bool ive;
boost::intrusive_ptr<Message> lastMsg;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=710157&r1=710156&r2=710157&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Mon Nov 3
13:06:22 2008
@@ -179,48 +179,65 @@
{
FieldTable args;
args.setInt("qpid.msg_sequence",1);
-
- DirectExchange direct("direct1", false, args);
-
- intrusive_ptr<Message> msg1 = cmessage("e", "A");
- intrusive_ptr<Message> msg2 = cmessage("e", "B");
- intrusive_ptr<Message> msg3 = cmessage("e", "C");
-
- DeliverableMessage dmsg1(msg1);
- DeliverableMessage dmsg2(msg2);
- DeliverableMessage dmsg3(msg3);
-
- direct.route(dmsg1, "abc", 0);
- direct.route(dmsg2, "abc", 0);
- direct.route(dmsg3, "abc", 0);
+ char* buff = new char[10000];
+ framing::Buffer buffer(buff,10000);
+ {
+ DirectExchange direct("direct1", false, args);
+
+ intrusive_ptr<Message> msg1 = cmessage("e", "A");
+ intrusive_ptr<Message> msg2 = cmessage("e", "B");
+ intrusive_ptr<Message> msg3 = cmessage("e", "C");
+
+ DeliverableMessage dmsg1(msg1);
+ DeliverableMessage dmsg2(msg2);
+ DeliverableMessage dmsg3(msg3);
+
+ direct.route(dmsg1, "abc", 0);
+ direct.route(dmsg2, "abc", 0);
+ direct.route(dmsg3, "abc", 0);
+
+ BOOST_CHECK_EQUAL(1,
msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(2,
msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(3,
msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+ FanOutExchange fanout("fanout1", false, args);
+ HeadersExchange header("headers1", false, args);
+ TopicExchange topic ("topic1", false, args);
+
+ // check other exchanges, that they preroute
+ intrusive_ptr<Message> msg4 = cmessage("e", "A");
+ intrusive_ptr<Message> msg5 = cmessage("e", "B");
+ intrusive_ptr<Message> msg6 = cmessage("e", "C");
+
+ DeliverableMessage dmsg4(msg4);
+ DeliverableMessage dmsg5(msg5);
+ DeliverableMessage dmsg6(msg6);
+
+ fanout.route(dmsg4, "abc", 0);
+ BOOST_CHECK_EQUAL(1,
msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+ FieldTable headers;
+ header.route(dmsg5, "abc", &headers);
+ BOOST_CHECK_EQUAL(1,
msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+ topic.route(dmsg6, "abc", 0);
+ BOOST_CHECK_EQUAL(1,
msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ direct.encode(buffer);
+ }
+ {
+
+ ExchangeRegistry exchanges;
+ buffer.reset();
+ DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges,
buffer);
+
+ intrusive_ptr<Message> msg1 = cmessage("e", "A");
+ DeliverableMessage dmsg1(msg1);
+ exch_dec->route(dmsg1, "abc", 0);
- BOOST_CHECK_EQUAL(1,
msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- BOOST_CHECK_EQUAL(2,
msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- BOOST_CHECK_EQUAL(3,
msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(4,
msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- FanOutExchange fanout("fanout1", false, args);
- HeadersExchange header("headers1", false, args);
- TopicExchange topic ("topic1", false, args);
-
- // check other exchanges, that they preroute
- intrusive_ptr<Message> msg4 = cmessage("e", "A");
- intrusive_ptr<Message> msg5 = cmessage("e", "B");
- intrusive_ptr<Message> msg6 = cmessage("e", "C");
-
- DeliverableMessage dmsg4(msg4);
- DeliverableMessage dmsg5(msg5);
- DeliverableMessage dmsg6(msg6);
-
- fanout.route(dmsg4, "abc", 0);
- BOOST_CHECK_EQUAL(1,
msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
- FieldTable headers;
- header.route(dmsg5, "abc", &headers);
- BOOST_CHECK_EQUAL(1,
msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
- topic.route(dmsg6, "abc", 0);
- BOOST_CHECK_EQUAL(1,
msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
+ }
+ delete [] buff;
}
QPID_AUTO_TEST_CASE(testIVEOption)