Author: gsim
Date: Wed Apr  9 12:52:44 2008
New Revision: 646505

URL: http://svn.apache.org/viewvc?rev=646505&view=rev
Log:
Fixes and automated tests for federation function.


Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/run_federation_tests   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/python/commands/qpid-route

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=646505&r1=646504&r2=646505&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Wed Apr  9 
12:52:44 2008
@@ -49,8 +49,6 @@
     framing::AMQP_ServerProxy::Session session(channel);
     session.open(0);
 
-    //peer.getSession().open(0);
-
     if (args.i_src_is_local) {
         //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
     } else {
@@ -62,7 +60,7 @@
             string queue = "bridge_queue_";
             queue += Uuid(true).str();
             peer.getQueue().declare(0, queue, "", false, false, true, true, FieldTable());
-            peer.getQueue().bind(0, queue, args.i_dest, args.i_key, FieldTable());
+            peer.getQueue().bind(0, queue, args.i_src, args.i_key, FieldTable());
             peer.getMessage().subscribe(0, queue, args.i_dest, false, 0, 0, false, FieldTable());
             peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
             peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=646505&r1=646504&r2=646505&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Apr  9 
12:52:44 2008
@@ -70,7 +70,7 @@
     Bridges cancelled;//holds list of bridges pending cancellation
     Bridges active;//holds active bridges
     uint channelCounter;
-    sys::Mutex lock;
+    sys::Mutex linkLock;
 
     void cancel(Bridge*);
 
@@ -88,7 +88,7 @@
 Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
     ConnectionState(out_, broker_),
     adapter(*this),
-    mgmtClosing(0),
+    mgmtClosing(false),
     mgmtId(mgmtId_)
 {
     initMgmt();
@@ -164,6 +164,7 @@
     try{
         //process any pending mgmt commands:
         if (mgmtWrapper.get()) mgmtWrapper->processPending();
+        if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
 
         //then do other output as needed:
         return outputTasks.doOutput();
@@ -203,8 +204,9 @@
     switch (methodId)
     {
     case management::Client::METHOD_CLOSE :
-        mgmtClosing = 1;
+        mgmtClosing = true;
         if (mgmtWrapper.get()) mgmtWrapper->closing();
+        out->activateOutput();
         status = Manageable::STATUS_OK;
         break;
     case management::Link::METHOD_BRIDGE :
@@ -253,6 +255,7 @@
 
 void Connection::MgmtLink::processPending()
 {
+    Mutex::ScopedLock l(linkLock);
     //process any pending creates
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -271,6 +274,7 @@
 
 void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
 {   
+    Mutex::ScopedLock l(linkLock);
     created.push_back(new Bridge(channelCounter++, connection, 
                                  boost::bind(&MgmtLink::cancel, this, _1),
                                  dynamic_cast<const management::ArgsLinkBridge&>(args)));
@@ -278,6 +282,7 @@
 
 void Connection::MgmtLink::cancel(Bridge* b)
 {   
+    Mutex::ScopedLock l(linkLock);
     //need to take this out the active map and add it to the cancelled map
     for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
         if (&(*i) == b) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp?rev=646505&r1=646504&r2=646505&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp Wed Apr 
 9 12:52:44 2008
@@ -70,7 +70,7 @@
     Bridges cancelled;//holds list of bridges pending cancellation
     Bridges active;//holds active bridges
     uint channelCounter;
-    sys::Mutex lock;
+    sys::Mutex linkLock;
 
     void cancel(Bridge*);
 
@@ -88,7 +88,7 @@
 PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
     ConnectionState(out_, broker_),
     adapter(*this, isLink),
-    mgmtClosing(0),
+    mgmtClosing(false),
     mgmtId(mgmtId_)
 {
     Manageable* parent = broker.GetVhostObject ();
@@ -111,7 +111,7 @@
 PreviewConnection::~PreviewConnection () {}
 
 void PreviewConnection::received(framing::AMQFrame& frame){
-    if (mgmtClosing)
+    if (mgmtClosing) 
         close (403, "Closed by Management Request", 0, 0);
 
     if (frame.getChannel() == 0) {
@@ -159,6 +159,8 @@
     try{
         //process any pending mgmt commands:
         if (mgmtWrapper.get()) mgmtWrapper->processPending();
+        if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
+
 
         //then do other output as needed:
         return outputTasks.doOutput();
@@ -198,8 +200,9 @@
     switch (methodId)
     {
     case management::Client::METHOD_CLOSE :
-        mgmtClosing = 1;
+        mgmtClosing = true;
         if (mgmtWrapper.get()) mgmtWrapper->closing();
+        out->activateOutput();
         status = Manageable::STATUS_OK;
         break;
     case management::Link::METHOD_BRIDGE :
@@ -248,6 +251,7 @@
 
 void PreviewConnection::MgmtLink::processPending()
 {
+    Mutex::ScopedLock l(linkLock);
     //process any pending creates
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -266,6 +270,7 @@
 
 void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args)
 {   
+    Mutex::ScopedLock l(linkLock);
     created.push_back(new Bridge(channelCounter++, connection, 
                                  boost::bind(&MgmtLink::cancel, this, _1),
                                  dynamic_cast<const management::ArgsLinkBridge&>(args)));
@@ -273,6 +278,7 @@
 
 void PreviewConnection::MgmtLink::cancel(Bridge* b)
 {   
+    Mutex::ScopedLock l(linkLock);
     //need to take this out the active map and add it to the cancelled map
     for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
         if (&(*i) == b) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=646505&r1=646504&r2=646505&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Apr  9 12:52:44 2008
@@ -118,7 +118,7 @@
 TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= $(srcdir)/run_test 
 
 system_tests = client_test quick_perftest quick_topictest
-TESTS += run-unit-tests start_broker $(system_tests) python_tests stop_broker 
+TESTS += run-unit-tests start_broker $(system_tests) python_tests stop_broker run_federation_tests
 
 EXTRA_DIST +=                                                          \
   run_test vg_check                                                    \
@@ -126,6 +126,7 @@
   quick_topictest                                                      \
   quick_perftest                                                       \
   topictest                                                            \
+  run_federation_tests                                                 \
   .valgrind.supp                                                       \
   .valgrindrc                                                          \
   MessageUtils.h                                                       \

Added: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=646505&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Wed Apr  9 12:52:44 
2008
@@ -0,0 +1,191 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from qpid.testlib import TestBase, testrunner
+from qpid.management import managementChannel, managementClient
+from qpid.queue import Empty
+from qpid.content import Content
+
+
+def scan_args(name, default=None, args=sys.argv[1:]):
+    """Return the token that follows option `name` in `args`.
+
+    When the option is absent, `default` is returned if one was supplied.
+    When the option is absent with no default, or is present as the last
+    token with no value after it, a hint is printed and the process exits
+    with status 2.
+
+    Note: the default for `args` is a copy of sys.argv[1:] taken when this
+    function is defined.
+    """
+    if name in args:
+        pos = args.index(name)
+        # The option may be the final token; indexing past the end used to
+        # raise IndexError instead of reporting the missing value.
+        if pos + 1 < len(args):
+            return args[pos + 1]
+    elif default is not None:
+        # Compare against None so falsy defaults such as "" or 0 are honoured.
+        return default
+    # Parenthesised form prints identically under the print statement.
+    print("Please specify extra argument: %s" % name)
+    sys.exit(2)
+
+def extract_args(name, args):
+    """Delete option `name` and the token after it from `args`, in place.
+
+    Always returns None; `args` is left untouched when the option is absent.
+    """
+    if name not in args:
+        return None
+    start = args.index(name)
+    del args[start:start + 2]
+
+def remote_host():
+    """Value of the --remote-host argument, or "localhost" when it is not given."""
+    host = scan_args("--remote-host", "localhost")
+    return host
+
+def remote_port():
+    """Value of the --remote-port argument converted to int (no default is supplied)."""
+    port_text = scan_args("--remote-port")
+    return int(port_text)
+
+class Helper:
+    """Convenience wrapper around the management client for these tests.
+
+    Opens channel 2 on the parent test's client, registers it with a
+    managementClient and calls syncWaitForStable on it.  Assertions are
+    delegated to the parent test case.
+    """
+
+    def __init__(self, parent):
+        self.parent = parent
+        self.channel = parent.client.channel(2)
+        self.mc = managementClient(self.channel.spec)
+        self.mch = self.mc.addChannel(self.channel)
+        self.mc.syncWaitForStable(self.mch)
+
+    def assertEqual(self, a, b):
+        """Forward to the parent test case's assertEqual."""
+        self.parent.assertEqual(a, b)
+
+    def get_objects(self, type):
+        """Return what the management client's syncGetObjects yields for `type`."""
+        found = self.mc.syncGetObjects(self.mch, type)
+        return found
+
+    def get_object(self, type, position = 1, expected = None):
+        """Return the object at 1-based `position` among those of `type`.
+
+        Asserts that exactly `expected` objects exist; a falsy `expected`
+        means `position`.
+        """
+        if not expected:
+            expected = position
+        candidates = self.get_objects(type)
+        self.assertEqual(len(candidates), expected)
+        return candidates[position - 1]
+
+    def call_method(self, object, method, args=None):
+        """Call `method` on `object`, assert status 0 and text "OK", return the response."""
+        response = self.mc.syncCallMethod(self.mch, object.id, object.classKey, method, args)
+        self.assertEqual(response.status,     0)
+        self.assertEqual(response.statusText, "OK")
+        return response
+
+class FederationTests(TestBase):
+    """Federation tests driven through management calls on the local broker.
+
+    Each test asks the local broker to connect to the remote one (a link),
+    creates a bridge on that link, and finally closes both again, checking
+    that no bridge or link objects remain.
+    """
+
+    def _open_link(self, mgmt, broker):
+        """Invoke "connect" towards the remote broker and return the single link object."""
+        peer = {"host":remote_host(), "port":remote_port()}
+        mgmt.call_method(broker, "connect", peer)
+        return mgmt.get_object("link")
+
+    def _open_bridge(self, mgmt, link, bridge_args):
+        """Invoke "bridge" on `link` with `bridge_args` and return the single bridge object."""
+        mgmt.call_method(link, "bridge", bridge_args)
+        return mgmt.get_object("bridge")
+
+    def _close_bridge_and_link(self, mgmt, bridge, link):
+        """Close the bridge, then the link, asserting after each that none are left."""
+        mgmt.call_method(bridge, "close")
+        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+
+        mgmt.call_method(link, "close")
+        self.assertEqual(len(mgmt.get_objects("link")), 0)
+
+    def _subscribe_to_local_fanout(self, channel):
+        """Bind queue "fed1" to amq.fanout on the local broker and return the "f1" client queue."""
+        channel.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        channel.queue_bind(queue="fed1", exchange="amq.fanout")
+        self.subscribe(queue="fed1", destination="f1")
+        return self.client.queue("f1")
+
+    def _expect_messages_1_to_10(self, queue):
+        """Expect "Message 1" .. "Message 10" in order, followed by nothing more."""
+        for n in range(1, 11):
+            msg = queue.get(timeout=5)
+            self.assertEqual("Message %d" % n, msg.content.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.content.body)
+        except Empty:
+            pass
+
+    def test_bridge_create_and_close(self):
+        """Create and close a link carrying one bridge, ten times in a row."""
+        mgmt = Helper(self)
+        broker = mgmt.get_object("broker")
+
+        for attempt in range(10):
+            link = self._open_link(mgmt, broker)
+            bridge = self._open_bridge(
+                mgmt, link,
+                {"src":"amq.direct", "dest":"amq.direct", "key":"my-key"})
+            self._close_bridge_and_link(mgmt, bridge, link)
+
+    def test_pull_from_exchange(self):
+        """Bridge remote amq.direct (key "my-key") to local amq.fanout and check delivery."""
+        channel = self.channel
+
+        mgmt = Helper(self)
+        broker = mgmt.get_object("broker")
+        link = self._open_link(mgmt, broker)
+        bridge = self._open_bridge(
+            mgmt, link,
+            {"src":"amq.direct", "dest":"amq.fanout", "key":"my-key"})
+
+        #setup queue to receive messages from local broker
+        queue = self._subscribe_to_local_fanout(channel)
+
+        #send messages to remote broker and confirm it is routed to local broker
+        r_conn = self.connect(host=remote_host(), port=remote_port())
+        r_channel = r_conn.channel(1)
+        r_channel.session_open()
+
+        for n in range(1, 11):
+            content = Content(properties={'routing_key' : "my-key"}, body="Message %d" % n)
+            r_channel.message_transfer(destination="amq.direct", content=content)
+
+        self._expect_messages_1_to_10(queue)
+        self._close_bridge_and_link(mgmt, bridge, link)
+
+    def test_pull_from_queue(self):
+        """Bridge a remote queue to local amq.fanout; messages sent before and after
+        the bridge is created must all arrive, in order."""
+        channel = self.channel
+
+        #setup queue on remote broker and add some messages
+        r_conn = self.connect(host=remote_host(), port=remote_port())
+        r_channel = r_conn.channel(1)
+        r_channel.session_open()
+        r_channel.queue_declare(queue="my-bridge-queue", exclusive=True, auto_delete=True)
+        for n in range(1, 6):
+            content = Content(properties={'routing_key' : "my-bridge-queue"}, body="Message %d" % n)
+            r_channel.message_transfer(content=content)
+
+        #setup queue to receive messages from local broker
+        queue = self._subscribe_to_local_fanout(channel)
+
+        mgmt = Helper(self)
+        broker = mgmt.get_object("broker")
+        link = self._open_link(mgmt, broker)
+        bridge = self._open_bridge(
+            mgmt, link,
+            {"src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "src_is_queue":1})
+
+        #add some more messages (i.e. after bridge was created)
+        for n in range(6, 11):
+            content = Content(properties={'routing_key' : "my-bridge-queue"}, body="Message %d" % n)
+            r_channel.message_transfer(content=content)
+
+        self._expect_messages_1_to_10(queue)
+        self._close_bridge_and_link(mgmt, bridge, link)
+
+if __name__ == '__main__':
+    # The test runner does not recognise the federation-specific options, so
+    # strip them (and their values) before handing the arguments over.
+    runner_args = sys.argv[1:]
+    for option in ("--remote-port", "--remote-host"):
+        extract_args(option, runner_args)
+    # Name the module(s) the test runner should execute.
+    runner_args.append("federation")
+
+    passed = testrunner.run(runner_args)
+    if not passed:
+        sys.exit(1)

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/cpp/src/tests/run_federation_tests
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/run_federation_tests?rev=646505&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/run_federation_tests (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/run_federation_tests Wed Apr  9 
12:52:44 2008
@@ -0,0 +1,24 @@
+#!/bin/sh
+# Run the federation tests.
+#
+# Starts two brokers on ports chosen at start-up (--port 0), runs
+# federation.py against them, and stops both brokers again when the script
+# exits.  Nothing is run when the python tree is not present.
+
+# Start from a clean slate so a LOCAL_PORT/REMOTE_PORT inherited from the
+# environment can never make stop_brokers act on an unrelated broker.
+LOCAL_PORT=
+REMOTE_PORT=
+
+# Stop the brokers however the script ends (normal exit or test failure).
+trap stop_brokers EXIT
+
+# Start both brokers as daemons.  Each broker's output is captured in
+# qpidd.port and read back as the port to use for it.
+start_brokers() {
+    ../qpidd --daemon --port 0 --no-data-dir > qpidd.port
+    LOCAL_PORT=`cat qpidd.port`
+    ../qpidd --daemon --port 0 --no-data-dir > qpidd.port
+    REMOTE_PORT=`cat qpidd.port`
+}
+
+# Stop only the brokers that were actually started.  The EXIT trap also fires
+# when the tests were skipped or a broker failed to start; the port variables
+# are then empty and "--port" must not be passed without a value.
+stop_brokers() {
+    if test -n "$LOCAL_PORT" ; then
+        ../qpidd -q --port "$LOCAL_PORT"
+    fi
+    if test -n "$REMOTE_PORT" ; then
+        ../qpidd -q --port "$REMOTE_PORT"
+    fi
+}
+
+if test -d ../../../python ;  then
+    start_brokers
+    echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT"
+    export PYTHONPATH=../../../python
+    ./federation.py -v -s ../../../specs/amqp.0-10-preview.xml \
+        -b "localhost:$LOCAL_PORT" --remote-port "$REMOTE_PORT" \
+        || { echo "FAIL federation tests"; exit 1; }
+fi
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/run_federation_tests
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=646505&r1=646504&r2=646505&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Wed Apr  9 12:52:44 
2008
@@ -86,7 +86,7 @@
             self.mclient.syncWaitForStable (self.mch)
         except socket.error, e:
             print "Connect Error:", e
-            exit (1)
+            sys.exit (1)
 
     def getLink (self):
         links = self.mclient.syncGetObjects (self.mch, "link")
@@ -117,20 +117,20 @@
 
         if link == None:
             print "Protocol Error - Missing link ID"
-            exit (1)
+            sys.exit (1)
 
         bridges = mc.syncGetObjects (self.mch, "bridge")
         for bridge in bridges:
             if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
                 if not _quiet:
                     print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)
-                    exit (1)
-                exit (0)
+                    sys.exit (1)
+                sys.exit (0)
 
         if _verbose:
             print "Creating inter-broker binding..."
         bridgeArgs = {}
-        bridgeArgs["src"]          = "src"
+        bridgeArgs["src"]          = exchange
         bridgeArgs["dest"]         = exchange
         bridgeArgs["key"]          = routingKey
         bridgeArgs["src_is_queue"] = 0
@@ -147,8 +147,8 @@
         if link == None:
             if not _quiet:
                 print "No link found from %s to %s" % (self.src.name(), self.dest.name())
-                exit (1)
-            exit (0)
+                sys.exit (1)
+            sys.exit (0)
 
         bridges = mc.syncGetObjects (self.mch, "bridge")
         for bridge in bridges:
@@ -158,18 +158,18 @@
                 res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
                 if res.status != 0:
                     print "Error closing bridge: %d - %s" % (res.status, res.statusText)
-                    exit (1)
+                    sys.exit (1)
                 if len (bridges) == 1:
                     if _verbose:
                         print "Last bridge on link, closing link..."
                     res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
                     if res.status != 0:
                         print "Error closing link: %d - %s" % (res.status, res.statusText)
-                        exit (1)
-                exit (0)
+                        sys.exit (1)
+                sys.exit (0)
         if not _quiet:
             print "Route not found"
-            exit (1)
+            sys.exit (1)
 
     def ListRoutes (self):
         mc = self.mclient


Reply via email to