Author: gsim
Date: Fri Feb  1 10:21:01 2008
New Revision: 617590

URL: http://svn.apache.org/viewvc?rev=617590&view=rev
Log:
Initial cut of inter-broker bridging


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Feb  1 10:21:01 2008
@@ -242,7 +242,7 @@
 }
 
 Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
-                                               Args&    /*_args*/)
+                                               Args&    args)
 {
     Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
 
@@ -253,6 +253,10 @@
     case management::Broker::METHOD_ECHO :
         status = Manageable::STATUS_OK;
         break;
+    case management::Broker::METHOD_CONNECT :
+        connect(dynamic_cast<management::ArgsBrokerConnect&>(args));
+        status = Manageable::STATUS_OK;
+        break;
 
     case management::Broker::METHOD_JOINCLUSTER :
     case management::Broker::METHOD_LEAVECLUSTER :
@@ -261,6 +265,11 @@
     }
 
     return status;
+}
+
+void Broker::connect(management::ArgsBrokerConnect& args)
+{
+    getAcceptor().connect(args.i_host, args.i_port, &factory);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Feb  1 10:21:01 2008
@@ -34,6 +34,7 @@
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/management/Broker.h"
+#include "qpid/management/ArgsBrokerConnect.h"
 #include "qpid/Options.h"
 #include "qpid/Plugin.h"
 #include "qpid/framing/FrameHandler.h"
@@ -123,6 +124,7 @@
     Vhost::shared_ptr              vhostObject;
 
     void declareStandardExchange(const std::string& name, const std::string& type);
+    void connect(management::ArgsBrokerConnect& args);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb  1 10:21:01 2008
@@ -21,14 +21,18 @@
 #include "Connection.h"
 #include "SessionState.h"
 #include "BrokerAdapter.h"
+#include "Bridge.h"
 #include "SemanticHandler.h"
 
 #include "qpid/log/Statement.h"
 #include "qpid/ptr_map.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/management/ManagementAgent.h"
+#include "qpid/management/ArgsLinkBind.h"
+#include "qpid/management/ArgsLinkPull.h"
 
 #include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
 
 #include <algorithm>
 #include <iostream>
@@ -47,7 +51,43 @@
 namespace qpid {
 namespace broker {
 
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId) :
+class Connection::MgmtClient : public Connection::MgmtWrapper
+{
+    management::Client::shared_ptr mgmtClient;
+
+public:
+    MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+    ~MgmtClient();
+    void received(framing::AMQFrame& frame);
+    management::ManagementObject::shared_ptr getManagementObject() const;
+    void closing();
+};
+
+class Connection::MgmtLink : public Connection::MgmtWrapper
+{
+    typedef boost::ptr_vector<Bridge> Bridges;
+
+    management::Link::shared_ptr mgmtLink;
+    Bridges created;//holds list of bridges pending creation
+    Bridges cancelled;//holds list of bridges pending cancellation
+    Bridges active;//holds active bridges
+    uint channelCounter;
+    sys::Mutex lock;
+
+    void cancel(Bridge*);
+
+public:
+    MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+    ~MgmtLink();
+    void received(framing::AMQFrame& frame);
+    management::ManagementObject::shared_ptr getManagementObject() const;
+    void closing();
+    void processPending();
+    void process(Connection& connection, const management::Args& args);
+};
+
+
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
     broker(broker_),
     outputTasks(*out_),
     out(out_),
@@ -56,7 +96,11 @@
     client(0),
     stagingThreshold(broker.getStagingThreshold()),
     adapter(*this),
-    mgmtClosing(0)
+    mgmtClosing(0),
+    mgmtId(mgmtId_)
+{}
+
+void Connection::initMgmt(bool asLink)
 {
     Manageable* parent = broker.GetVhostObject ();
 
@@ -66,18 +110,16 @@
 
         if (agent.get () != 0)
         {
-            mgmtObject = management::Client::shared_ptr
-                (new management::Client (this, parent, mgmtId));
-            agent->addObject (mgmtObject);
+            if (asLink) {
+                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+            } else {
+                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+            }
         }
     }
 }
 
-Connection::~Connection ()
-{
-    if (mgmtObject.get () != 0)
-        mgmtObject->resourceDestroy ();
-}
+Connection::~Connection () {}
 
 void Connection::received(framing::AMQFrame& frame){
     if (mgmtClosing)
@@ -88,12 +130,8 @@
     } else {
         getChannel(frame.getChannel()).in(frame);
     }
-
-    if (mgmtObject.get () != 0)
-    {
-        mgmtObject->inc_framesFromClient ();
-        mgmtObject->inc_bytesFromClient (frame.size ());
-    }
+    
+    if (mgmtWrapper.get()) mgmtWrapper->received(frame);
 }
 
 void Connection::close(
@@ -107,6 +145,7 @@
 void Connection::initiated(const framing::ProtocolInitiation& header) {
     version = ProtocolVersion(header.getMajor(), header.getMinor());
     adapter.init(header);
+    initMgmt();
 }
 
 void Connection::idleOut(){}
@@ -133,8 +172,12 @@
 }
 
 bool Connection::doOutput()
