Author: astitcher
Date: Tue Apr 22 14:13:20 2008
New Revision: 650657

URL: http://svn.apache.org/viewvc?rev=650657&view=rev
Log:
* Renamed the Acceptor class to be the ProtocolFactory class
  which better approximates its current behaviour
* Slightly refactored TCPIOPlugin to better approximate how it would look
  when we implement a proper AsynchConnector

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
      - copied, changed from r650640, incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=650657&r1=650656&r2=650657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Apr 22 14:13:20 2008
@@ -473,7 +473,6 @@
   qpid/management/ManagementAgent.h \
   qpid/management/ManagementExchange.h \
   qpid/management/ManagementObject.h \
-  qpid/sys/Acceptor.h \
   qpid/sys/AggregateOutput.h \
   qpid/sys/AsynchIO.h \
   qpid/sys/AsynchIOHandler.h \
@@ -493,6 +492,7 @@
   qpid/sys/OutputControl.h \
   qpid/sys/OutputTask.h \
   qpid/sys/Poller.h \
+  qpid/sys/ProtocolFactory.h \
   qpid/sys/Runnable.h \
   qpid/sys/ScopedIncrement.h \
   qpid/sys/Semaphore.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=650657&r1=650656&r2=650657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Apr 22 14:13:20 2008
@@ -35,7 +35,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/ProtocolFactory.h"
 #include "qpid/sys/Poller.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Thread.h"
@@ -54,7 +54,7 @@
 #include <sasl/sasl.h>
 #endif
 
-using qpid::sys::Acceptor;
+using qpid::sys::ProtocolFactory;
 using qpid::sys::Poller;
 using qpid::sys::Dispatcher;
 using qpid::sys::Thread;
@@ -334,41 +334,33 @@
     return status;
 }
 
