Author: astitcher
Date: Tue Apr 15 08:41:21 2008
New Revision: 648288

URL: http://svn.apache.org/viewvc?rev=648288&view=rev
Log:
Refactored the IO framework that sits on top of Poller so that it uses a 
generalised IOHandle.
This means that you can define new classes derived from IOHandle (other than 
Socket) that
can also be added to a Poller and waited for.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/IOHandle.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Apr 15 08:41:21 2008
@@ -62,7 +62,7 @@
 
 posix_plat_src = \
   qpid/sys/epoll/EpollPoller.cpp \
-  qpid/sys/DeletionManager.h \
+  qpid/sys/posix/IOHandle.cpp \
   qpid/sys/posix/Socket.cpp \
   qpid/sys/posix/AsynchIO.cpp \
   qpid/sys/posix/Time.cpp \
@@ -469,15 +469,17 @@
   qpid/sys/AtomicCount.h \
   qpid/sys/BlockingQueue.h \
   qpid/sys/Condition.h \
+  qpid/sys/ConnectionCodec.h \
   qpid/sys/ConnectionInputHandler.h \
   qpid/sys/ConnectionInputHandlerFactory.h \
   qpid/sys/ConnectionOutputHandler.h \
+  qpid/sys/DeletionManager.h \
   qpid/sys/Dispatcher.h \
+  qpid/sys/IOHandle.h \
   qpid/sys/Module.h \
   qpid/sys/Monitor.h \
   qpid/sys/Mutex.h \
   qpid/sys/OutputControl.h \
-  qpid/sys/ConnectionCodec.h \
   qpid/sys/OutputTask.h \
   qpid/sys/Poller.h \
   qpid/sys/Runnable.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Apr 15 
08:41:21 2008
@@ -18,16 +18,17 @@
  * under the License.
  *
  */
-#include <iostream>
+#include "Connector.h"
+
 #include "qpid/log/Statement.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/AMQFrame.h"
-#include "Connector.h"
-
 #include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Poller.h"
 #include "qpid/Msg.h"
+
+#include <iostream>
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 
@@ -62,7 +63,7 @@
     Mutex::ScopedLock l(closedLock);
     assert(closed);
     socket.connect(host, port);
-    identifier=str(format("[%1% %2%]") % socket.getLocalPort() % 
socket.getPeerAddress());
+    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % 
socket.getPeerAddress());
     closed = false;
     poller = Poller::shared_ptr(new Poller);
     aio = new AsynchIO(socket,
@@ -72,7 +73,7 @@
                        0, // closed
                        0, // nobuffs
                        boost::bind(&Connector::writebuff, this, _1));