-{
+{    
     try{
+        //process any pending mgmt commands:
+        if (mgmtWrapper.get()) mgmtWrapper->processPending();
+
+        //then do other output as needed:
         return outputTasks.doOutput();
     }catch(ConnectionException& e){
         close(e.code, e.what(), 0, 0);
@@ -159,11 +202,11 @@
 
 ManagementObject::shared_ptr Connection::GetManagementObject (void) const
 {
-    return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+    return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
 }
 
 Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
-                                                   Args&    /*args*/)
+                                                   Args&    args)
 {
     Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
 
@@ -173,7 +216,13 @@
     {
     case management::Client::METHOD_CLOSE :
         mgmtClosing = 1;
-        mgmtObject->set_closing (1);
+        if (mgmtWrapper.get()) mgmtWrapper->closing();
+        status = Manageable::STATUS_OK;
+        break;
+    case management::Link::METHOD_BRIDGE :
+        //queue this up and request chance to do output (i.e. get connections thread of control):
+        mgmtWrapper->process(*this, args);
+        out->activateOutput();
         status = Manageable::STATUS_OK;
         break;
     }
@@ -190,6 +239,107 @@
 const string& Connection::getUserId() const
 {
     return userId;
+}
+
+Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+    : channelCounter(1)
+{
+    mgmtLink = management::Link::shared_ptr
+        (new management::Link(conn, parent, mgmtId));
+    agent->addObject (mgmtLink);
+}
+
+Connection::MgmtLink::~MgmtLink()
+{
+    if (mgmtLink.get () != 0)
+        mgmtLink->resourceDestroy ();
+}
+
+void Connection::MgmtLink::received(framing::AMQFrame& frame)
+{
+    if (mgmtLink.get () != 0)
+    {
+        mgmtLink->inc_framesFromPeer ();
+        mgmtLink->inc_bytesFromPeer (frame.size ());
+    }
+}
+
+management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
+{
+    return dynamic_pointer_cast<ManagementObject>(mgmtLink);
+}
+
+void Connection::MgmtLink::closing()
+{
+    if (mgmtLink) mgmtLink->set_closing (1);
+}
+
+void Connection::MgmtLink::processPending()
+{
+    //process any pending creates
+    if (!created.empty()) {
+        for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+            i->create();
+        }
+        active.transfer(active.end(), created.begin(), created.end(), created);
+    }
+    if (!cancelled.empty()) {
+        //process any pending cancellations
+        for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+            i->cancel();
+        }
+        cancelled.clear();
+    }
+}
+
+void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
+{   
+    created.push_back(new Bridge(channelCounter++, connection, 
+                                 boost::bind(&MgmtLink::cancel, this, _1),
+                                 dynamic_cast<const management::ArgsLinkBridge&>(args)));
+}
+
+void Connection::MgmtLink::cancel(Bridge* b)
+{   
+    //need to take this out the active map and add it to the cancelled map
+    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+        if (&(*i) == b) {
+            cancelled.transfer(cancelled.end(), i, active);
+            break;
+        }
+    }
+}
+
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+{
+    mgmtClient = management::Client::shared_ptr
+        (new management::Client (conn, parent, mgmtId));
+    agent->addObject (mgmtClient);
+}
+
+Connection::MgmtClient::~MgmtClient()
+{
+    if (mgmtClient.get () != 0)
+        mgmtClient->resourceDestroy ();
+}
+
+void Connection::MgmtClient::received(framing::AMQFrame& frame)
+{
+    if (mgmtClient.get () != 0)
+    {
+        mgmtClient->inc_framesFromClient ();
+        mgmtClient->inc_bytesFromClient (frame.size ());
+    }
+}
+
+management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const
+{
+    return dynamic_pointer_cast<ManagementObject>(mgmtClient);
+}
+
+void Connection::MgmtClient::closing()
+{
+    if (mgmtClient) mgmtClient->set_closing (1);
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Feb  1 10:21:01 2008
@@ -21,6 +21,7 @@
 #ifndef _Connection_
 #define _Connection_
 
+#include <memory>
 #include <sstream>
 #include <vector>
 
@@ -41,6 +42,7 @@
 #include "SessionHandler.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/Client.h"
+#include "qpid/management/Link.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
 
@@ -87,6 +89,7 @@
     void idleIn();
     void closed();
     bool doOutput();
+    framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
 
     void closeChannel(framing::ChannelId channel);
 
@@ -98,10 +101,31 @@
     void setUserId(const string& uid);
     const string& getUserId() const;
 
+    void initMgmt(bool asLink = false);
+
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
 
+    /**
+     * Connection may appear, for the purposes of management, as a
+     * normal client initiated connection or as an agent initiated
+     * inter-broker link. This wrapper abstracts the common interface
+     * for both.
+     */
+    class MgmtWrapper
+    {
+    public:
+        virtual ~MgmtWrapper(){}
+        virtual void received(framing::AMQFrame& frame) = 0;
+        virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
+        virtual void closing() = 0;
+        virtual void processPending(){}
+        virtual void process(Connection&, const management::Args&){}
+    };
+    class MgmtClient;
+    class MgmtLink;
+
     framing::ProtocolVersion version;
     ChannelMap channels;
     sys::ConnectionOutputHandler* out;
@@ -110,9 +134,10 @@
     framing::AMQP_ClientProxy::Connection* client;
     uint64_t stagingThreshold;
     ConnectionHandler adapter;
-    management::Client::shared_ptr mgmtObject;
+    std::auto_ptr<MgmtWrapper> mgmtWrapper;
     bool mgmtClosing;
     string userId;
+    const std::string mgmtId;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Feb  1 10:21:01 2008
@@ -23,6 +23,7 @@
 #include "ConnectionHandler.h"
 #include "Connection.h"
 #include "qpid/framing/ConnectionStartBody.h"
+#include "qpid/framing/ClientInvoker.h"
 #include "qpid/framing/ServerInvoker.h"
 
 using namespace qpid;
@@ -40,6 +41,7 @@
     FieldTable properties;
     string mechanisms(PLAIN);
     string locales(en_US);
+    handler->serverMode = true;
     handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
 }
 