-boost::shared_ptr<Acceptor> Broker::getAcceptor() const {
-    assert(acceptors.size() > 0);
-#if 0
-    if (!acceptor) {
-        const_cast<Acceptor::shared_ptr&>(acceptor) =
-            Acceptor::create(config.port,
-                             config.connectionBacklog);
-        QPID_LOG(info, "Listening on port " << getPort());
-    }
-#endif
-    return acceptors[0];
+boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory() const {
+    assert(protocolFactories.size() > 0);
+    return protocolFactories[0];
 }
 
-void Broker::registerAccepter(Acceptor::shared_ptr acceptor) {
-    acceptors.push_back(acceptor);
+void Broker::registerProtocolFactory(ProtocolFactory::shared_ptr protocolFactory) {
+    protocolFactories.push_back(protocolFactory);
 }
 
-// TODO: This can only work if there is only one acceptor
+// TODO: This can only work if there is only one protocolFactory
 uint16_t Broker::getPort() const  {
-    return getAcceptor()->getPort();
+    return getProtocolFactory()->getPort();
 }
 
-// TODO: This should iterate over all acceptors
+// TODO: This should iterate over all protocolFactories
 void Broker::accept() {
-    for (unsigned int i = 0; i < acceptors.size(); ++i)
-        acceptors[i]->run(poller, &factory);
+    for (unsigned int i = 0; i < protocolFactories.size(); ++i)
+        protocolFactories[i]->accept(poller, &factory);
 }
 
 
-// TODO: How to chose the acceptor to use for the connection
+// TODO: How to chose the protocolFactory to use for the connection
 void Broker::connect(
     const std::string& host, uint16_t port,
     sys::ConnectionCodec::Factory* f)
 {
-    getAcceptor()->connect(poller, host, port, f ? f : &factory);
+    getProtocolFactory()->connect(poller, host, port, f ? f : &factory);
 }
 
 void Broker::connect(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=650657&r1=650656&r2=650657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Apr 22 14:13:20 2008
@@ -50,7 +50,7 @@
 namespace qpid { 
 
 namespace sys {
-    class Acceptor;
+    class ProtocolFactory;
     class Poller;
 }
 
@@ -124,8 +124,8 @@
     management::Manageable::status_t
         ManagementMethod (uint32_t methodId, management::Args& args);
     
-    /** Add to the broker's acceptors */
-    void registerAccepter(boost::shared_ptr<sys::Acceptor>);
+    /** Add to the broker's protocolFactorys */
+    void registerProtocolFactory(boost::shared_ptr<sys::ProtocolFactory>);
 
     /** Accept connections */
     void accept();
@@ -139,7 +139,7 @@
   private:
     boost::shared_ptr<sys::Poller> poller;
     Options config;
-    std::vector< boost::shared_ptr<sys::Acceptor> > acceptors;
+    std::vector< boost::shared_ptr<sys::ProtocolFactory> > protocolFactories;
     MessageStore* store;
     DataDir dataDir;
 
@@ -154,9 +154,9 @@
     Vhost::shared_ptr              vhostObject;
     System::shared_ptr             systemObject;
 
-    // TODO: There is no longer a single acceptor so the use of the following needs to be fixed
-    // For the present just return the first acceptor registered.
-    boost::shared_ptr<sys::Acceptor> getAcceptor() const;
+    // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
+    // For the present just return the first ProtocolFactory registered.
+    boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
 
     void declareStandardExchange(const std::string& name, const std::string& type);
 };

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h (from r650640, incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h&r1=650640&r2=650657&rev=650657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h Tue Apr 22 14:13:20 2008
@@ -1,5 +1,5 @@
-#ifndef _sys_Acceptor_h
-#define _sys_Acceptor_h
+#ifndef _sys_ProtocolFactory_h
+#define _sys_ProtocolFactory_h
 
 /*
  *
@@ -32,23 +32,23 @@
 
 class Poller;
 
-class Acceptor : public qpid::SharedObject<Acceptor>
+class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
 {
   public:
-    virtual ~Acceptor() = 0;
+    virtual ~ProtocolFactory() = 0;
     virtual uint16_t getPort() const = 0;
     virtual std::string getHost() const = 0;
-    virtual void run(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
+    virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
     virtual void connect(
         boost::shared_ptr<Poller>,
         const std::string& host, int16_t port,
         ConnectionCodec::Factory* codec) = 0;
 };
 
-inline Acceptor::~Acceptor() {}
+inline ProtocolFactory::~ProtocolFactory() {}
 
 }}
 
 
     
-#endif  /*!_sys_Acceptor_h*/
+#endif  //!_sys_ProtocolFactory_h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=650657&r1=650656&r2=650657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Tue Apr 22 14:13:20 2008
@@ -19,7 +19,7 @@
  *
  */
 
-#include "Acceptor.h"
+#include "ProtocolFactory.h"
 #include "AsynchIOHandler.h"
 #include "AsynchIO.h"
 
@@ -33,21 +33,21 @@
 namespace qpid {
 namespace sys {
 
-class AsynchIOAcceptor : public Acceptor {
+class AsynchIOProtocolFactory : public ProtocolFactory {
     Socket listener;
     const uint16_t listeningPort;
     std::auto_ptr<AsynchAcceptor> acceptor;
 
   public:
-    AsynchIOAcceptor(int16_t port, int backlog);
-    void run(Poller::shared_ptr, ConnectionCodec::Factory*);
+    AsynchIOProtocolFactory(int16_t port, int backlog);
+    void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
     void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
 
     uint16_t getPort() const;
     std::string getHost() const;
 
   private:
-    void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+    void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient);
 };
 
 // Static instance to initialise plugin
@@ -56,24 +56,26 @@
     }
     
     void initialize(Target& target) {
-    
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         // Only provide to a Broker
         if (broker) {
             const broker::Broker::Options& opts = broker->getOptions();
-            Acceptor::shared_ptr acceptor(new AsynchIOAcceptor(opts.port, opts.connectionBacklog));
-            QPID_LOG(info, "Listening on TCP port " << acceptor->getPort());
-            broker->registerAccepter(acceptor);
+            ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog));
+            QPID_LOG(info, "Listening on TCP port " << protocol->getPort());
+            broker->registerProtocolFactory(protocol);
         }
     }
-} acceptor;
+} tcpPlugin;
 
-AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) :
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) :
     listeningPort(listener.listen(port, backlog))
 {}
 
-void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
+void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+                                          ConnectionCodec::Factory* f, bool isClient) {
     AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
+    if (isClient)
+        async->setClient();
     AsynchIO* aio = new AsynchIO(s,
                                  boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
                                  boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -85,40 +87,30 @@
     aio->start(poller);
 }
 
-
-uint16_t AsynchIOAcceptor::getPort() const {
+uint16_t AsynchIOProtocolFactory::getPort() const {
     return listeningPort; // Immutable no need for lock.
 }
 
-std::string AsynchIOAcceptor::getHost() const {
+std::string AsynchIOProtocolFactory::getHost() const {
     return listener.getSockname();
 }
 
-void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
     acceptor.reset(
         new AsynchAcceptor(listener,
-                           boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)));
+                           boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
     acceptor->start(poller);
 }
     
-void AsynchIOAcceptor::connect(
+void AsynchIOProtocolFactory::connect(
     Poller::shared_ptr poller,
     const std::string& host, int16_t port,
     ConnectionCodec::Factory* f)
 {
     Socket* socket = new Socket();//Should be deleted by handle when socket closes
     socket->connect(host, port);
-    AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f);
-    async->setClient();
-    AsynchIO* aio = new AsynchIO(*socket,
-                                 boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-                                 boost::bind(&AsynchIOHandler::eof, async, _1),
-                                 boost::bind(&AsynchIOHandler::disconnect, async, _1),
-                                 boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-                                 boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-                                 boost::bind(&AsynchIOHandler::idle, async, _1));
-    async->init(aio, 4);
-    aio->start(poller);
+
+    established(poller, *socket, f, true);
 }
 
 }} // namespace qpid::sys


Reply via email to