-    writer.setAio(aio);
+    writer.init(identifier, aio);
 }
 
 void Connector::init(){
@@ -184,11 +185,11 @@
 
 Connector::Writer::~Writer() { delete buffer; }
 
-void Connector::Writer::setAio(sys::AsynchIO* a) {
+void Connector::Writer::init(std::string id, sys::AsynchIO* a) {
     Mutex::ScopedLock l(lock);
+    identifier = id;
     aio = a;
     newBuffer(l);
-    identifier = str(format("[%1% %2%]") % aio->getSocket().getLocalPort() % 
aio->getSocket().getPeerAddress());
 }
 
 void Connector::Writer::handle(framing::AMQFrame& frame) { 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Tue Apr 15 
08:41:21 2008
@@ -68,7 +68,7 @@
         
         Writer();
         ~Writer();
-        void setAio(sys::AsynchIO*);
+        void init(std::string id, sys::AsynchIO*);
         void handle(framing::AMQFrame&);
         void write(sys::AsynchIO&);
     };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Tue Apr 15 08:41:21 
2008
@@ -40,6 +40,7 @@
 private:
     Callback acceptedCallback;
     DispatchHandle handle;
+    const Socket& socket;
 
 public:
     AsynchAcceptor(const Socket& s, Callback callback);
@@ -94,6 +95,7 @@
     ClosedCallback closedCallback;
     BuffersEmptyCallback emptyCallback;
     IdleCallback idleCallback;
+    const Socket& socket;
     std::deque<BufferBase*> bufferQueue;
     std::deque<BufferBase*> writeQueue;
     bool queuedClose;
@@ -119,7 +121,6 @@
     void queueWriteClose();
     bool writeQueueEmpty() { return writeQueue.empty(); }
     BufferBase* getQueuedBuffer();
-    const Socket& getSocket() const { return DispatchHandle::getSocket(); }
 
 private:
     ~AsynchIO();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Tue Apr 15 
08:41:21 2008
@@ -84,19 +84,20 @@
 };
 
 class AsynchIOHandler : public OutputControl {
+    std::string identifier;
     AsynchIO* aio;
     ConnectionCodec::Factory* factory;
     ConnectionCodec* codec;
     bool readError;
-    std::string identifier;
     bool isClient;
 
     void write(const framing::ProtocolInitiation&);
 
   public:
-    AsynchIOHandler() :
+    AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+       identifier(id),
         aio(0),
-        factory(0),
+        factory(f),
         codec(0),
         readError(false),
         isClient(false)
@@ -110,11 +111,8 @@
 
     void setClient() { isClient = true; }
     
-    void init(AsynchIO* a, ConnectionCodec::Factory* f) {
+    void init(AsynchIO* a) {
         aio = a;
-        factory = f;
-        identifier = aio->getSocket().getPeerAddress();
-
     }
 
     // Output side
@@ -133,7 +131,7 @@
 };
 
 void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, 
ConnectionCodec::Factory* f) {
-    AsynchIOHandler* async = new AsynchIOHandler; 
+    AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
     AsynchIO* aio = new AsynchIO(s,
                                  boost::bind(&AsynchIOHandler::readbuff, 
async, _1, _2),
                                  boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -141,7 +139,8 @@
                                  boost::bind(&AsynchIOHandler::closedSocket, 
async, _1, _2),
                                  boost::bind(&AsynchIOHandler::nobuffs, async, 
_1),
                                  boost::bind(&AsynchIOHandler::idle, async, 
_1));
-    async->init(aio, f);
+    async->init(aio);
+
     // Give connection some buffers to use
     for (int i = 0; i < 4; i++) {
         aio->queueReadBuffer(new Buff);
@@ -185,7 +184,7 @@
 {
     Socket* socket = new Socket();//Should be deleted by handle when socket 
closes
     socket->connect(host, port);
-    AsynchIOHandler* async = new AsynchIOHandler; 
+    AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f);
     async->setClient();
     AsynchIO* aio = new AsynchIO(*socket,
                                  boost::bind(&AsynchIOHandler::readbuff, 
async, _1, _2),
@@ -194,7 +193,8 @@
                                  boost::bind(&AsynchIOHandler::closedSocket, 
async, _1, _2),
                                  boost::bind(&AsynchIOHandler::nobuffs, async, 
_1),
                                  boost::bind(&AsynchIOHandler::idle, async, 
_1));
-    async->init(aio, f);
+    async->init(aio);
+
     // Give connection some buffers to use
     for (int i = 0; i < 4; i++) {
         aio->queueReadBuffer(new Buff);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Tue Apr 15 08:41:21 
2008
@@ -55,8 +55,8 @@
     } state;
 
 public:
-    DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) :
-      PollerHandle(s),
+    DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback 
dCb) :
+      PollerHandle(h),
       readableCallback(rCb),
       writableCallback(wCb),
       disconnectedCallback(dCb),

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/IOHandle.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/IOHandle.h?rev=648288&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/IOHandle.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/IOHandle.h Tue Apr 15 08:41:21 
2008
@@ -0,0 +1,45 @@
+#ifndef _sys_IOHandle_h
+#define _sys_IOHandle_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace sys {
+
+/**
+ * This is a class intended to abstract the Unix concept of file descriptor or 
the Windows concept of HANDLE
+ */
+class PollerHandle;
+class IOHandlePrivate;
+class IOHandle {
+    friend class PollerHandle;
+
+protected:
+    IOHandlePrivate* const impl;
+
+    IOHandle(IOHandlePrivate*);
+    virtual ~IOHandle();
+};
+
+}}
+
+#endif // _sys_IOHandle_h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Tue Apr 15 08:41:21 2008
@@ -35,16 +35,16 @@
 /**
  * Handle class to use for polling
  */
+class IOHandle;
 class Poller;
 class PollerHandlePrivate;
 class PollerHandle {
     friend class Poller;
 
     PollerHandlePrivate* const impl;
-    const Socket& socket;
 
 public:
-    PollerHandle(const Socket& s);
+    PollerHandle(const IOHandle& h);
     
     // Usual way to delete (will defer deletion until we
     // can't be returned from a Poller::wait any more)
@@ -52,8 +52,6 @@
     
     // Class clients shouldn't ever use this
     virtual ~PollerHandle();
-    
-    const Socket& getSocket() const {return socket;}
 };
 
 /**

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Tue Apr 15 08:41:21 2008
@@ -21,25 +21,22 @@
  * under the License.
  *
  */
