Author: gsim
Date: Tue Apr 22 13:14:15 2008
New Revision: 650635
URL: http://svn.apache.org/viewvc?rev=650635&view=rev
Log:
Moved federation to final 0-10 codepath
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Tue Apr 22 13:14:15 2008
@@ -27,12 +27,21 @@
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id)
- : frameQueueClosed(false), output(o), connection(this, broker, id),
- identifier(id), initialized(false) {}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
+ : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+ identifier(id), initialized(false), isClient(_isClient) {}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
+ if (isClient && !initialized) {
+ //read in protocol header
+ framing::ProtocolInitiation pi;
+ if (pi.decode(in)) {
+ //TODO: check the version is correct
+ QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")");
+ }
+ initialized = true;
+ }
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
@@ -44,7 +53,7 @@
bool Connection::canEncode() {
if (!frameQueueClosed) connection.doOutput();
Mutex::ScopedLock l(frameQueueLock);
- return !initialized || !frameQueue.empty();
+ return (!isClient && !initialized) || !frameQueue.empty();
}
bool Connection::isClosed() const {
@@ -55,10 +64,11 @@
size_t Connection::encode(const char* buffer, size_t size) {
Mutex::ScopedLock l(frameQueueLock);
framing::Buffer out(const_cast<char*>(buffer), size);
- if (!initialized) {
+ if (!isClient && !initialized) {
framing::ProtocolInitiation pi(getVersion());
pi.encode(out);
initialized = true;
+ QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
}
while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
frameQueue.front().encode(out);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Tue Apr 22 13:14:15 2008
@@ -43,9 +43,10 @@
broker::Connection connection; // FIXME aconway 2008-03-18:
std::string identifier;
bool initialized;
+ bool isClient;
public:
- Connection(sys::OutputControl&, broker::Broker&, const std::string& id);
+ Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool isClosed() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Apr 22 13:14:15 2008
@@ -34,7 +34,7 @@
Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
args(_args), channel(id, &(c.getOutput())), peer(channel),
mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
- connection(c), listener(l)
+ connection(c), listener(l), name(Uuid(true).str())
{
management::ManagementAgent::getAgent()->addObject(mgmtObject);
}
@@ -46,24 +46,24 @@
void Bridge::create()
{
- framing::AMQP_ServerProxy::Session session(channel);
- session.open(0);
+ framing::AMQP_ServerProxy::Session010 session(channel);
+ session.attach(name, false);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
if (args.i_src_is_queue) {
- peer.getMessage().subscribe(0, args.i_src, args.i_dest, false, 0, 0, false, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer.getMessage010().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable());
+ peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
string queue = "bridge_queue_";
queue += Uuid(true).str();
- peer.getQueue().declare(0, queue, "", false, false, true, true, FieldTable());
- peer.getQueue().bind(0, queue, args.i_src, args.i_key, FieldTable());
- peer.getMessage().subscribe(0, queue, args.i_dest, false, 0, 0, false, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer.getQueue010().declare(queue, "", false, false, true, true, FieldTable());
+ peer.getExchange010().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer.getMessage010().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable());
+ peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
@@ -71,8 +71,8 @@
void Bridge::cancel()
{
- peer.getMessage().cancel(args.i_dest);
- peer.getSession().close();
+ peer.getMessage010().cancel(args.i_dest);
+ peer.getSession010().detach(name);
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Tue Apr 22 13:14:15 2008
@@ -56,6 +56,7 @@
management::Bridge::shared_ptr mgmtObject;
ConnectionState& connection;
CancellationListener listener;
+ std::string name;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Apr 22 13:14:15 2008
@@ -85,9 +85,9 @@
};
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
ConnectionState(out_, broker_),
- adapter(*this),
+ adapter(*this, isLink),
mgmtClosing(false),
mgmtId(mgmtId_)
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Apr 22 13:14:15 2008
@@ -54,7 +54,7 @@
public ConnectionState
{
public:
- Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
+ Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Tue Apr 22 13:14:15 2008
@@ -43,11 +43,8 @@
sys::ConnectionCodec*
ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
- // FIXME aconway 2008-03-18:
-
- // gsim 2008-03-26 this seems only to be used when creating
- // connections from one broker to another
- return new PreviewConnectionCodec(out, broker, id, true);
+ // used to create connections from one broker to another
+ return new amqp_0_10::Connection(out, broker, id, true);
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Tue Apr 22 13:14:15 2008
@@ -65,23 +65,24 @@
}
}
-ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {}
+ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) : handler(new Handler(connection, isClient)) {}
-ConnectionHandler::Handler::Handler(Connection& c) :
+ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
client(c.getOutput()), server(c.getOutput()),
- connection(c), serverMode(false)
+ connection(c), serverMode(!isClient)
{
- FieldTable properties;
- Array mechanisms(0x95);
-
- authenticator = SaslAuthenticator::createAuthenticator(c);
- authenticator->getMechanisms(mechanisms);
-
- Array locales(0x95);
- boost::shared_ptr<FieldValue> l(new Str16Value(en_US));
- locales.add(l);
- serverMode = true;
- client.start(properties, mechanisms, locales);
+ if (serverMode) {
+ FieldTable properties;
+ Array mechanisms(0x95);
+
+ authenticator = SaslAuthenticator::createAuthenticator(c);
+ authenticator->getMechanisms(mechanisms);
+
+ Array locales(0x95);
+ boost::shared_ptr<FieldValue> l(new Str16Value(en_US));
+ locales.add(l);
+ client.start(properties, mechanisms, locales);
+ }
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h Tue Apr 22 13:14:15 2008
@@ -50,7 +50,7 @@
bool serverMode;
std::auto_ptr<SaslAuthenticator> authenticator;
- Handler(Connection& connection);
+ Handler(Connection& connection, bool isClient);
~Handler();
void startOk(const qpid::framing::FieldTable& clientProperties,
const std::string& mechanism, const std::string& response,
@@ -81,7 +81,7 @@
};
std::auto_ptr<Handler> handler;
public:
- ConnectionHandler(Connection& connection);
+ ConnectionHandler(Connection& connection, bool isClient);
void close(framing::ReplyCode code, const std::string& text,
framing::ClassId classId, framing::MethodId methodId);
void handle(framing::AMQFrame& frame);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Apr 22 13:14:15 2008
@@ -83,7 +83,7 @@
}
bool SessionHandler::isValid(AMQMethodBody* m) {
- return session.get() || m->isA<SessionAttachBody>();
+ return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>();
}
void SessionHandler::handleOut(AMQFrame& f) {
@@ -134,10 +134,13 @@
peerSession.commandPoint(session->nextOut, 0);
}
-void SessionHandler::attached(const std::string& /*name*/)
+void SessionHandler::attached(const std::string& _name)
{
+ name = _name;//TODO: this should be used in conjunction with
+ //userid for connection as sessions identity
std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0));
session.reset(state.release());
+ peerSession.commandPoint(session->nextOut, 0);
}
void SessionHandler::detach(const std::string& name)
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Tue Apr 22 13:14:15 2008
@@ -94,7 +94,7 @@
mgmt.call_method(link, "close")
self.assertEqual(len(mgmt.get_objects("link")), 0)
- def DISABLED_test_pull_from_exchange(self):
+ def test_pull_from_exchange(self):
session = self.session
mgmt = Helper(self)
@@ -122,10 +122,10 @@
for i in range(1, 11):
msg = queue.get(timeout=5)
- self.assertEqual("Message %d" % i, msg.content.body)
+ self.assertEqual("Message %d" % i, msg.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected message in queue: " + extra.content.body)
+ self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
@@ -135,7 +135,7 @@
mgmt.call_method(link, "close")
self.assertEqual(len(mgmt.get_objects("link")), 0)
- def DISABLED_test_pull_from_queue(self):
+ def test_pull_from_queue(self):
session = self.session
#setup queue on remote broker and add some messages
@@ -168,10 +168,10 @@
for i in range(1, 11):
msg = queue.get(timeout=5)
- self.assertEqual("Message %d" % i, msg.content.body)
+ self.assertEqual("Message %d" % i, msg.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected message in queue: " + extra.content.body)
+ self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None