Author: gsim
Date: Fri Feb 1 10:21:01 2008
New Revision: 617590
URL: http://svn.apache.org/viewvc?rev=617590&view=rev
Log:
Initial cut of inter-broker bridging
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
incubator/qpid/trunk/qpid/specs/management-schema.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Feb 1 10:21:01 2008
@@ -242,7 +242,7 @@
}
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
- Args& /*_args*/)
+ Args& args)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -253,6 +253,10 @@
case management::Broker::METHOD_ECHO :
status = Manageable::STATUS_OK;
break;
+ case management::Broker::METHOD_CONNECT :
+ connect(dynamic_cast<management::ArgsBrokerConnect&>(args));
+ status = Manageable::STATUS_OK;
+ break;
case management::Broker::METHOD_JOINCLUSTER :
case management::Broker::METHOD_LEAVECLUSTER :
@@ -261,6 +265,11 @@
}
return status;
+}
+
+void Broker::connect(management::ArgsBrokerConnect& args)
+{
+ getAcceptor().connect(args.i_host, args.i_port, &factory);
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Feb 1 10:21:01 2008
@@ -34,6 +34,7 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/management/Broker.h"
+#include "qpid/management/ArgsBrokerConnect.h"
#include "qpid/Options.h"
#include "qpid/Plugin.h"
#include "qpid/framing/FrameHandler.h"
@@ -123,6 +124,7 @@
Vhost::shared_ptr vhostObject;
void declareStandardExchange(const std::string& name, const std::string& type);
+ void connect(management::ArgsBrokerConnect& args);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb 1 10:21:01 2008
@@ -21,14 +21,18 @@
#include "Connection.h"
#include "SessionState.h"
#include "BrokerAdapter.h"
+#include "Bridge.h"
#include "SemanticHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/ArgsLinkBind.h"
+#include "qpid/management/ArgsLinkPull.h"
#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
#include <algorithm>
#include <iostream>
@@ -47,7 +51,43 @@
namespace qpid {
namespace broker {
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId) :
+class Connection::MgmtClient : public Connection::MgmtWrapper
+{
+ management::Client::shared_ptr mgmtClient;
+
+public:
+ MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtClient();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+};
+
+class Connection::MgmtLink : public Connection::MgmtWrapper
+{
+ typedef boost::ptr_vector<Bridge> Bridges;
+
+ management::Link::shared_ptr mgmtLink;
+ Bridges created;//holds list of bridges pending creation
+ Bridges cancelled;//holds list of bridges pending cancellation
+ Bridges active;//holds active bridges
+ uint channelCounter;
+ sys::Mutex lock;
+
+ void cancel(Bridge*);
+
+public:
+ MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtLink();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+ void processPending();
+ void process(Connection& connection, const management::Args& args);
+};
+
+
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
broker(broker_),
outputTasks(*out_),
out(out_),
@@ -56,7 +96,11 @@
client(0),
stagingThreshold(broker.getStagingThreshold()),
adapter(*this),
- mgmtClosing(0)
+ mgmtClosing(0),
+ mgmtId(mgmtId_)
+{}
+
+void Connection::initMgmt(bool asLink)
{
Manageable* parent = broker.GetVhostObject ();
@@ -66,18 +110,16 @@
if (agent.get () != 0)
{
- mgmtObject = management::Client::shared_ptr
- (new management::Client (this, parent, mgmtId));
- agent->addObject (mgmtObject);
+ if (asLink) {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ } else {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ }
}
}
}
-Connection::~Connection ()
-{
- if (mgmtObject.get () != 0)
- mgmtObject->resourceDestroy ();
-}
+Connection::~Connection () {}
void Connection::received(framing::AMQFrame& frame){
if (mgmtClosing)
@@ -88,12 +130,8 @@
} else {
getChannel(frame.getChannel()).in(frame);
}
-
- if (mgmtObject.get () != 0)
- {
- mgmtObject->inc_framesFromClient ();
- mgmtObject->inc_bytesFromClient (frame.size ());
- }
+
+ if (mgmtWrapper.get()) mgmtWrapper->received(frame);
}
void Connection::close(
@@ -107,6 +145,7 @@
void Connection::initiated(const framing::ProtocolInitiation& header) {
version = ProtocolVersion(header.getMajor(), header.getMinor());
adapter.init(header);
+ initMgmt();
}
void Connection::idleOut(){}
@@ -133,8 +172,12 @@
}
bool Connection::doOutput()
-{
+{
try{
+ //process any pending mgmt commands:
+ if (mgmtWrapper.get()) mgmtWrapper->processPending();
+
+ //then do other output as needed:
return outputTasks.doOutput();
}catch(ConnectionException& e){
close(e.code, e.what(), 0, 0);
@@ -159,11 +202,11 @@
ManagementObject::shared_ptr Connection::GetManagementObject (void) const
{
- return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+ return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
}
Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
- Args& /*args*/)
+ Args& args)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -173,7 +216,13 @@
{
case management::Client::METHOD_CLOSE :
mgmtClosing = 1;
- mgmtObject->set_closing (1);
+ if (mgmtWrapper.get()) mgmtWrapper->closing();
+ status = Manageable::STATUS_OK;
+ break;
+ case management::Link::METHOD_BRIDGE :
+ //queue this up and request chance to do output (i.e. get connections thread of control):
+ mgmtWrapper->process(*this, args);
+ out->activateOutput();
status = Manageable::STATUS_OK;
break;
}
@@ -190,6 +239,107 @@
const string& Connection::getUserId() const
{
return userId;
+}
+
+Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+ : channelCounter(1)
+{
+ mgmtLink = management::Link::shared_ptr
+ (new management::Link(conn, parent, mgmtId));
+ agent->addObject (mgmtLink);
+}
+
+Connection::MgmtLink::~MgmtLink()
+{
+ if (mgmtLink.get () != 0)
+ mgmtLink->resourceDestroy ();
+}
+
+void Connection::MgmtLink::received(framing::AMQFrame& frame)
+{
+ if (mgmtLink.get () != 0)
+ {
+ mgmtLink->inc_framesFromPeer ();
+ mgmtLink->inc_bytesFromPeer (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtLink);
+}
+
+void Connection::MgmtLink::closing()
+{
+ if (mgmtLink) mgmtLink->set_closing (1);
+}
+
+void Connection::MgmtLink::processPending()
+{
+ //process any pending creates
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ i->create();
+ }
+ active.transfer(active.end(), created.begin(), created.end(), created);
+ }
+ if (!cancelled.empty()) {
+ //process any pending cancellations
+ for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+ i->cancel();
+ }
+ cancelled.clear();
+ }
+}
+
+void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
+{
+ created.push_back(new Bridge(channelCounter++, connection,
+ boost::bind(&MgmtLink::cancel, this, _1),
+ dynamic_cast<const management::ArgsLinkBridge&>(args)));
+}
+
+void Connection::MgmtLink::cancel(Bridge* b)
+{
+ //need to take this out the active map and add it to the cancelled map
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if (&(*i) == b) {
+ cancelled.transfer(cancelled.end(), i, active);
+ break;
+ }
+ }
+}
+
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+{
+ mgmtClient = management::Client::shared_ptr
+ (new management::Client (conn, parent, mgmtId));
+ agent->addObject (mgmtClient);
+}
+
+Connection::MgmtClient::~MgmtClient()
+{
+ if (mgmtClient.get () != 0)
+ mgmtClient->resourceDestroy ();
+}
+
+void Connection::MgmtClient::received(framing::AMQFrame& frame)
+{
+ if (mgmtClient.get () != 0)
+ {
+ mgmtClient->inc_framesFromClient ();
+ mgmtClient->inc_bytesFromClient (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtClient);
+}
+
+void Connection::MgmtClient::closing()
+{
+ if (mgmtClient) mgmtClient->set_closing (1);
}
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Feb 1 10:21:01 2008
@@ -21,6 +21,7 @@
#ifndef _Connection_
#define _Connection_
+#include <memory>
#include <sstream>
#include <vector>
@@ -41,6 +42,7 @@
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Client.h"
+#include "qpid/management/Link.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -87,6 +89,7 @@
void idleIn();
void closed();
bool doOutput();
+ framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
void closeChannel(framing::ChannelId channel);
@@ -98,10 +101,31 @@
void setUserId(const string& uid);
const string& getUserId() const;
+ void initMgmt(bool asLink = false);
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+ /**
+ * Connection may appear, for the purposes of management, as a
+ * normal client initiated connection or as an agent initiated
+ * inter-broker link. This wrapper abstracts the common interface
+ * for both.
+ */
+ class MgmtWrapper
+ {
+ public:
+ virtual ~MgmtWrapper(){}
+ virtual void received(framing::AMQFrame& frame) = 0;
+ virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
+ virtual void closing() = 0;
+ virtual void processPending(){}
+ virtual void process(Connection&, const management::Args&){}
+ };
+ class MgmtClient;
+ class MgmtLink;
+
framing::ProtocolVersion version;
ChannelMap channels;
sys::ConnectionOutputHandler* out;
@@ -110,9 +134,10 @@
framing::AMQP_ClientProxy::Connection* client;
uint64_t stagingThreshold;
ConnectionHandler adapter;
- management::Client::shared_ptr mgmtObject;
+ std::auto_ptr<MgmtWrapper> mgmtWrapper;
bool mgmtClosing;
string userId;
+ const std::string mgmtId;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Feb 1 10:21:01 2008
@@ -23,6 +23,7 @@
#include "ConnectionHandler.h"
#include "Connection.h"
#include "qpid/framing/ConnectionStartBody.h"
+#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
using namespace qpid;
@@ -40,6 +41,7 @@
FieldTable properties;
string mechanisms(PLAIN);
string locales(en_US);
+ handler->serverMode = true;
handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
}
@@ -52,8 +54,13 @@
{
AMQMethodBody* method=frame.getBody()->getMethod();
try{
- if (!invoke(*handler.get(), *method))
- throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+ if (handler->serverMode) {
+ if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
+ throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+ } else {
+ if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
+ throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+ }
}catch(ConnectionException& e){
handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
@@ -63,9 +70,10 @@
ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {}
-ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), connection(c) {}
+ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), server(c.getOutput()),
+ connection(c), serverMode(false) {}
-void ConnectionHandler::Handler::startOk(const FieldTable& /*clientProperties*/,
+void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
const string& mechanism,
const string& response, const string& /*locale*/)
{
@@ -110,3 +118,41 @@
void ConnectionHandler::Handler::closeOk(){
connection.getOutput().close();
}
+
+
+void ConnectionHandler::Handler::start(uint8_t /*versionMajor*/,
+ uint8_t /*versionMinor*/,
+ const FieldTable& /*serverProperties*/,
+ const string& /*mechanisms*/,
+ const string& /*locales*/)
+{
+ string uid = "qpidd";
+ string pwd = "qpidd";
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ server.startOk(FieldTable(), PLAIN, response, en_US);
+ connection.initMgmt(true);
+}
+
+void ConnectionHandler::Handler::secure(const string& /*challenge*/)
+{
+ server.secureOk("");
+}
+
+void ConnectionHandler::Handler::tune(uint16_t channelMax,
+ uint32_t frameMax,
+ uint16_t heartbeat)
+{
+ connection.setFrameMax(frameMax);
+ connection.setHeartbeat(heartbeat);
+ server.tuneOk(channelMax, frameMax, heartbeat);
+ server.open("/", "", true);
+}
+
+void ConnectionHandler::Handler::openOk(const string& /*knownHosts*/)
+{
+}
+
+void ConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/)
+{
+
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h Fri Feb 1 10:21:01 2008
@@ -24,8 +24,10 @@
#include <memory>
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
@@ -39,10 +41,13 @@
// TODO aconway 2007-09-18: Rename to ConnectionHandler
class ConnectionHandler : public framing::FrameHandler
{
- struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
+ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ public framing::AMQP_ClientOperations::ConnectionHandler
{
framing::AMQP_ClientProxy::Connection client;
+ framing::AMQP_ServerProxy::Connection server;
Connection& connection;
+ bool serverMode;
Handler(Connection& connection);
void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -55,6 +60,23 @@
void close(uint16_t replyCode, const std::string& replyText,
uint16_t classId, uint16_t methodId);
void closeOk();
+
+
+ void start(uint8_t versionMajor,
+ uint8_t versionMinor,
+ const qpid::framing::FieldTable& serverProperties,
+ const std::string& mechanisms,
+ const std::string& locales);
+
+ void secure(const std::string& challenge);
+
+ void tune(uint16_t channelMax,
+ uint32_t frameMax,
+ uint16_t heartbeat);
+
+ void openOk(const std::string& knownHosts);
+
+ void redirect(const std::string& host, const std::string& knownHosts);
};
std::auto_ptr<Handler> handler;
public:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Feb 1 10:21:01 2008
@@ -23,6 +23,7 @@
#include "Connection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
@@ -57,17 +58,19 @@
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && invoke(*this, *m))
+ if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
return;
- else if (session.get()) {
+ } else if (session.get()) {
boost::optional<SequenceNumber> ack=session->received(f);
session->in.handle(f);
if (ack)
peerSession.ack(*ack, SequenceNumberSet());
- }
- else if (!ignoring)
+ } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else if (!ignoring) {
throw ChannelErrorException(
QPID_MSG("Channel " << channel.get() << " is not open"));
+ }
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
session->detach();
@@ -186,6 +189,19 @@
void SessionHandler::solicitAck() {
assertAttached("solicit-ack");
peerSession.ack(session->sendingAck(), SequenceNumberSet());
+}
+
+void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+{
+ std::auto_ptr<SessionState> state(
+ connection.broker.getSessionManager().open(*this, detachedLifetime));
+ session.reset(state.release());
+}
+
+void SessionHandler::detached()
+{
+ connection.broker.getSessionManager().suspend(session);
+ session.reset();
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Fri Feb 1 10:21:01 2008
@@ -23,6 +23,7 @@
*/
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
@@ -43,6 +44,7 @@
*/
class SessionHandler : public framing::FrameHandler::InOutHandler,
public framing::AMQP_ServerOperations::SessionHandler,
+ public framing::AMQP_ClientOperations::SessionHandler,
private boost::noncopyable
{
public:
@@ -81,11 +83,16 @@
const framing::SequenceNumberSet& seenFrameSet);
void highWaterMark(uint32_t lastSentMark);
void solicitAck();
+
+ //extra methods required for assuming client role
+ void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+ void detached();
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
void assertClosed(const char* method) const;
+
Connection& connection;
framing::ChannelHandler channel;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Fri Feb 1 10:21:01 2008
@@ -38,6 +38,7 @@
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
virtual void run(ConnectionInputHandlerFactory* factory) = 0;
+ virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
virtual void shutdown() = 0;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Fri Feb 1 10:21:01 2008
@@ -30,6 +30,7 @@
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/AMQDataBlock.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
@@ -53,6 +54,7 @@
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
+ void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
void shutdown();
uint16_t getPort() const;
@@ -92,13 +94,17 @@
bool initiated;
bool readError;
std::string identifier;
+ bool isClient;
+
+ void write(const framing::AMQDataBlock&);
public:
AsynchIOHandler() :
inputHandler(0),
frameQueueClosed(false),
initiated(false),
- readError(false)
+ readError(false),
+ isClient(false)
{}
~AsynchIOHandler() {
@@ -107,6 +113,8 @@
delete inputHandler;
}
+ void setClient() { isClient = true; }
+
void init(AsynchIO* a, ConnectionInputHandler* h) {
aio = a;
inputHandler = h;
@@ -179,11 +187,48 @@
t[i].join();
}
}
+
+void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+{
+ Socket* socket = new Socket();//Should be deleted by handle when socket closes
+ socket->connect(host, port);
+ AsynchIOHandler* async = new AsynchIOHandler;
+ async->setClient();
+ ConnectionInputHandler* handler = f->create(async, *socket);
+ AsynchIO* aio = new AsynchIO(*socket,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, handler);
+
+ // Give connection some buffers to use
+ for (int i = 0; i < 4; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
+ aio->start(poller);
+
+}
+
void AsynchIOAcceptor::shutdown() {
poller->shutdown();
}
+
+void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+{
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.size();
+ aio->queueWrite(buff);
+}
+
// Output side
void AsynchIOHandler::send(framing::AMQFrame& frame) {
// TODO: Need to find out if we are in the callback context,
@@ -274,6 +319,12 @@
}
void AsynchIOHandler::idle(AsynchIO&){
+ if (isClient && !initiated) {
+ //get & write protocol header from upper layers
+ write(inputHandler->getInitiation());
+ initiated = true;
+ return;
+ }
ScopedLock<Mutex> l(frameQueueLock);
if (frameQueue.empty()) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h Fri Feb 1 10:21:01 2008
@@ -36,6 +36,7 @@
public TimeoutHandler, public OutputTask
{
public:
+ virtual qpid::framing::ProtocolInitiation getInitiation() = 0;
virtual void closed() = 0;
};
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=617590&r1=617589&r2=617590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Fri Feb 1 10:21:01 2008
@@ -82,6 +82,11 @@
<arg name="sequence" dir="IO" type="uint32" default="0"/>
<arg name="body" dir="IO" type="lstr" default=""/>
</method>
+
+ <method name="connect" desc="Establish a connection to another broker">
+ <arg name="host" dir="I" type="sstr" default=""/>
+ <arg name="port" dir="I" type="uint32" default="0"/>
+ </method>
</class>
<!--
@@ -191,6 +196,51 @@
<method name="close"/>
</class>
+
+ <!--
+ ===============================================================
+ Link
+ ===============================================================
+ -->
+ <class name="link">
+ <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/>
+ <configElement name="address" type="sstr" access="RC" index="y"/>
+
+ <instElement name="closing" type="bool" desc="This link is closing by management request"/>
+ <instElement name="authIdentity" type="sstr"/>
+ <instElement name="framesFromPeer" type="count64"/>
+ <instElement name="framesToPeer" type="count64"/>
+ <instElement name="bytesFromPeer" type="count64"/>
+ <instElement name="bytesToPeer" type="count64"/>
+
+ <method name="close"/>
+
+ <method name="bridge" desc="Bridge messages over the link">
+ <arg name="src" dir="I" type="sstr"/>
+ <arg name="dest" dir="I" type="sstr"/>
+ <arg name="key" dir="I" type="sstr" default=""/>
+ <arg name="src_is_queue" dir="I" type="bool" default="0"/>
+ <arg name="src_is_local" dir="I" type="bool" default="0"/>
+ </method>
+ </class>
+
+
+ <!--
+ ===============================================================
+ Bridge
+ ===============================================================
+ -->
+ <class name="bridge">
+ <configElement name="linkRef" type="objId" access="RC" index="y" parentRef="y"/>
+ <configElement name="channelId" type="uint16" access="RC" index="y"/>
+ <configElement name="src" type="sstr" access="RC"/>
+ <configElement name="dest" type="sstr" access="RC"/>
+ <configElement name="key" type="sstr" access="RC"/>
+ <configElement name="src_is_queue" type="bool" access="RC"/>
+ <configElement name="src_is_local" type="bool" access="RC"/>
+ <method name="close"/>
+ </class>
+
<!--
===============================================================