+#include "IOHandle.h"
+
 #include <string>
-#include "qpid/sys/Time.h"
 
 struct sockaddr;
 
 namespace qpid {
 namespace sys {
 
-class SocketPrivate;
-class Socket
-{
-       friend class Poller;
-
-       SocketPrivate* const impl;
+class Duration;
 
+class Socket : public IOHandle
+{
 public:
     /** Create a socket wrapper for descriptor. */
     Socket();
-    ~Socket();
 
     /** Create an initialized TCP socket */
     void createTcp() const;
@@ -106,10 +103,8 @@
     int read(void *buf, size_t count) const;
     int write(const void *buf, size_t count) const;
 
-    int toFd() const;
-    
 private:
-       Socket(SocketPrivate*);
+       Socket(IOHandlePrivate*);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Apr 15 
08:41:21 2008
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/sys/Poller.h"
+#include "qpid/sys/IOHandle.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/DeletionManager.h"
 #include "qpid/sys/posix/check.h"
@@ -54,11 +55,13 @@
         MONITORED_HUNGUP
     };
 
+    int fd;
     ::__uint32_t events;
     FDStat stat;
     Mutex lock;
 
-    PollerHandlePrivate() :
+    PollerHandlePrivate(int f) :
+      fd(f),
       events(0),
       stat(ABSENT) {
     }
@@ -97,9 +100,8 @@
     }
 };
 
-PollerHandle::PollerHandle(const Socket& s) :
-    impl(new PollerHandlePrivate),
-    socket(s)
+PollerHandle::PollerHandle(const IOHandle& h) :
+    impl(new PollerHandlePrivate(toFd(h.impl)))
 {}
 
 PollerHandle::~PollerHandle() {
@@ -201,7 +203,7 @@
     }
     epe.data.ptr = &handle;
     
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, toFd(handle.socket.impl), 
&epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
     
     // Record monitoring state of this fd
     eh.events = epe.events;
@@ -212,7 +214,7 @@
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);
     assert(!eh.isIdle());
-    int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, 
toFd(handle.socket.impl), 0);
+    int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
     // Ignore EBADF since deleting a nonexistent fd has the overall required 
result!
     // And allows the case where a sloppy program closes the fd and then does 
