Author: gsim
Date: Mon Sep  8 12:23:30 2008
New Revision: 693208

URL: http://svn.apache.org/viewvc?rev=693208&view=rev
Log:
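Fixes to the XML exchange:
* change locking for QPID-1264
* allow multiple queues to be bound with the same binding key
* correct the log message and management stats update on route (now performed once per message, after all bindings have been checked, rather than once per binding)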


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h
    incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp?rev=693208&r1=693207&r2=693208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp Mon Sep  8 
12:23:30 2008
@@ -81,29 +81,23 @@
 
     try {
         RWlock::ScopedWlock l(lock);
-        XmlBinding::vector& bindings(bindingsMap[routingKey]);
-        XmlBinding::vector::iterator i;
 
-        for (i = bindings.begin(); i != bindings.end(); i++)
-            if ((*i)->queue == queue)
-                break;
-
-        if (i == bindings.end()) {
-
-            Query query(xqilla.parse(X(queryText.c_str())));
-            XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, 
this, query));
-            XmlBinding::vector bindings(1, binding);
-            bindingsMap[routingKey] = bindings;
-            QPID_LOG(trace, "Bound successfully with query: " << queryText );
-
-            if (mgmtExchange != 0) {
-                mgmtExchange->inc_bindingCount();
-                ((management::Queue*) 
queue->GetManagementObject())->inc_bindingCount();
-            }
-            return true;
-        } else{
-            return false;
-        }
+       XmlBinding::vector& bindings(bindingsMap[routingKey]);
+       XmlBinding::vector::ConstPtr p = bindings.snapshot();
+       if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == 
p->end()) {
+           Query query(xqilla.parse(X(queryText.c_str())));
+           XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, 
this, query));
+           bindings.add(binding);
+           QPID_LOG(trace, "Bound successfully with query: " << queryText );
+           
+           if (mgmtExchange != 0) {
+               mgmtExchange->inc_bindingCount();
+               ((management::Queue*) 
queue->GetManagementObject())->inc_bindingCount();
+           }
+           return true;
+       } else {
+           return false;
+       }
     }
     catch (XQException& e) {
         throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ 
queryText));
@@ -116,25 +110,14 @@
 bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable* /*args*/)
 {
     RWlock::ScopedWlock l(lock);
-    XmlBinding::vector& bindings(bindingsMap[routingKey]);
-    XmlBinding::vector::iterator i;
-
-    for (i = bindings.begin(); i != bindings.end(); i++)
-        if ((*i)->queue == queue)
-            break;
-
-    if (i < bindings.end()) {
-        bindings.erase(i);
-        if (bindings.empty()) {
-            bindingsMap.erase(routingKey);
-        }
+    if (bindingsMap[routingKey].remove_if(MatchQueue(queue))) {
         if (mgmtExchange != 0) {
             mgmtExchange->dec_bindingCount();
             ((management::Queue*) 
queue->GetManagementObject())->dec_bindingCount();
         }
         return true;
     } else {
-        return false;
+        return false;      
     }
 }
 
@@ -193,13 +176,15 @@
 void XmlExchange::route(Deliverable& msg, const string& routingKey, const 
FieldTable* args)
 {
     try {
-        RWlock::ScopedRlock l(lock);
-        XmlBinding::vector& bindings(bindingsMap[routingKey]);
-        XmlBinding::vector::iterator i;
+        XmlBinding::vector::ConstPtr p;
+       {
+            RWlock::ScopedRlock l(lock);
+           p = bindingsMap[routingKey].snapshot();
+           if (!p) return;
+       }
         int count(0);
 
-        for (i = bindings.begin(); i != bindings.end(); i++) {
-
+        for (std::vector<XmlBinding::shared_ptr>::const_iterator i = 
p->begin(); i != p->end(); i++) {
             if ((*i)->xquery && matches((*i)->xquery, msg, args)) {   // 
Overly defensive? There should always be a query ...
                 msg.deliverTo((*i)->queue);
                 count++;
@@ -208,28 +193,25 @@
                 if ((*i)->mgmtBinding != 0)
                     (*i)->mgmtBinding->inc_msgMatched ();
             }
-
-            if(!count){
-                QPID_LOG(warning, "XMLExchange " << getName() << ": could not 
route message with query " << routingKey);
-                if (mgmtExchange != 0) {
-                    mgmtExchange->inc_msgDrops  ();
-                    mgmtExchange->inc_byteDrops (msg.contentSize ());
-                }
-            }
-            else {
-                if (mgmtExchange != 0) {
-                    mgmtExchange->inc_msgRoutes  (count);
-                    mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
-                }
-            }
-
-            if (mgmtExchange != 0) {
-                mgmtExchange->inc_msgReceives  ();
-                mgmtExchange->inc_byteReceives (msg.contentSize ());
-            }
-        }
-    }
-    catch (...) {
+       }
+       if (!count) {
+           QPID_LOG(warning, "XMLExchange " << getName() << ": could not route 
message with query " << routingKey);
+           if (mgmtExchange != 0) {
+               mgmtExchange->inc_msgDrops  ();
+               mgmtExchange->inc_byteDrops (msg.contentSize ());
+           }
+       } else {
+           if (mgmtExchange != 0) {
+               mgmtExchange->inc_msgRoutes  (count);
+               mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+           }
+       }
+
+       if (mgmtExchange != 0) {
+           mgmtExchange->inc_msgReceives  ();
+           mgmtExchange->inc_byteReceives (msg.contentSize ());
+       }
+    } catch (...) {
         QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing 
message with query " << routingKey);
     }
       
@@ -239,30 +221,27 @@
 
 bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const 
routingKey, const FieldTable* const) 
 {
-    XmlBinding::vector::iterator j;
-
+    RWlock::ScopedRlock l(lock);
     if (routingKey) {
-      XmlBindingsMap::iterator i = bindingsMap.find(*routingKey);
+        XmlBindingsMap::iterator i = bindingsMap.find(*routingKey);
 
-      if (i == bindingsMap.end())
-       return false;
-      if (!queue)
-       return true;
-      for (j = i->second.begin(); j != i->second.end(); j++)
-       if ((*j)->queue == queue)
-         return true;
+        if (i == bindingsMap.end())
+           return false;
+        if (!queue)
+           return true;
+        XmlBinding::vector::ConstPtr p = i->second.snapshot();
+        return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != 
p->end();
     } else if (!queue) {
-      //if no queue or routing key is specified, just report whether any 
bindings exist
-      return bindingsMap.size() > 0;
+        //if no queue or routing key is specified, just report whether any 
bindings exist
+        return bindingsMap.size() > 0;
     } else {
-      for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != 
bindingsMap.end(); i++)
-       for (j = i->second.begin(); j != i->second.end(); j++)
-         if ((*j)->queue == queue)
-           return true;
-      return false;
+        for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != 
bindingsMap.end(); i++) {
+           XmlBinding::vector::ConstPtr p = i->second.snapshot();
+            if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != 
p->end()) return true;
+       }
+       return false;
     }
 
-    return false;
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h?rev=693208&r1=693207&r2=693208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h Mon Sep  8 
12:23:30 2008
@@ -23,6 +23,7 @@
 
 #include "Exchange.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/sys/CopyOnWriteArray.h"
 #include "qpid/sys/Monitor.h"
 #include "Queue.h"
 
@@ -42,7 +43,7 @@
 
     struct XmlBinding : public Exchange::Binding {
         typedef boost::shared_ptr<XmlBinding> shared_ptr;
-        typedef std::vector<XmlBinding::shared_ptr> vector;
+        typedef qpid::sys::CopyOnWriteArray<XmlBinding::shared_ptr> vector;
 
         boost::shared_ptr<XQQuery> xquery;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp?rev=693208&r1=693207&r2=693208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp Mon Sep  8 
12:23:30 2008
@@ -146,6 +146,36 @@
   BOOST_CHECK_EQUAL(m, m2.getData());  
 }
 
+/**
+ * Ensure that multiple queues can be bound using the same routing key
+ */
+QPID_AUTO_TEST_CASE(testBindMultipleQueues) {
+    ClientSessionFixture f;
+
+    f.session.exchangeDeclare(arg::exchange="xml", arg::type="xml");
+    f.session.queueDeclare(arg::queue="blue", arg::exclusive=true, 
arg::autoDelete=true);
+    f.session.queueDeclare(arg::queue="red", arg::exclusive=true, 
arg::autoDelete=true);
+
+    FieldTable blue;
+    blue.setString("xquery", "./colour = 'blue'");
+    f.session.exchangeBind(arg::exchange="xml", arg::queue="blue", 
arg::bindingKey="by-colour", arg::arguments=blue); 
+    FieldTable red;
+    red.setString("xquery", "./colour = 'red'");
+    f.session.exchangeBind(arg::exchange="xml", arg::queue="red", 
arg::bindingKey="by-colour", arg::arguments=red); 
+
+    Message sent1("<colour>blue</colour>", "by-colour");
+    f.session.messageTransfer(arg::content=sent1,  arg::destination="xml");
+
+    Message sent2("<colour>red</colour>", "by-colour");
+    f.session.messageTransfer(arg::content=sent2,  arg::destination="xml");
+
+    Message received;
+    BOOST_CHECK(f.subs.get(received, "blue"));
+    BOOST_CHECK_EQUAL(sent1.getData(), received.getData());
+    BOOST_CHECK(f.subs.get(received, "red"));
+    BOOST_CHECK_EQUAL(sent2.getData(), received.getData());
+}
+
 //### Test: Bad XML does not kill the server
 
 //### Test: Bad XQuery does not kill the server


Reply via email to