Author: astitcher
Date: Fri Apr 18 14:03:49 2008
New Revision: 649689
URL: http://svn.apache.org/viewvc?rev=649689&view=rev
Log:
Refactored Acceptor code to allow multiple acceptors to be present in the broker
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?rev=649689&r1=649688&r2=649689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Fri Apr 18 14:03:49 2008
@@ -22,11 +22,20 @@
namespace qpid {
-Plugin::Plugins Plugin::plugins;
+namespace {
+// This is a single threaded singleton implementation so
+// it is important to be sure that the first use of this
+// singleton is when the program is still single threaded
+Plugin::Plugins& thePlugins() {
+ static Plugin::Plugins plugins;
+
+ return plugins;
+}
+}
Plugin::Plugin() {
// Register myself.
- plugins.push_back(this);
+ thePlugins().push_back(this);
}
Plugin::~Plugin() {}
@@ -34,7 +43,7 @@
Options* Plugin::getOptions() { return 0; }
const Plugin::Plugins& Plugin::getPlugins() {
- return plugins;
+ return thePlugins();
}
} // namespace qpid
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h?rev=649689&r1=649688&r2=649689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h Fri Apr 18 14:03:49 2008
@@ -88,9 +88,6 @@
* Caller must not delete plugin pointers.
*/
static const Plugins& getPlugins();
-
- private:
- static Plugins plugins;
};
} // namespace qpid
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=649689&r1=649688&r2=649689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Apr 18
14:03:49 2008
@@ -260,8 +260,7 @@
}
void Broker::run() {
-
- getAcceptor().run(poller, &factory);
+ accept();
Dispatcher d(poller);
int numIOThreads = config.workerThreads;
@@ -298,18 +297,6 @@
}
}
-uint16_t Broker::getPort() const { return getAcceptor().getPort(); }
-
-Acceptor& Broker::getAcceptor() const {
- if (!acceptor) {
- const_cast<Acceptor::shared_ptr&>(acceptor) =
- Acceptor::create(config.port,
- config.connectionBacklog);
- QPID_LOG(info, "Listening on port " << getPort());
- }
- return *acceptor;
-}
-
ManagementObject::shared_ptr Broker::GetManagementObject(void) const
{
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
@@ -348,11 +335,41 @@
return status;
}
+boost::shared_ptr<Acceptor> Broker::getAcceptor() const {
+ assert(acceptors.size() > 0);
+#if 0
+ if (!acceptor) {
+ const_cast<Acceptor::shared_ptr&>(acceptor) =
+ Acceptor::create(config.port,
+ config.connectionBacklog);
+ QPID_LOG(info, "Listening on port " << getPort());
+ }
+#endif
+ return acceptors[0];
+}
+
+void Broker::registerAccepter(Acceptor::shared_ptr acceptor) {
+ acceptors.push_back(acceptor);
+}
+
+// TODO: This can only work if there is only one acceptor
+uint16_t Broker::getPort() const {
+ return getAcceptor()->getPort();
+}
+
+// TODO: This should iterate over all acceptors
+void Broker::accept() {
+ for (unsigned int i = 0; i < acceptors.size(); ++i)
+ acceptors[i]->run(poller, &factory);
+}
+
+
+// TODO: How to chose the acceptor to use for the connection
void Broker::connect(
const std::string& host, uint16_t port,
sys::ConnectionCodec::Factory* f)
{
- getAcceptor().connect(poller, host, port, f ? f : &factory);
+ getAcceptor()->connect(poller, host, port, f ? f : &factory);
}
void Broker::connect(
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=649689&r1=649688&r2=649689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Apr 18 14:03:49
2008
@@ -43,7 +43,6 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/sys/Acceptor.h"
#include "qpid/sys/Runnable.h"
#include <vector>
@@ -51,6 +50,7 @@
namespace qpid {
namespace sys {
+ class Acceptor;
class Poller;
}
@@ -124,6 +124,12 @@
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
+ /** Add to the broker's acceptors */
+ void registerAccepter(boost::shared_ptr<sys::Acceptor>);
+
+ /** Accept connections */
+ void accept();
+
/** Create a connection to another broker. */
void connect(const std::string& host, uint16_t port,
sys::ConnectionCodec::Factory* =0);
@@ -131,11 +137,9 @@
void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
private:
- sys::Acceptor& getAcceptor() const;
-
- boost::shared_ptr<qpid::sys::Poller> poller;
+ boost::shared_ptr<sys::Poller> poller;
Options config;
- sys::Acceptor::shared_ptr acceptor;
+ std::vector< boost::shared_ptr<sys::Acceptor> > acceptors;
MessageStore* store;
DataDir dataDir;
@@ -149,6 +153,10 @@
management::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
+
+ // TODO: There is no longer a single acceptor so the use of the following
needs to be fixed
+ // For the present just return the first acceptor registered.
+ boost::shared_ptr<sys::Acceptor> getAcceptor() const;
void declareStandardExchange(const std::string& name, const std::string&
type);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?rev=649689&r1=649688&r2=649689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Fri Apr 18 14:03:49
2008
@@ -35,7 +35,6 @@
class Acceptor : public qpid::SharedObject<Acceptor>
{
public:
- static Acceptor::shared_ptr create(int16_t port, int backlog);
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=649689&r1=649688&r2=649689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Apr 18
14:03:49 2008
@@ -20,10 +20,13 @@
*/
#include "Acceptor.h"
-
#include "AsynchIOHandler.h"
#include "AsynchIO.h"
+#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+
#include <boost/bind.hpp>
#include <memory>
@@ -47,14 +50,26 @@
void accepted(Poller::shared_ptr, const Socket&,
ConnectionCodec::Factory*);
};
-Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog)
-{
- return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog));
-}
+// Static instance to initialise plugin
+static class TCPIOPlugin : public Plugin {
+ void earlyInitialize(Target&) {
+ }
+
+ void initialize(Target& target) {
+
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ // Only provide to a Broker
+ if (broker) {
+ const broker::Broker::Options& opts = broker->getOptions();
+ Acceptor::shared_ptr acceptor(new AsynchIOAcceptor(opts.port,
opts.connectionBacklog));
+ QPID_LOG(info, "Listening on TCP port " << acceptor->getPort());
+ broker->registerAccepter(acceptor);
+ }
+ }
+} acceptor;
AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) :
- listeningPort(listener.listen(port, backlog)),
- acceptor(0)
+ listeningPort(listener.listen(port, backlog))
{}
void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f) {