the delFd()
     if (rc == -1 && errno != EBADF) {
@@ -231,7 +233,7 @@
     epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
     epe.data.ptr = &handle;
     
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, 
toFd(handle.socket.impl), &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
     
     // Record monitoring state of this fd
     eh.events = epe.events;
@@ -247,7 +249,7 @@
     epe.events = eh.events;        
     epe.data.ptr = &handle;
 
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, 
toFd(handle.socket.impl), &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
 
     eh.setActive();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Apr 15 
08:41:21 2008
@@ -65,7 +65,8 @@
 
 AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
     acceptedCallback(callback),
-    handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
+    handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
+    socket(s) {
 
     s.setNonblocking();
     ignoreSigpipe();
@@ -84,7 +85,7 @@
         errno = 0;
         // TODO: Currently we ignore the peers address, perhaps we should
         // log it or use it for connection acceptance.
-        s = h.getSocket().accept(0, 0);
+        s = socket.accept(0, 0);
         if (s) {
             acceptedCallback(*s);
         } else {
@@ -112,6 +113,7 @@
     closedCallback(cCb),
     emptyCallback(eCb),
     idleCallback(iCb),
+    socket(s),
     queuedClose(false),
     writePending(false) {
 
@@ -209,7 +211,7 @@
             bufferQueue.pop_front();
             errno = 0;
             int readCount = buff->byteCount-buff->dataCount;
-            int rc = h.getSocket().read(buff->bytes + buff->dataCount, 
readCount);
+            int rc = socket.read(buff->bytes + buff->dataCount, readCount);
             if (rc > 0) {
                 buff->dataCount += rc;
                 threadReadTotal += rc;
@@ -276,7 +278,7 @@
             writeQueue.pop_back();
             errno = 0;
             assert(buff->dataStart+buff->dataCount <= buff->byteCount);
-            int rc = h.getSocket().write(buff->bytes+buff->dataStart, 
buff->dataCount);
+            int rc = socket.write(buff->bytes+buff->dataStart, 
buff->dataCount);
             if (rc >= 0) {
                 threadWriteTotal += rc;
                 writeTotal += rc;
@@ -356,9 +358,9 @@
  */
 void AsynchIO::close(DispatchHandle& h) {
     h.stopWatch();
-    h.getSocket().close();
+    socket.close();
     if (closedCallback) {
-        closedCallback(*this, getSocket());
+        closedCallback(*this, socket);
     }
 }
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp?rev=648288&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp Tue Apr 15 
08:41:21 2008
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+
+#include "PrivatePosix.h"
+
+namespace qpid {
+namespace sys {
+
+int toFd(const IOHandlePrivate* h)
+{
+    return h->fd;
+}
+
+IOHandle::IOHandle(IOHandlePrivate* h) :
+  impl(h)
+{}
+
+IOHandle::~IOHandle() {
+       delete impl;
+}
+
+}} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h Tue Apr 15 
08:41:21 2008
@@ -35,9 +35,17 @@
 struct timeval& toTimeval(struct timeval& tv, const Duration& t);
 Duration toTime(const struct timespec& ts);
 
-// Private socket related implementation details
-class SocketPrivate;
-int toFd(const SocketPrivate* s);
+// Private fd related implementation details
+class IOHandlePrivate {
+public:
+    IOHandlePrivate(int f = -1) :
+            fd(f)
+    {}
+    
+    int fd;
+};
+
+int toFd(const IOHandlePrivate* h);
 
 }}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Tue Apr 15 
08:41:21 2008
@@ -38,19 +38,8 @@
 namespace qpid {
 namespace sys {
 
-class SocketPrivate {
-public:
-    SocketPrivate(int f = -1) :
-            fd(f)
-    {}
-    
-    int fd;
-
-    std::string getName(bool local, bool includeService = false) const;   
-    std::string getService(bool local) const;   
-};
-
-std::string SocketPrivate::getName(bool local, bool includeService) const
+namespace {
+std::string getName(int fd, bool local, bool includeService = false)
 {
     ::sockaddr_storage name; // big enough for any socket address    
     ::socklen_t namelen = sizeof(name);
@@ -80,7 +69,7 @@
     }
 }
 
-std::string SocketPrivate::getService(bool local) const
+std::string getService(int fd, bool local)
 {
     ::sockaddr_storage name; // big enough for any socket address    
     ::socklen_t namelen = sizeof(name);
@@ -101,21 +90,18 @@
         throw QPID_POSIX_ERROR(rc);
     return servName;
 }
+}
 
 Socket::Socket() :
-       impl(new SocketPrivate)
+       IOHandle(new IOHandlePrivate)
 {
        createTcp();
 }
 
-Socket::Socket(SocketPrivate* sp) :
-       impl(sp)
+Socket::Socket(IOHandlePrivate* h) :
+       IOHandle(h)
 {}
 
-Socket::~Socket() {
-       delete impl;
-}
-
 void Socket::createTcp() const
 {
        int& socket = impl->fd;
@@ -225,7 +211,7 @@
 {
        int afd = ::accept(impl->fd, addr, addrlen);
        if ( afd >= 0)
-               return new Socket(new SocketPrivate(afd));
+               return new Socket(new IOHandlePrivate(afd));
        else if (errno == EAGAIN)
                return 0;
     else throw QPID_POSIX_ERROR(errno);
@@ -243,41 +229,32 @@
 
 std::string Socket::getSockname() const
 {
-    return impl->getName(true);
+    return getName(impl->fd, true);
 }
 
 std::string Socket::getPeername() const
 {
-    return impl->getName(false);
+    return getName(impl->fd, false);
 }
 
 std::string Socket::getPeerAddress() const
 {
-    return impl->getName(false, true);
+    return getName(impl->fd, false, true);
 }
 
 std::string Socket::getLocalAddress() const
 {
-    return impl->getName(true, true);
+    return getName(impl->fd, true, true);
 }
 
 uint16_t Socket::getLocalPort() const
 {
-    return atoi(impl->getService(true).c_str());
+    return atoi(getService(impl->fd, true).c_str());
 }
 
 uint16_t Socket::getRemotePort() const
 {
-    return atoi(impl->getService(true).c_str());
-}
-
-int Socket::toFd() const {
-    return impl->fd;
-}
-
-int toFd(const SocketPrivate* s)
-{
-    return s->fd;
+    return atoi(getService(impl->fd, true).c_str());
 }
 
 }} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h?rev=648288&r1=648287&r2=648288&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h Tue Apr 15 08:41:21 
2008
@@ -22,6 +22,7 @@
  */
 
 #include "qpid/sys/Socket.h"
+#include "qpid/sys/Poller.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Mutex.h"
@@ -43,8 +44,6 @@
     SocketProxy(int connectPort, const std::string host="localhost")
         : closed(false), port(listener.listen())
     {
-        int r=::pipe(closePipe);
-        if (r<0) throwErrno(QPID_MSG("::pipe returned " << r));
         client.connect(host, connectPort);
         thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
     }
@@ -58,11 +57,9 @@
             if (closed) return;
             closed=true;
         }
-        write(closePipe[1], this, 1); // Random byte to closePipe
+        poller.shutdown();
         thread.join();
         client.close();
-        ::close(closePipe[0]);
-        ::close(closePipe[1]);
     }
 
     bool isClosed() const {
@@ -79,71 +76,38 @@
     static void throwIf(bool condition, const std::string& msg) {
         if (condition) throw qpid::Exception(msg);
     }
-    
-    struct FdSet : fd_set {
-        FdSet() : maxFd(0) { clear(); }
-        void clear() { FD_ZERO(this); }
-        void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); }
-        bool isSet(int fd) const { return FD_ISSET(fd, this); }
-        bool operator[](int fd) const { return isSet(fd); }
-
-        int maxFd;
-    };
-
-    enum { RD=1, WR=2, ER=4 };
-    
-    struct Selector {
-        FdSet rd, wr, er;
-
-        void set(int fd, int sets) {
-            if (sets & RD) rd.set(fd);
-            if (sets & WR) wr.set(fd);
-            if (sets & ER) er.set(fd);
-        }
-        
-        int select() {
-            for (;;) {
-                int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd));
-                int r = ::select(maxFd + 1, &rd, &wr, &er, NULL);
-                if (r == -1 && errno == EINTR) continue;
-                if (r < 0) throwErrno(QPID_MSG("select returned " <<r));
-                return r;
-            }
-        }
-    };
 
     void run() {
         std::auto_ptr<qpid::sys::Socket> server;
         try {
-            // Accept incoming connections, watch closePipe.
-            Selector accept;
-            accept.set(listener.toFd(), RD|ER);
-            accept.set(closePipe[0], RD|ER);
-            accept.select();
-            throwIf(accept.rd[closePipe[0]], "Closed by close()");
-            throwIf(!accept.rd[listener.toFd()],"Accept failed");
+            qpid::sys::PollerHandle listenerHandle(listener);
+            poller.addFd(listenerHandle, qpid::sys::Poller::IN);
+            qpid::sys::Poller::Event event = poller.wait();
+            throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by 
close()");
+            throwIf(!(event.type == qpid::sys::Poller::READABLE && 
event.handle == &listenerHandle), "Accept failed");
+
+            poller.delFd(listenerHandle);
             server.reset(listener.accept(0, 0));
 
-            // Pump data between client & server sockets, watch closePipe.
+            // Pump data between client & server sockets
+            qpid::sys::PollerHandle clientHandle(client);
+            qpid::sys::PollerHandle serverHandle(*server);
+            poller.addFd(clientHandle, qpid::sys::Poller::IN);
+            poller.addFd(serverHandle, qpid::sys::Poller::IN);
             char buffer[1024];
             for (;;) {
-                Selector select;
-                select.set(server->toFd(), RD|ER);
-                select.set(client.toFd(), RD|ER);
-                select.set(closePipe[0], RD|ER);
-                select.select();
-                throwIf(select.rd[closePipe[0]], "Closed by close()");
-                // Read even if fd is in error to throw a useful exception.
-                bool gotData=false;
-                if (select.rd[server->toFd()] || select.er[server->toFd()]) {
+                qpid::sys::Poller::Event event = poller.wait();
+                throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by 
close()");
+                throwIf(event.type == qpid::sys::Poller::DISCONNECTED, 
"client/server disconnected");
+                if (event.handle == &serverHandle) {
                     client.write(buffer, server->read(buffer, sizeof(buffer)));
-                    gotData=true;
-                }
-                if (select.rd[client.toFd()] || select.er[client.toFd()]) {
+                    poller.rearmFd(serverHandle);
+                } else if (event.handle == &clientHandle) {
                     server->write(buffer, client.read(buffer, sizeof(buffer)));
-                    gotData=true;
+                    poller.rearmFd(clientHandle);
+                } else {
+                    throwIf(true, "No handle ready");
                 }
-                throwIf(!gotData, "No data from select()");
             }
         }
         catch (const std::exception& e) {
@@ -155,9 +119,9 @@
 
     mutable qpid::sys::Mutex lock;
     bool closed;
+    qpid::sys::Poller poller;
     qpid::sys::Socket client, listener;
     uint16_t port;
-    int closePipe[2];
     qpid::sys::Thread thread;
 };
 


Reply via email to