Author: tross
Date: Fri Oct 10 12:24:40 2008
New Revision: 703561

URL: http://svn.apache.org/viewvc?rev=703561&view=rev
Log:
QPID-1349 - Push routing for federation (includes hook for dynamic routing)

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
    incubator/qpid/trunk/qpid/python/commands/qpid-route
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=703561&r1=703560&r2=703561&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Fri Oct 10 
12:24:40 2008
@@ -20,6 +20,7 @@
  */
 #include "Bridge.h"
 #include "ConnectionState.h"
+#include "Connection.h"
 #include "LinkRegistry.h"
 
 #include "qpid/agent/ManagementAgent.h"
@@ -36,6 +37,11 @@
 namespace qpid {
 namespace broker {
 
+void Bridge::PushHandler::handle(framing::AMQFrame& frame)
+{
+    conn->received(frame);
+}
+
 Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
                const _qmf::ArgsLinkBridge& _args) : 
     link(_link), id(_id), args(_args), mgmtObject(0),
@@ -46,7 +52,7 @@
         mgmtObject = new _qmf::Bridge
             (agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
              args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
-             args.i_tag, args.i_excludes);
+             args.i_tag, args.i_excludes, args.i_dynamic);
         if (!args.i_durable)
             agent->addObject(mgmtObject);
     }
@@ -59,39 +65,47 @@
 
 void Bridge::create(ConnectionState& c)
 {
-    channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+    if (args.i_srcIsLocal) {
+        // Point the bridging commands at the local connection handler
+        Connection* conn = dynamic_cast<Connection*>(&c);
+        if (conn == 0)
+            return;
+        pushHandler.reset(new PushHandler(conn));
+        channelHandler.reset(new framing::ChannelHandler(id, 
pushHandler.get()));
+    } else {
+        // Point the bridging commands at the remote peer broker
+        channelHandler.reset(new framing::ChannelHandler(id, 
&(c.getOutput())));
+    }
+
     session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
     peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
 
     session->attach(name, false);
     session->commandPoint(0,0);
-
-    if (args.i_srcIsLocal) {
-        //TODO: handle 'push' here... simplest way is to create frames and 
pass them to Connection::received()
+        
+    if (args.i_srcIsQueue) {
+        peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 
0, FieldTable());
+        peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+        peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
     } else {
-        if (args.i_srcIsQueue) {
-            peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, 
"", 0, FieldTable());
-            peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-            peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-        } else {
-            string queue = "bridge_queue_";
-            queue += Uuid(true).str();
-            FieldTable queueSettings;
-            if (args.i_tag.size()) {
-                queueSettings.setString("qpid.trace.id", args.i_tag);
-            }
-            if (args.i_excludes.size()) {
-                queueSettings.setString("qpid.trace.exclude", args.i_excludes);
-            }
-
-            bool durable = false;//should this be an arg, or would be use 
srcIsQueue for durable queues?
-            bool autoDelete = !durable;//auto delete transient queues?
-            peer->getQueue().declare(queue, "", false, durable, true, 
autoDelete, queueSettings);
-            peer->getExchange().bind(queue, args.i_src, args.i_key, 
FieldTable());
-            peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 
0, FieldTable());
-            peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-            peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+        string queue = "bridge_queue_";
+        queue += Uuid(true).str();
+        FieldTable queueSettings;
+        if (args.i_tag.size()) {
+            queueSettings.setString("qpid.trace.id", args.i_tag);
         }
+        if (args.i_excludes.size()) {
+            queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+        }
+
+        bool durable = false;//should this be an arg, or would be use 
srcIsQueue for durable queues?
+        bool autoDelete = !durable;//auto delete transient queues?
+        peer->getQueue().declare(queue, "", false, durable, true, autoDelete, 
queueSettings);
+        if (!args.i_dynamic)
+            peer->getExchange().bind(queue, args.i_src, args.i_key, 
FieldTable());
+        peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, 
FieldTable());
+        peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+        peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
     }
 }
 
@@ -140,9 +154,10 @@
     bool is_local(buffer.getOctet());
     buffer.getShortString(id);
     buffer.getShortString(excludes);
+    bool dynamic(buffer.getOctet());
 
     return links.declare(host, port, durable, src, dest, key,
-                         is_queue, is_local, id, excludes).first;
+                         is_queue, is_local, id, excludes, dynamic).first;
 }
 
 void Bridge::encode(Buffer& buffer) const 
@@ -158,6 +173,7 @@
     buffer.putOctet(args.i_srcIsLocal ? 1 : 0);
     buffer.putShortString(args.i_tag);
     buffer.putShortString(args.i_excludes);
+    buffer.putOctet(args.i_dynamic ? 1 : 0);
 }
 
 uint32_t Bridge::encodedSize() const 
@@ -172,7 +188,8 @@
         + 1                // srcIsQueue
         + 1                // srcIsLocal
         + args.i_tag.size() + 1
-        + args.i_excludes.size() + 1;
+        + args.i_excludes.size() + 1
+        + 1;               // dynamic
 }
 
 management::ManagementObject* Bridge::GetManagementObject (void) const

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=703561&r1=703560&r2=703561&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Fri Oct 10 12:24:40 
2008
@@ -25,6 +25,7 @@
 #include "qpid/framing/AMQP_ServerProxy.h"
 #include "qpid/framing/ChannelHandler.h"
 #include "qpid/framing/Buffer.h"
+#include "qpid/framing/FrameHandler.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
 #include "qmf/org/apache/qpid/broker/Bridge.h"
