Author: cctrieloff
Date: Thu Oct 16 18:27:45 2008
New Revision: 705443

URL: http://svn.apache.org/viewvc?rev=705443&view=rev
Log:

Feature requested by AndrewM for M4...
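    - provide initial value support for late-joining consumers: when an
      exchange is declared with the "qpid.ive" argument, the last message
      routed through it is re-routed each time a new binding is added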


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Oct 16 
18:27:45 2008
@@ -43,17 +43,20 @@
 }
 
 bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable*){
-    Mutex::ScopedLock l(lock);
-    Binding::shared_ptr b(new Binding (routingKey, queue, this));
-    if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
-        if (mgmtExchange != 0) {
-            mgmtExchange->inc_bindingCount();
-            ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+    {
+        Mutex::ScopedLock l(lock);
+        Binding::shared_ptr b(new Binding (routingKey, queue, this));
+        if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
+            if (mgmtExchange != 0) {
+                mgmtExchange->inc_bindingCount();
+                ((_qmf::Queue*) 
queue->GetManagementObject())->inc_bindingCount();
+            }
+        } else {
+            return false;
         }
-        return true;
-    } else {
-        return false;
     }
+    routeIVE();
+    return true;
 }
 
 bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable* /*args*/){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Oct 16 
18:27:45 2008
@@ -24,6 +24,7 @@
 #include "qpid/agent/ManagementAgent.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/MessageProperties.h"
+#include "DeliverableMessage.h"
 
 using namespace qpid::broker;
 using namespace qpid::framing;
@@ -38,26 +39,41 @@
 namespace 
 {
 const std::string qpidMsgSequence("qpid.msg_sequence");
+const std::string qpidIVE("qpid.ive");
 }
 
 
 Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
-    if (parent && parent->sequence){
-        parent->sequenceLock.lock();
-        parent->sequenceNo++;
-        
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
 
+    if (parent){
+        if (parent->sequence || parent->ive) parent->sequenceLock.lock();
+        
+        if (parent->sequence){
+            parent->sequenceNo++;
+            
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
 
+        } 
+        if (parent->ive) {
+            parent->lastMsg =  &( msg.getMessage());
+        }
     }
 }
 
 Exchange::PreRoute::~PreRoute(){
-    if (parent && parent->sequence){
+    if (parent && (parent->sequence || parent->ive)){
         parent->sequenceLock.unlock();
     }
 }
 
+void Exchange::routeIVE(){
+    if (ive && lastMsg.get()){
+        DeliverableMessage dmsg(lastMsg);
+        route(dmsg, lastMsg->getRoutingKey(), 
lastMsg->getApplicationHeaders());
+    }
+}
+
+
 Exchange::Exchange (const string& _name, Manageable* parent) :
     name(_name), durable(false), persistenceId(0), sequence(false), 
-       sequenceNo(0), mgmtExchange(0)
+       sequenceNo(0), ive(false), mgmtExchange(0)
 {
     if (parent != 0)
     {
@@ -73,7 +89,7 @@
 Exchange::Exchange(const string& _name, bool _durable, const 
qpid::framing::FieldTable& _args,
                    Manageable* parent)
     : name(_name), durable(_durable), args(_args), alternateUsers(0), 
persistenceId(0), 
-       sequence(false), sequenceNo(0), mgmtExchange(0)
+       sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
 {
     if (parent != 0)
     {
@@ -95,6 +111,8 @@
     sequence = _args.get(qpidMsgSequence);
     if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg 
sequencing");
 
+    ive = _args.get(qpidIVE);
+    if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial 
Value");
 }
 
 Exchange::~Exchange ()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Oct 16 
18:27:45 2008
@@ -51,6 +51,8 @@
             bool sequence;
             mutable qpid::sys::Mutex sequenceLock;
             uint64_t sequenceNo;
+            bool ive;
+            boost::intrusive_ptr<Message> lastMsg;
                
             class PreRoute{
             public:
@@ -60,6 +62,8 @@
                 Exchange* parent;
                    };
            
+            void routeIVE();
+           
             struct Binding : public management::Manageable {
                 typedef boost::shared_ptr<Binding>       shared_ptr;
                 typedef std::vector<Binding::shared_ptr> vector;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Oct 16 
18:27:45 2008
@@ -49,6 +49,7 @@
             mgmtExchange->inc_bindingCount();
             ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
         }
+        routeIVE();
         return true;
     } else {
         return false;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu Oct 
16 18:27:45 2008
@@ -84,6 +84,7 @@
             mgmtExchange->inc_bindingCount();
             ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
         }
+        routeIVE();
         return true;
     } else {
         return false;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Oct 16 
18:27:45 2008
@@ -131,19 +131,22 @@
 }
 
 bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable* /*args*/){
-    RWlock::ScopedWlock l(lock);
-    TopicPattern routingPattern(routingKey);
-    if (isBound(queue, routingPattern)) {
-        return false;
-    } else {
-        Binding::shared_ptr binding (new Binding (routingKey, queue, this));
-        bindings[routingPattern].push_back(binding);
-        if (mgmtExchange != 0) {
-            mgmtExchange->inc_bindingCount();
-            ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+    {
+        RWlock::ScopedWlock l(lock);
+        TopicPattern routingPattern(routingKey);
+        if (isBound(queue, routingPattern)) {
+            return false;
+        } else {
+            Binding::shared_ptr binding (new Binding (routingKey, queue, 
this));
+            bindings[routingPattern].push_back(binding);
+            if (mgmtExchange != 0) {
+                mgmtExchange->inc_bindingCount();
+                ((_qmf::Queue*) 
queue->GetManagementObject())->inc_bindingCount();
+            }
         }
-        return true;
     }
+    routeIVE();
+    return true;
 }
 
 bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable* /*args*/){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp Thu Oct 16 
18:27:45 2008
@@ -86,22 +86,21 @@
     try {
         RWlock::ScopedWlock l(lock);
 
-       XmlBinding::vector& bindings(bindingsMap[routingKey]);
-       XmlBinding::vector::ConstPtr p = bindings.snapshot();
-       if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == 
p->end()) {
-           Query query(xqilla.parse(X(queryText.c_str())));
-           XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, 
this, query));
-           bindings.add(binding);
-           QPID_LOG(trace, "Bound successfully with query: " << queryText );
-           
-           if (mgmtExchange != 0) {
-               mgmtExchange->inc_bindingCount();
-            ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+           XmlBinding::vector& bindings(bindingsMap[routingKey]);
+           XmlBinding::vector::ConstPtr p = bindings.snapshot();
+           if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == 
p->end()) {
+               Query query(xqilla.parse(X(queryText.c_str())));
+               XmlBinding::shared_ptr binding(new XmlBinding (routingKey, 
queue, this, query));
+               bindings.add(binding);
+               QPID_LOG(trace, "Bound successfully with query: " << queryText 
);
+
+               if (mgmtExchange != 0) {
+                   mgmtExchange->inc_bindingCount();
+                ((_qmf::Queue*) 
queue->GetManagementObject())->inc_bindingCount();
+               }
+           } else {
+               return false;
            }
-           return true;
-       } else {
-           return false;
-       }
     }
     catch (XQException& e) {
         throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ 
queryText));
@@ -109,6 +108,8 @@
     catch (...) {
         throw InternalErrorException(QPID_MSG("Unexpected error - Could not 
parse xquery:"+ queryText));
     }
+    routeIVE();
+       return true;
 }
 
 bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable* /*args*/)

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=705443&r1=705442&r2=705443&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Thu Oct 16 
18:27:45 2008
@@ -223,4 +223,45 @@
 
 }
 
+QPID_AUTO_TEST_CASE(testIVEOption) 
+{
+    FieldTable args;
+    args.setInt("qpid.ive",1);
+    DirectExchange direct("direct1", false, args);
+    FanOutExchange fanout("fanout1", false, args);
+    HeadersExchange header("headers1", false, args);
+    TopicExchange topic ("topic1", false, args);
+    
+    intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
+    
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a",
 "abc");
+    DeliverableMessage dmsg1(msg1);
+
+    FieldTable args2;
+    args2.setString("x-match", "any");
+    args2.setString("a", "abc");
+    
+    direct.route(dmsg1, "abc", 0);
+    fanout.route(dmsg1, "abc", 0);
+    header.route(dmsg1, "abc", &args2);
+    topic.route(dmsg1, "abc", 0);
+    Queue::shared_ptr queue(new Queue("queue", true));
+    Queue::shared_ptr queue1(new Queue("queue1", true));
+    Queue::shared_ptr queue2(new Queue("queue2", true));
+    Queue::shared_ptr queue3(new Queue("queue3", true));
+    
+    BOOST_CHECK(HeadersExchange::match(args2, 
msg1->getProperties<MessageProperties>()->getApplicationHeaders()));
+    
+    BOOST_CHECK(direct.bind(queue, "abc", 0));
+    BOOST_CHECK(fanout.bind(queue1, "abc", 0));
+    BOOST_CHECK(header.bind(queue2, "", &args2));
+    BOOST_CHECK(topic.bind(queue3, "abc", 0));
+    
+    BOOST_CHECK_EQUAL(1u,queue->getMessageCount());
+    BOOST_CHECK_EQUAL(1u,queue1->getMessageCount());
+    BOOST_CHECK_EQUAL(1u,queue2->getMessageCount());
+    BOOST_CHECK_EQUAL(1u,queue3->getMessageCount());
+    
+}
+
+
 QPID_AUTO_TEST_SUITE_END()


Reply via email to