Author: astitcher
Date: Wed Apr 16 17:19:14 2008
New Revision: 648904

URL: http://svn.apache.org/viewvc?rev=648904&view=rev
Log:
Refactored IO thread creation so that it happens in the Broker class:
- There is now a single Poller, created by the Broker class, that is
  passed to the Acceptor for use in network IO. It can now also be passed
  to anything else that wants to put work on the IO threads.
- The Broker class itself is now responsible for actually creating the
  threads.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=648904&r1=648903&r2=648904&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Apr 16 17:19:14 2008
@@ -37,6 +37,9 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/sys/Acceptor.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Thread.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/sys/TimeoutHandler.h"
@@ -53,6 +56,9 @@
 #endif
 
 using qpid::sys::Acceptor;
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+using qpid::sys::Thread;
 using qpid::framing::FrameHandler;
 using qpid::framing::ChannelId;
 using qpid::management::ManagementAgent;
@@ -121,6 +127,7 @@
 const std::string qpid_management("qpid.management");
 
 Broker::Broker(const Broker::Options& conf) :
+    poller(new Poller),
     config(conf),
     store(0),
     dataDir(conf.noDataDir ? std::string () : conf.dataDir),
@@ -253,15 +260,31 @@
 }
 
 void Broker::run() {
-    getAcceptor().run(&factory);
+
+    getAcceptor().run(poller, &factory);
+       
+    Dispatcher d(poller);
+    int numIOThreads = config.workerThreads;
+    std::vector<Thread> t(numIOThreads-1);
+
+    // Run n-1 io threads
+    for (int i=0; i<numIOThreads-1; ++i)
+        t[i] = Thread(d);
+
+    // Run final thread
+    d.run();
+       
+    // Now wait for n-1 io threads to exit
+    for (int i=0; i<numIOThreads-1; ++i) {
+        t[i].join();
+    }
 }
 
 void Broker::shutdown() {
     // NB: this function must be async-signal safe, it must not
     // call any function that is not async-signal safe.
     // Any unsafe shutdown actions should be done in the destructor.
-    if (acceptor)
-        acceptor->shutdown();
+    poller->shutdown();
 }
 
 Broker::~Broker() {
@@ -281,8 +304,7 @@
     if (!acceptor) {
         const_cast<Acceptor::shared_ptr&>(acceptor) =
             Acceptor::create(config.port,
-                             config.connectionBacklog,
-                             config.workerThreads);
+                             config.connectionBacklog);
         QPID_LOG(info, "Listening on port " << getPort());
     }
     return *acceptor;
@@ -330,7 +352,7 @@
     const std::string& host, uint16_t port,
     sys::ConnectionCodec::Factory* f)
 {
-    getAcceptor().connect(host, port, f ? f : &factory);
+    getAcceptor().connect(poller, host, port, f ? f : &factory);
 }
 
 void Broker::connect(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=648904&r1=648903&r2=648904&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Apr 16 17:19:14 2008
@@ -50,6 +50,10 @@
 
 namespace qpid { 
 
+namespace sys {
+    class Poller;
+}
+
 class Url;
 
 namespace broker {
@@ -129,6 +133,7 @@
   private:
     sys::Acceptor& getAcceptor() const;
 
+    boost::shared_ptr<qpid::sys::Poller> poller;
     Options config;
     sys::Acceptor::shared_ptr acceptor;
     MessageStore* store;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?rev=648904&r1=648903&r2=648904&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Wed Apr 16 17:19:14 2008
@@ -26,22 +26,24 @@
 #include "qpid/SharedObject.h"
 #include "ConnectionCodec.h"
 
+
 namespace qpid {
 namespace sys {
 
+class Poller;
+
 class Acceptor : public qpid::SharedObject<Acceptor>
 {
   public:
-    static Acceptor::shared_ptr create(int16_t port, int backlog, int threads);
+    static Acceptor::shared_ptr create(int16_t port, int backlog);
     virtual ~Acceptor() = 0;
     virtual uint16_t getPort() const = 0;
     virtual std::string getHost() const = 0;
-    virtual void run(ConnectionCodec::Factory*) = 0;
+    virtual void run(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
     virtual void connect(
-        const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0;
-
-    /** Note: this function is async-signal safe */
-    virtual void shutdown() = 0;
+        boost::shared_ptr<Poller>,
+        const std::string& host, int16_t port,
+        ConnectionCodec::Factory* codec) = 0;
 };
 
 inline Acceptor::~Acceptor() {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=648904&r1=648903&r2=648904&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Wed Apr 16 17:19:14 2008
@@ -43,19 +43,15 @@
 namespace sys {
 
 class AsynchIOAcceptor : public Acceptor {
-    Poller::shared_ptr poller;
     Socket listener;
-    int numIOThreads;
     const uint16_t listeningPort;
+    std::auto_ptr<AsynchAcceptor> acceptor;
 
   public:
-    AsynchIOAcceptor(int16_t port, int backlog, int threads);
-    ~AsynchIOAcceptor() {}
-    void run(ConnectionCodec::Factory*);
-    void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*);
+    AsynchIOAcceptor(int16_t port, int backlog);
+    void run(Poller::shared_ptr, ConnectionCodec::Factory*);
+    void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
 
-    void shutdown();
-        
     uint16_t getPort() const;
     std::string getHost() const;
 
@@ -63,15 +59,14 @@
     void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
 };
 
-Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog)
 {
-    return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
+    return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog));
 }
 
-AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
-    poller(new Poller),
-    numIOThreads(threads),
-    listeningPort(listener.listen(port, backlog))
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) :
+    listeningPort(listener.listen(port, backlog)),
+    acceptor(0)
 {}
 
 // Buffer definition
@@ -157,30 +152,17 @@
     return listener.getSockname();
 }
 
-void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) {
-    Dispatcher d(poller);
-    AsynchAcceptor
-        acceptor(listener,
-                 boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
-    acceptor.start(poller);
-       
-    std::vector<Thread> t(numIOThreads-1);
-
-    // Run n-1 io threads
-    for (int i=0; i<numIOThreads-1; ++i)
-        t[i] = Thread(d);
-
-    // Run final thread
-    d.run();
-       
-    // Now wait for n-1 io threads to exit
-    for (int i=0; i<numIOThreads-1; ++i) {
-        t[i].join();
-    }
+void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+    acceptor.reset(
+        new AsynchAcceptor(listener,
+                           boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)));
+    acceptor->start(poller);
 }
     
 void AsynchIOAcceptor::connect(
-    const std::string& host, int16_t port, ConnectionCodec::Factory* f)
+    Poller::shared_ptr poller,
+    const std::string& host, int16_t port,
+    ConnectionCodec::Factory* f)
 {
     Socket* socket = new Socket();//Should be deleted by handle when socket closes
     socket->connect(host, port);
@@ -201,14 +183,6 @@
     }
     aio->start(poller);
 }
-
-
-void AsynchIOAcceptor::shutdown() {
-    // NB: this function must be async-signal safe, it must not
-    // call any function that is not async-signal safe.
-    poller->shutdown();
-}
-
 
 void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
 {


Reply via email to