@@ -35,6 +36,7 @@
 namespace qpid {
 namespace broker {
 
+class Connection;
 class ConnectionState;
 class Link;
 class LinkRegistry;
@@ -68,6 +70,13 @@
     static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& 
buffer);
 
 private:
+    struct PushHandler : framing::FrameHandler {
+        PushHandler(Connection* c) { conn = c; }
+        void handle(framing::AMQFrame& frame);
+        Connection* conn;
+    };
+
+    std::auto_ptr<PushHandler>                        pushHandler;
     std::auto_ptr<framing::ChannelHandler>            channelHandler;
     std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
     std::auto_ptr<framing::AMQP_ServerProxy>          peer;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=703561&r1=703560&r2=703561&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Fri Oct 10 12:24:40 
2008
@@ -353,7 +353,8 @@
         std::pair<Bridge::shared_ptr, bool> result =
             links->declare (host, port, iargs.i_durable, iargs.i_src,
                             iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
-                            iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes);
+                            iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+                            iargs.i_dynamic);
 
         if (result.second && iargs.i_durable)
             store->create(*result.first);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=703561&r1=703560&r2=703561&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Fri Oct 10 
12:24:40 2008
@@ -91,7 +91,8 @@
                                                      bool         isQueue,
                                                      bool         isLocal,
                                                      std::string& tag,
-                                                     std::string& excludes)
+                                                     std::string& excludes,
+                                                     bool         dynamic)
 {
     Mutex::ScopedLock locker(lock);
     stringstream      keystream;
@@ -119,6 +120,7 @@
         args.i_srcIsLocal = isLocal;
         args.i_tag        = tag;
         args.i_excludes   = excludes;
+        args.i_dynamic    = dynamic;
 
         bridge = Bridge::shared_ptr
             (new Bridge (l->second.get(), l->second->nextChannel(),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=703561&r1=703560&r2=703561&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Fri Oct 10 
12:24:40 2008
@@ -84,7 +84,8 @@
                     bool         isQueue,
                     bool         isLocal,
                     std::string& id,
-                    std::string& excludes);
+                    std::string& excludes,
+                    bool         dynamic);
 
         void destroy(const std::string& host, const uint16_t port);
         void destroy(const std::string& host,

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=703561&r1=703560&r2=703561&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Fri Oct 10 12:24:40 
2008
@@ -148,6 +148,50 @@
 
         mgmt.shutdown()
 
+    def test_push_to_exchange(self):
+        session = self.session
+        
+        mgmt = Helper(self)
+        broker = mgmt.get_object("broker")
+
+        mgmt.call_method(broker, "connect", {"host":remote_host(), 
"port":remote_port()})
+        link = mgmt.get_object("link")
+        
+        mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", 
"dest":"amq.fanout",
+                                          "key":"my-key", "tag":"", 
"excludes":"", "srcIsQueue":0,
+                                          "srcIsLocal":1})
+        bridge = mgmt.get_object("bridge")
+
+        #setup queue to receive messages from remote broker
+        r_conn = self.connect(host=remote_host(), port=remote_port())
+        r_session = r_conn.session("test_push_to_exchange")
+        r_session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        r_session.exchange_bind(queue="fed1", exchange="amq.fanout")
+        self.subscribe(session=r_session, queue="fed1", destination="f1")
+        queue = r_session.incoming("f1")
+        sleep(6)
+
+        #send messages to local broker and confirm it is routed to remote 
broker
+        for i in range(1, 11):
+            dp = session.delivery_properties(routing_key="my-key")
+            session.message_transfer(destination="amq.direct", 
message=Message(dp, "Message %d" % i))
+
+        for i in range(1, 11):
+            msg = queue.get(timeout=5)
+            self.assertEqual("Message %d" % i, msg.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        mgmt.call_method(bridge, "close")
+        mgmt.call_method(link, "close")
+        sleep(6)
+        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+        self.assertEqual(len(mgmt.get_objects("link")), 0)
+
+        mgmt.shutdown()
+
     def test_pull_from_queue(self):
         session = self.session
 

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=703561&r1=703560&r2=703561&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Fri Oct 10 12:24:40 
2008
@@ -150,7 +150,7 @@
 
         if _verbose:
             print "Creating inter-broker binding..."
-        res = link.bridge(_durable, exchange, exchange, routingKey, tag, 
excludes, 0, 0)
+        res = link.bridge(_durable, exchange, exchange, routingKey, tag, 
excludes, False, False, False)
         if res.status == 4:
             raise Exception("Can't create a durable route on a non-durable 
link")
         if _verbose:

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=703561&r1=703560&r2=703561&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Fri Oct 10 12:24:40 
2008
@@ -245,11 +245,12 @@
       <arg name="durable"     dir="I" type="bool"/>
       <arg name="src"         dir="I" type="sstr"/>
       <arg name="dest"        dir="I" type="sstr"/>
-      <arg name="key"         dir="I" type="sstr" default=""/>
-      <arg name="tag"         dir="I" type="sstr" default=""/>
-      <arg name="excludes"    dir="I" type="sstr" default=""/>
-      <arg name="srcIsQueue"  dir="I" type="bool" default="0"/>
-      <arg name="srcIsLocal"  dir="I" type="bool" default="0"/>
+      <arg name="key"         dir="I" type="sstr"/>
+      <arg name="tag"         dir="I" type="sstr"/>
+      <arg name="excludes"    dir="I" type="sstr"/>
+      <arg name="srcIsQueue"  dir="I" type="bool"/>
+      <arg name="srcIsLocal"  dir="I" type="bool"/>
+      <arg name="dynamic"     dir="I" type="bool"/>
     </method>
   </class>
 
@@ -270,6 +271,7 @@
     <property name="srcIsLocal"  type="bool"   access="RC"/>
     <property name="tag"         type="sstr"   access="RC"/>
     <property name="excludes"    type="sstr"   access="RC"/>
+    <property name="dynamic"     type="bool"   access="RC"/>
     <method name="close"/> 
   </class>
 


Reply via email to