@@ -52,8 +54,13 @@
 {
     AMQMethodBody* method=frame.getBody()->getMethod();
     try{
-        if (!invoke(*handler.get(), *method))
-            throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+        if (handler->serverMode) {
+            if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
+                throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+        } else {
+            if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
+                throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+        }
     }catch(ConnectionException& e){
         handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
     }catch(std::exception& e){
@@ -63,9 +70,10 @@
 
 ConnectionHandler::ConnectionHandler(Connection& connection)  : handler(new Handler(connection)) {}
 
-ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), connection(c) {}
+ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), server(c.getOutput()),
+                                                      connection(c), serverMode(false) {}
 
-void ConnectionHandler::Handler::startOk(const FieldTable& /*clientProperties*/,
+void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
     const string& mechanism, 
     const string& response, const string& /*locale*/)
 {
@@ -110,3 +118,41 @@
 void ConnectionHandler::Handler::closeOk(){
     connection.getOutput().close();
 } 
+
+
+void ConnectionHandler::Handler::start(uint8_t /*versionMajor*/,
+                                       uint8_t /*versionMinor*/,
+                                       const FieldTable& /*serverProperties*/,
+                                       const string& /*mechanisms*/,
+                                       const string& /*locales*/)
+{
+    string uid = "qpidd";
+    string pwd = "qpidd";
+    string response = ((char)0) + uid + ((char)0) + pwd;
+    server.startOk(FieldTable(), PLAIN, response, en_US);
+    connection.initMgmt(true);
+}
+
+void ConnectionHandler::Handler::secure(const string& /*challenge*/)
+{
+    server.secureOk("");
+}
+
+void ConnectionHandler::Handler::tune(uint16_t channelMax,
+                                      uint32_t frameMax,
+                                      uint16_t heartbeat)
+{
+    connection.setFrameMax(frameMax);
+    connection.setHeartbeat(heartbeat);
+    server.tuneOk(channelMax, frameMax, heartbeat);
+    server.open("/", "", true);
+}
+
+void ConnectionHandler::Handler::openOk(const string& /*knownHosts*/)
+{
+}
+
+void ConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/)
+{
+    
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h Fri Feb  1 10:21:01 2008
@@ -24,8 +24,10 @@
 #include <memory>
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/framing/ProtocolVersion.h"
@@ -39,10 +41,13 @@
 // TODO aconway 2007-09-18: Rename to ConnectionHandler
 class ConnectionHandler : public framing::FrameHandler
 {
-    struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
+    struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, 
+        public framing::AMQP_ClientOperations::ConnectionHandler
     {
         framing::AMQP_ClientProxy::Connection client;
+        framing::AMQP_ServerProxy::Connection server;
         Connection& connection;
+        bool serverMode;
     
         Handler(Connection& connection);
         void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -55,6 +60,23 @@
         void close(uint16_t replyCode, const std::string& replyText,
                    uint16_t classId, uint16_t methodId); 
         void closeOk(); 
+
+
+        void start(uint8_t versionMajor,
+                   uint8_t versionMinor,
+                   const qpid::framing::FieldTable& serverProperties,
+                   const std::string& mechanisms,
+                   const std::string& locales);
+        
+        void secure(const std::string& challenge);
+        
+        void tune(uint16_t channelMax,
+                  uint32_t frameMax,
+                  uint16_t heartbeat);
+        
+        void openOk(const std::string& knownHosts);
+        
+        void redirect(const std::string& host, const std::string& knownHosts); 
       
     };
     std::auto_ptr<Handler> handler;
   public:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Feb  1 10:21:01 2008
@@ -23,6 +23,7 @@
 #include "Connection.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
 #include "qpid/framing/ServerInvoker.h"
 #include "qpid/log/Statement.h"
 
@@ -57,17 +58,19 @@
     //
     AMQMethodBody* m = f.getBody()->getMethod();
     try {
-        if (m && invoke(*this, *m))
+        if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
             return;
-        else if (session.get()) {
+        } else if (session.get()) {
             boost::optional<SequenceNumber> ack=session->received(f);
             session->in.handle(f);
             if (ack)
                 peerSession.ack(*ack, SequenceNumberSet());
-        }
-        else if (!ignoring)
+        } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+            return;
+        } else if (!ignoring) {
             throw ChannelErrorException(
                 QPID_MSG("Channel " << channel.get() << " is not open"));
+        }
     } catch(const ChannelException& e) {
         ignoring=true;          // Ignore trailing frames sent by client.
         session->detach();
@@ -186,6 +189,19 @@
 void  SessionHandler::solicitAck() {
     assertAttached("solicit-ack");
     peerSession.ack(session->sendingAck(), SequenceNumberSet());    
+}
+
+void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+{
+    std::auto_ptr<SessionState> state(
+        connection.broker.getSessionManager().open(*this, detachedLifetime));
+    session.reset(state.release());
+}
+
+void SessionHandler::detached()
+{
+    connection.broker.getSessionManager().suspend(session);
+    session.reset();
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Fri Feb  1 10:21:01 2008
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/amqp_types.h"
@@ -43,6 +44,7 @@
  */
 class SessionHandler : public framing::FrameHandler::InOutHandler,
                        public framing::AMQP_ServerOperations::SessionHandler,
+                       public framing::AMQP_ClientOperations::SessionHandler,
                        private boost::noncopyable
 {
   public:
@@ -81,11 +83,16 @@
              const framing::SequenceNumberSet& seenFrameSet);
     void highWaterMark(uint32_t lastSentMark);
     void solicitAck();
+    
+    //extra methods required for assuming client role
+    void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+    void detached();
 
 
     void assertAttached(const char* method) const;
     void assertActive(const char* method) const;
     void assertClosed(const char* method) const;
+
 
     Connection& connection;
     framing::ChannelHandler channel;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Fri Feb  1 10:21:01 2008
@@ -38,6 +38,7 @@
     virtual uint16_t getPort() const = 0;
     virtual std::string getHost() const = 0;
     virtual void run(ConnectionInputHandlerFactory* factory) = 0;
+    virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
     virtual void shutdown() = 0;
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Fri Feb  1 10:21:01 2008
@@ -30,6 +30,7 @@
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/AMQDataBlock.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/log/Statement.h"
@@ -53,6 +54,7 @@
     AsynchIOAcceptor(int16_t port, int backlog, int threads);
     ~AsynchIOAcceptor() {}
     void run(ConnectionInputHandlerFactory* factory);
+    void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
     void shutdown();
         
     uint16_t getPort() const;
@@ -92,13 +94,17 @@
     bool initiated;
     bool readError;
     std::string identifier;
+    bool isClient;
+
+    void write(const framing::AMQDataBlock&);
 
   public:
     AsynchIOHandler() :
         inputHandler(0),
         frameQueueClosed(false),
         initiated(false),
-        readError(false)
+        readError(false),
+        isClient(false)
     {}
        
     ~AsynchIOHandler() {
@@ -107,6 +113,8 @@
         delete inputHandler;
     }
 
+    void setClient() { isClient = true; }
+
     void init(AsynchIO* a, ConnectionInputHandler* h) {
         aio = a;
         inputHandler = h;
@@ -179,11 +187,48 @@
         t[i].join();
     }
 }
+    
+void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+{
+    Socket* socket = new Socket();//Should be deleted by handle when socket closes
+    socket->connect(host, port);
+    AsynchIOHandler* async = new AsynchIOHandler; 
+    async->setClient();
+    ConnectionInputHandler* handler = f->create(async, *socket);
+    AsynchIO* aio = new AsynchIO(*socket,
+                                 boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+                                 boost::bind(&AsynchIOHandler::eof, async, _1),
+                                 boost::bind(&AsynchIOHandler::disconnect, async, _1),
+                                 boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+                                 boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+                                 boost::bind(&AsynchIOHandler::idle, async, _1));
+    async->init(aio, handler);
+
+    // Give connection some buffers to use
+    for (int i = 0; i < 4; i++) {
+        aio->queueReadBuffer(new Buff);
+    }
+    aio->start(poller);
+
+}
+
 
 void AsynchIOAcceptor::shutdown() {
     poller->shutdown();
 }
 
+
+void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+{
+    AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+    if (!buff)
+        buff = new Buff;
+    framing::Buffer out(buff->bytes, buff->byteCount);
+    data.encode(out);
+    buff->dataCount = data.size();
+    aio->queueWrite(buff);
+}
+
 // Output side
 void AsynchIOHandler::send(framing::AMQFrame& frame) {
     // TODO: Need to find out if we are in the callback context,
@@ -274,6 +319,12 @@
 }
 
 void AsynchIOHandler::idle(AsynchIO&){
+    if (isClient && !initiated) {
+        //get & write protocol header from upper layers
+        write(inputHandler->getInitiation());
+        initiated = true;
+        return;
+    }
     ScopedLock<Mutex> l(frameQueueLock);
        
     if (frameQueue.empty()) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h Fri Feb  1 10:21:01 2008
@@ -36,6 +36,7 @@
         public TimeoutHandler, public OutputTask
     {
     public:
+        virtual qpid::framing::ProtocolInitiation getInitiation() = 0;
         virtual void closed() = 0;
     };
 

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Fri Feb  1 10:21:01 2008
@@ -82,6 +82,11 @@
       <arg name="sequence" dir="IO" type="uint32" default="0"/>
       <arg name="body"     dir="IO" type="lstr"   default=""/>
     </method>
+
+    <method name="connect" desc="Establish a connection to another broker">
+      <arg name="host"     dir="I" type="sstr"   default=""/>
+      <arg name="port" dir="I" type="uint32" default="0"/>
+    </method>
   </class>
 
   <!--
@@ -191,6 +196,51 @@
 
     <method name="close"/> 
   </class>
+
+  <!--
+  ===============================================================
+  Link
+  ===============================================================
+  -->
+  <class name="link">
+    <configElement name="vhostRef" type="objId"  access="RC" index="y" parentRef="y"/>
+    <configElement name="address"  type="sstr"   access="RC" index="y"/>
+
+    <instElement name="closing"          type="bool" desc="This link is closing by management request"/>
+    <instElement name="authIdentity"     type="sstr"/>
+    <instElement name="framesFromPeer"   type="count64"/>
+    <instElement name="framesToPeer"     type="count64"/>
+    <instElement name="bytesFromPeer"    type="count64"/>
+    <instElement name="bytesToPeer"      type="count64"/>
+
+    <method name="close"/> 
+
+    <method name="bridge" desc="Bridge messages over the link">
+      <arg name="src"     dir="I" type="sstr"/>
+      <arg name="dest"     dir="I" type="sstr"/>
+      <arg name="key"     dir="I" type="sstr"   default=""/>
+      <arg name="src_is_queue"     dir="I" type="bool"   default="0"/>
+      <arg name="src_is_local"     dir="I" type="bool"   default="0"/>
+    </method>
+  </class>
+
+
+  <!--
+  ===============================================================
+  Bridge
+  ===============================================================
+  -->
+  <class name="bridge">
+    <configElement name="linkRef" type="objId"  access="RC" index="y" parentRef="y"/>
+    <configElement name="channelId"        type="uint16" access="RC"  index="y"/>
+    <configElement name="src"             type="sstr"   access="RC"/>
+    <configElement name="dest"             type="sstr"   access="RC"/>
+    <configElement name="key"             type="sstr"   access="RC"/>
+    <configElement name="src_is_queue"     type="bool"   access="RC"/>
+    <configElement name="src_is_local"     type="bool"   access="RC"/>
+    <method name="close"/> 
+  </class>
+
 
   <!--
   ===============================================================


Reply via email to