Author: cctrieloff
Date: Thu Oct 16 18:27:45 2008
New Revision: 705443
URL: http://svn.apache.org/viewvc?rev=705443&view=rev
Log:
Feature requested by AndrewM for M4:
- provide initial value support for late-joining consumers
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Oct 16 18:27:45 2008
@@ -43,17 +43,20 @@
}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable*){
- Mutex::ScopedLock l(lock);
- Binding::shared_ptr b(new Binding (routingKey, queue, this));
- if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ {
+ Mutex::ScopedLock l(lock);
+ Binding::shared_ptr b(new Binding (routingKey, queue, this));
+ if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ }
+ } else {
+ return false;
}
- return true;
- } else {
- return false;
}
+ routeIVE();
+ return true;
}
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable* /*args*/){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Oct 16 18:27:45 2008
@@ -24,6 +24,7 @@
#include "qpid/agent/ManagementAgent.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
+#include "DeliverableMessage.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -38,26 +39,41 @@
namespace
{
const std::string qpidMsgSequence("qpid.msg_sequence");
+const std::string qpidIVE("qpid.ive");
}
Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
- if (parent && parent->sequence){
- parent->sequenceLock.lock();
- parent->sequenceNo++;
- msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ if (parent){
+ if (parent->sequence || parent->ive) parent->sequenceLock.lock();
+
+ if (parent->sequence){
+ parent->sequenceNo++;
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ }
+ if (parent->ive) {
+ parent->lastMsg = &( msg.getMessage());
+ }
}
}
Exchange::PreRoute::~PreRoute(){
- if (parent && parent->sequence){
+ if (parent && (parent->sequence || parent->ive)){
parent->sequenceLock.unlock();
}
}
+void Exchange::routeIVE(){
+ if (ive && lastMsg.get()){
+ DeliverableMessage dmsg(lastMsg);
+ route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders());
+ }
+}
+
+
Exchange::Exchange (const string& _name, Manageable* parent) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), mgmtExchange(0)
+ sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -73,7 +89,7 @@
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
: name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
- sequence(false), sequenceNo(0), mgmtExchange(0)
+ sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -95,6 +111,8 @@
sequence = _args.get(qpidMsgSequence);
if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+ ive = _args.get(qpidIVE);
+ if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value");
}
Exchange::~Exchange ()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Oct 16 18:27:45 2008
@@ -51,6 +51,8 @@
bool sequence;
mutable qpid::sys::Mutex sequenceLock;
uint64_t sequenceNo;
+ bool ive;
+ boost::intrusive_ptr<Message> lastMsg;
class PreRoute{
public:
@@ -60,6 +62,8 @@
Exchange* parent;
};
+ void routeIVE();
+
struct Binding : public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Oct 16 18:27:45 2008
@@ -49,6 +49,7 @@
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
+ routeIVE();
return true;
} else {
return false;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu Oct 16 18:27:45 2008
@@ -84,6 +84,7 @@
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
+ routeIVE();
return true;
} else {
return false;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Oct 16 18:27:45 2008
@@ -131,19 +131,22 @@
}
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable* /*args*/){
- RWlock::ScopedWlock l(lock);
- TopicPattern routingPattern(routingKey);
- if (isBound(queue, routingPattern)) {
- return false;
- } else {
- Binding::shared_ptr binding (new Binding (routingKey, queue, this));
- bindings[routingPattern].push_back(binding);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ {
+ RWlock::ScopedWlock l(lock);
+ TopicPattern routingPattern(routingKey);
+ if (isBound(queue, routingPattern)) {
+ return false;
+ } else {
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingPattern].push_back(binding);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ }
}
- return true;
}
+ routeIVE();
+ return true;
}
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable* /*args*/){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp Thu Oct 16 18:27:45 2008
@@ -86,22 +86,21 @@
try {
RWlock::ScopedWlock l(lock);
- XmlBinding::vector& bindings(bindingsMap[routingKey]);
- XmlBinding::vector::ConstPtr p = bindings.snapshot();
- if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) {
- Query query(xqilla.parse(X(queryText.c_str())));
- XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query));
- bindings.add(binding);
- QPID_LOG(trace, "Bound successfully with query: " << queryText );
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ XmlBinding::vector& bindings(bindingsMap[routingKey]);
+ XmlBinding::vector::ConstPtr p = bindings.snapshot();
+ if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) {
+ Query query(xqilla.parse(X(queryText.c_str())));
+ XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query));
+ bindings.add(binding);
+ QPID_LOG(trace, "Bound successfully with query: " << queryText );
+
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ }
+ } else {
+ return false;
}
- return true;
- } else {
- return false;
- }
}
catch (XQException& e) {
throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
@@ -109,6 +108,8 @@
catch (...) {
throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText));
}
+ routeIVE();
+ return true;
}
bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable* /*args*/)
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Thu Oct 16 18:27:45 2008
@@ -223,4 +223,45 @@
}
+QPID_AUTO_TEST_CASE(testIVEOption)
+{
+ FieldTable args;
+ args.setInt("qpid.ive",1);
+ DirectExchange direct("direct1", false, args);
+ FanOutExchange fanout("fanout1", false, args);
+ HeadersExchange header("headers1", false, args);
+ TopicExchange topic ("topic1", false, args);
+
+ intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a", "abc");
+ DeliverableMessage dmsg1(msg1);
+
+ FieldTable args2;
+ args2.setString("x-match", "any");
+ args2.setString("a", "abc");
+
+ direct.route(dmsg1, "abc", 0);
+ fanout.route(dmsg1, "abc", 0);
+ header.route(dmsg1, "abc", &args2);
+ topic.route(dmsg1, "abc", 0);
+ Queue::shared_ptr queue(new Queue("queue", true));
+ Queue::shared_ptr queue1(new Queue("queue1", true));
+ Queue::shared_ptr queue2(new Queue("queue2", true));
+ Queue::shared_ptr queue3(new Queue("queue3", true));
+
+ BOOST_CHECK(HeadersExchange::match(args2, msg1->getProperties<MessageProperties>()->getApplicationHeaders()));
+
+ BOOST_CHECK(direct.bind(queue, "abc", 0));
+ BOOST_CHECK(fanout.bind(queue1, "abc", 0));
+ BOOST_CHECK(header.bind(queue2, "", &args2));
+ BOOST_CHECK(topic.bind(queue3, "abc", 0));
+
+ BOOST_CHECK_EQUAL(1u,queue->getMessageCount());
+ BOOST_CHECK_EQUAL(1u,queue1->getMessageCount());
+ BOOST_CHECK_EQUAL(1u,queue2->getMessageCount());
+ BOOST_CHECK_EQUAL(1u,queue3->getMessageCount());
+
+}
+
+
QPID_AUTO_TEST_SUITE_END()