Author: gsim
Date: Tue Apr 29 13:01:33 2008
New Revision: 652075

URL: http://svn.apache.org/viewvc?rev=652075&view=rev
Log:
QPID-977: shutdown mgmt client cleanly in federation tests (patch from [EMAIL PROTECTED])
QPID-981: added custom options to queue declare to tag each message as it goes
          through a bridge queue and allow loop prevention through specifying
          exclusions

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=652075&r1=652074&r2=652075&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Apr 29 
13:01:33 2008
@@ -59,7 +59,16 @@
         } else {
             string queue = "bridge_queue_";
             queue += Uuid(true).str();
-            peer.getQueue().declare(queue, "", false, false, true, true, 
FieldTable());
+            FieldTable queueSettings;
+            if (args.i_id.size()) {
+                queueSettings.setString("qpid.trace.id", args.i_id);
+            }
+            if (args.i_excludes.size()) {
+                queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+            }
+            bool durable = false;//should this be an arg, or would be use 
src_is_queue for durable queues?
+            bool autoDelete = !durable;//auto delete transient queues?
+            peer.getQueue().declare(queue, "", false, durable, true, 
autoDelete, queueSettings);
             peer.getExchange().bind(queue, args.i_src, args.i_key, 
FieldTable());
             peer.getMessage().subscribe(queue, args.i_dest, 0, 0, false, "", 
0, FieldTable());
             peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=652075&r1=652074&r2=652075&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Apr 29 
13:01:33 2008
@@ -28,6 +28,8 @@
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/framing/TypeFilter.h"
 #include "qpid/log/Statement.h"
+#include <boost/algorithm/string/classification.hpp>
+#include <boost/algorithm/string/split.hpp>
 
 using boost::intrusive_ptr;
 using namespace qpid::broker;
@@ -214,6 +216,7 @@
 
 void Message::sendHeader(framing::FrameHandler& out, uint16_t 
/*maxFrameSize*/) const
 {
+    sys::Mutex::ScopedLock l(lock);
     Relay f(out);
     frames.map_if(f, TypeFilter<HEADER_BODY>());    
 }
@@ -243,3 +246,46 @@
 {
     return loaded;
 }
+
+
+namespace 
+{
+    const std::string X_QPID_TRACE("x-qpid.trace");
+}
+
+bool Message::isExcluded(const std::vector<std::string>& excludes) const
+{
+    const FieldTable* headers = getApplicationHeaders();
+    if (headers) {
+        std::string traceStr = headers->getString(X_QPID_TRACE);
+        if (traceStr.size()) {
+            std::vector<std::string> trace;
+            boost::split(trace, traceStr, boost::is_any_of(", ") );
+
+            for (std::vector<std::string>::const_iterator i = 
excludes.begin(); i != excludes.end(); i++) {
+                for (std::vector<std::string>::const_iterator j = 
trace.begin(); j != trace.end(); j++) {
+                    if (*i == *j) {
+                        return true;
+                    }
+                }
+            }
+        }
+    }
+    return false;
+}
+
+void Message::addTraceId(const std::string& id)
+{
+    sys::Mutex::ScopedLock l(lock);
+    if (isA<MessageTransferBody>()) {
+        FieldTable& headers = 
getProperties<MessageProperties>()->getApplicationHeaders();
+        std::string trace = headers.getString(X_QPID_TRACE);
+        if (trace.empty()) {
+            headers.setString(X_QPID_TRACE, id);
+        } else if (trace.find(id) == std::string::npos) {
+            trace += ",";
+            trace += id;
+            headers.setString(X_QPID_TRACE, trace);
+        }        
+    }
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=652075&r1=652074&r2=652075&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Apr 29 13:01:33 
2008
@@ -23,11 +23,13 @@
  */
 
 #include <string>
+#include <vector>
 #include <boost/shared_ptr.hpp>
 #include <boost/variant.hpp>
 #include "PersistableMessage.h"
 #include "MessageAdapter.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Mutex.h"
 
 namespace qpid {
        
@@ -125,7 +127,11 @@
 
     bool isContentLoaded() const;
 
+    bool isExcluded(const std::vector<std::string>& excludes) const;
+    void addTraceId(const std::string& id);
+
   private:
+    mutable sys::Mutex lock;
     framing::FrameSet frames;
     mutable boost::shared_ptr<Exchange> exchange;
     mutable uint64_t persistenceId;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=652075&r1=652074&r2=652075&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Apr 29 13:01:33 
2008
@@ -37,6 +37,8 @@
 
 #include <boost/bind.hpp>
 #include <boost/intrusive_ptr.hpp>
+#include <boost/algorithm/string/classification.hpp>
+#include <boost/algorithm/string/split.hpp>
 
 using namespace qpid::broker;
 using namespace qpid::sys;
@@ -105,6 +107,11 @@
     return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
 }
 
+bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
+{
+    return traceExclude.size() && msg->isExcluded(traceExclude);
+}
+
 void Queue::deliver(boost::intrusive_ptr<Message>& msg){
     if (msg->isImmediate() && getConsumerCount() == 0) {
         if (alternateExchange) {
@@ -113,7 +120,10 @@
         }
     } else if (isLocal(msg)) {
         //drop message
-        QPID_LOG(debug, "Dropping 'local' message from " << getName());
+        QPID_LOG(info, "Dropping 'local' message from " << getName());
+    } else if (isExcluded(msg)) {
+        //drop message
+        QPID_LOG(info, "Dropping excluded message from " << getName());
     } else {
         // if no store then mark as enqueued
         if (!enqueue(0, msg)){
@@ -448,6 +458,10 @@
 // return true if store exists, 
 bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> 
msg)
 {
+    if (traceId.size()) {
+        msg->addTraceId(traceId);
+    }
+
     if (msg->isPersistent() && store) {
         msg->enqueueAsync(shared_from_this(), store); //increment to async 
counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = 
boost::static_pointer_cast<PersistableMessage>(msg);
@@ -477,6 +491,8 @@
     const std::string qpidMaxSize("qpid.max_size");
     const std::string qpidMaxCount("qpid.max_count");
     const std::string qpidNoLocal("no-local");
+    const std::string qpidTraceIdentity("qpid.trace.id");
+    const std::string qpidTraceExclude("qpid.trace.exclude");
 }
 
 void Queue::create(const FieldTable& _settings)
@@ -497,6 +513,15 @@
     //set this regardless of owner to allow use of no-local with exclusive 
consumers also
     noLocal = _settings.get(qpidNoLocal);
     QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+
+    traceId = _settings.getString(qpidTraceIdentity);
+    std::string excludeList = _settings.getString(qpidTraceExclude);
+    if (excludeList.size()) {
+        boost::split(traceExclude, excludeList, boost::is_any_of(", ") );
+    }
+    QPID_LOG(info, "Configured queue " << getName() << " with qpid.trace.id='" 
<< traceId 
+             << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << 
traceExclude.size() << " elements");
+
     if (mgmtObject.get() != 0)
         mgmtObject->set_arguments (_settings);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=652075&r1=652074&r2=652075&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Apr 29 13:01:33 
2008
@@ -72,6 +72,8 @@
             uint32_t consumerCount;
             OwnershipToken* exclusive;
             bool noLocal;
+            std::string traceId;
+            std::vector<std::string> traceExclude;
             Listeners listeners;
             Messages messages;
             mutable qpid::sys::Mutex consumerLock;
@@ -98,6 +100,8 @@
             void removeListener(Consumer&);
             void addListener(Consumer&);
 
+            bool isExcluded(boost::intrusive_ptr<Message>& msg);
+
         public:
             virtual void notifyDurableIOComplete();
             typedef boost::shared_ptr<Queue> shared_ptr;
@@ -120,7 +124,6 @@
 
             bool acquire(const QueuedMessage& msg);
 
-            bool isLocal(boost::intrusive_ptr<Message>& msg);
             /**
              * Delivers a message to the queue. Will record it as
              * enqueued if persistent then process it.
@@ -174,6 +177,7 @@
 
             void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
             boost::shared_ptr<Exchange> getAlternateExchange();
+            bool isLocal(boost::intrusive_ptr<Message>& msg);
 
             //PersistableQueue support:
             uint64_t getPersistenceId() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=652075&r1=652074&r2=652075&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Tue Apr 29 13:01:33 
2008
@@ -24,6 +24,11 @@
 from qpid.datatypes import Message
 from qpid.queue import Empty
 
+def add_module(args=sys.argv[1:]):
+    for a in args:
+        if a.startswith("federation"):
+            return False
+    return True
 
 def scan_args(name, default=None, args=sys.argv[1:]):
     if (name in args):
@@ -56,6 +61,9 @@
         self.mch = self.mc.addChannel(self.session)
         self.mc.syncWaitForStable(self.mch)
 
+    def shutdown (self):
+        self.mc.removeChannel (self.mch)
+
     def get_objects(self, type):
         return self.mc.syncGetObjects(self.mch, type)
 
@@ -94,6 +102,8 @@
             mgmt.call_method(link, "close")
             self.assertEqual(len(mgmt.get_objects("link")), 0)
 
+        mgmt.shutdown ()
+
     def test_pull_from_exchange(self):
         session = self.session
         
@@ -135,6 +145,8 @@
         mgmt.call_method(link, "close")
         self.assertEqual(len(mgmt.get_objects("link")), 0)
 
+        mgmt.shutdown()
+
     def test_pull_from_queue(self):
         session = self.session
 
@@ -158,7 +170,7 @@
         mgmt.call_method(broker, "connect", {"host":remote_host(), 
"port":remote_port()})
         link = mgmt.get_object("link")
 
-        mgmt.call_method(link, "bridge", {"src":"my-bridge-queue", 
"dest":"amq.fanout", "key":"", "src_is_queue":1})
+        mgmt.call_method(link, "bridge", {"src":"my-bridge-queue", 
"dest":"amq.fanout", "key":"", "id":"", "excludes":"", "src_is_queue":1})
         bridge = mgmt.get_object("bridge")
 
         #add some more messages (i.e. after bridge was created)
@@ -167,8 +179,11 @@
             r_session.message_transfer(message=Message(dp, "Message %d" % i))
 
         for i in range(1, 11):
-            msg = queue.get(timeout=5)
-            self.assertEqual("Message %d" % i, msg.body)
+            try:
+                msg = queue.get(timeout=5)
+                self.assertEqual("Message %d" % i, msg.body)
+            except Empty:
+                self.fail("Failed to find expected message containing 'Message 
%d'" % i)
         try:
             extra = queue.get(timeout=1)
             self.fail("Got unexpected message in queue: " + extra.body)
@@ -181,12 +196,77 @@
         mgmt.call_method(link, "close")
         self.assertEqual(len(mgmt.get_objects("link")), 0)
 
+        mgmt.shutdown ()
+
+    def test_tracing(self):
+        session = self.session
+        
+        mgmt = Helper(self)
+        broker = mgmt.get_object("broker")
+
+        mgmt.call_method(broker, "connect", {"host":remote_host(), 
"port":remote_port()})
+        link = mgmt.get_object("link")
+        
+        mgmt.call_method(link, "bridge", {"src":"amq.direct", 
"dest":"amq.fanout", "key":"my-key",
+                                          "id":"my-bridge-id", 
"excludes":"exclude-me,also-exclude-me"})
+        bridge = mgmt.get_object("bridge")
+
+        #setup queue to receive messages from local broker
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="amq.fanout")
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        #send messages to remote broker and confirm it is routed to local 
broker
+        r_conn = self.connect(host=remote_host(), port=remote_port())
+        r_session = r_conn.session("1")
+
+        trace = [None, "exclude-me", "a,exclude-me,b", "also-exclude-me,c", 
"dont-exclude-me"]
+        body = ["yes", "first-bad", "second-bad", "third-bad", "yes"]
+        for b, t in zip(body, trace):
+            headers = {}
+            if (t): headers["x-qpid.trace"]=t
+            dp = r_session.delivery_properties(routing_key="my-key")
+            mp = r_session.message_properties(application_headers=headers)
+            r_session.message_transfer(destination="amq.direct", 
message=Message(dp, mp, b))
+
+        for e in ["my-bridge-id", "dont-exclude-me,my-bridge-id"]:
+            msg = queue.get(timeout=5)
+            self.assertEqual("yes", msg.body)
+            self.assertEqual(e, self.getAppHeader(msg, "x-qpid.trace"))
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        mgmt.call_method(bridge, "close")
+        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+        
+        mgmt.call_method(link, "close")
+        self.assertEqual(len(mgmt.get_objects("link")), 0)
+
+        mgmt.shutdown ()
+
+    def getProperty(self, msg, name):
+        for h in msg.headers:
+            if hasattr(h, name): return getattr(h, name)
+        return None            
+
+    def getAppHeader(self, msg, name):
+        headers = self.getProperty(msg, "application_headers")
+        if headers:
+            return headers[name]
+        return None            
+
 if __name__ == '__main__':
     args = sys.argv[1:]
     #need to remove the extra options from args as test runner doesn't 
recognise them
     extract_args("--remote-port", args)
     extract_args("--remote-host", args)
-    #add module(s) to run to testrunners args
-    args.append("federation") 
+
+    if add_module():
+        #add module(s) to run to testrunners args
+        args.append("federation") 
     
     if not testrunner.run(args): sys.exit(1)


Reply via email to