Author: astitcher
Date: Wed Jul 11 18:48:13 2007
New Revision: 555455

URL: http://svn.apache.org/viewvc?view=rev&rev=555455
Log:
* Add libuuid to libcommon link (for when apr goes away)
* Latest version of AsynchIO code


Modified:
    incubator/qpid/trunk/qpid/cpp/configure.ac
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp

Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?view=diff&rev=555455&r1=555454&r2=555455
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Wed Jul 11 18:48:13 2007
@@ -159,8 +159,9 @@
 missing=""
 AC_CHECK_HEADERS([libdaemon/daemon.h],,[missing="$missing libdaemon"])
 AC_CHECK_HEADERS([boost/shared_ptr.hpp],,[missing="$missing boost"])
+AC_CHECK_HEADERS([uuid/uuid.h],,[missing="$missing libuuid"])
 test -z "$missing" ||
-    AC_MSG_ERROR([Missing required headers. Install the folowing packages or 
-devel rpms: $missing.])
+    AC_MSG_ERROR([Missing required headers. Install the following packages or 
-devel rpms: $missing.])
 
 # Enable/disable cluster functionality based on presence of usable openais
 # and devel libs.
@@ -178,8 +179,7 @@
 fi
 if test x$libcpg = xno -a x$cpg_h = xyes; then
    AC_MSG_WARN([Found cpg.h but libcpg is missing or does not contain 
cpg_local_get. Need build of openais whitetank branch head as of 2007-06-20])
-fi   
-
+fi
 AC_CONFIG_FILES([
   qpidc.spec
   Makefile

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=555455&r1=555454&r2=555455
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Jul 11 18:48:13 2007
@@ -121,6 +121,7 @@
 
 libqpidcommon_la_LIBADD = \
   -lboost_program_options \
+  -luuid \
   libLogger.la \
   $(APR_LIBS) \
   $(LIB_DLOPEN) \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?view=diff&rev=555455&r1=555454&r2=555455
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Wed Jul 11 18:48:13 
2007
@@ -57,42 +57,61 @@
  * 
  * Writer accepts a buffer and queues it for writing; can also be given
  * a callback for when writing is "idle" (ie fd is writable, but nothing to 
write)
+ * 
+ * The class is implemented in terms of DispatchHandle to allow it to be 
deleted by deleting
+ * the contained DispatchHandle
  */
-class AsynchIO {
+class AsynchIO : private DispatchHandle {
 public:
     struct Buffer {
+        typedef boost::function1<void, const Buffer&> RecycleStorage;
+        
         char* const bytes;
         const int32_t byteCount;
+        int32_t dataStart;
+        int32_t dataCount;
         
         Buffer(char* const b, const int32_t s) :
             bytes(b),
-            byteCount(s)
+            byteCount(s),
+            dataStart(0),
+            dataCount(s)
+        {}
+
+        virtual ~Buffer()
         {}
     };
 
-    typedef boost::function2<void, const Buffer&, int32_t> ReadCallback;
-    typedef boost::function0<void> EofCallback;
-    typedef boost::function0<void> BuffersEmptyCallback;
-    typedef boost::function1<void, int> IdleCallback;
+    typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
+    typedef boost::function1<void, AsynchIO&> EofCallback;
+    typedef boost::function1<void, AsynchIO&> DisconnectCallback;
+    typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
+    typedef boost::function1<void, AsynchIO&> IdleCallback;
 
 private:
     ReadCallback readCallback;
     EofCallback eofCallback;
+    DisconnectCallback disCallback;
     BuffersEmptyCallback emptyCallback;
     IdleCallback idleCallback;
-    DispatchHandle handle;
-    std::deque<Buffer> bufferQueue;
-    std::deque<Buffer> writeQueue;
+    std::deque<Buffer*> bufferQueue;
+    std::deque<Buffer*> writeQueue;
 
 public:
-    AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback 
eCb = 0, IdleCallback iCb = 0);
+    AsynchIO(int fd,
+        ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+        BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+    void queueForDeletion();
+
     void start(Poller::shared_ptr poller);
-    void QueueReadBuffer(const Buffer& buff);
-    void QueueWrite(const Buffer& buff);
+    void queueReadBuffer(Buffer* buff);
+    void queueWrite(Buffer* buff);
 
 private:
+    ~AsynchIO();
     void readable(DispatchHandle& handle);
     void writeable(DispatchHandle& handle);
+    void disconnected(DispatchHandle& handle);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?view=diff&rev=555455&r1=555454&r2=555455
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Wed Jul 11 
18:48:13 2007
@@ -40,10 +40,10 @@
 
         // If can read/write then dispatch appropriate callbacks        
         if (h) {
-            h->dispatchCallbacks(event.dir);
+            h->dispatchCallbacks(event.type);
         } else {
             // Handle shutdown
-            switch (event.dir) {
+            switch (event.type) {
             case Poller::SHUTDOWN:
                 goto dispatcher_shutdown;
             default:
@@ -57,7 +57,11 @@
     ;
 }
 
-void DispatchHandle::watch(Poller::shared_ptr poller0) {
+DispatchHandle::~DispatchHandle() {
+    stopWatch();
+}
+
+void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
     bool r = readableCallback;
     bool w = writableCallback;
 
@@ -84,25 +88,26 @@
 }
 
 void DispatchHandle::rewatch() {
-    assert(poller);
     bool r = readableCallback;
     bool w = writableCallback;
 
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
-    case DispatchHandle::IDLE:
-        assert(false);
+    case IDLE:
         break;
-    case DispatchHandle::DELAYED_R:
-    case DispatchHandle::DELAYED_W:
-    case DispatchHandle::CALLBACK:
+    case DELAYED_R:
+    case DELAYED_W:
+    case CALLBACK:
         state = r ?
             (w ? DELAYED_RW : DELAYED_R) :
             DELAYED_W;
         break;
-    case DispatchHandle::INACTIVE:
-    case DispatchHandle::ACTIVE_R:
-    case DispatchHandle::ACTIVE_W: {
+    case DELAYED_DELETE:
+        break;
+    case INACTIVE:
+    case ACTIVE_R:
+    case ACTIVE_W: {
+        assert(poller);
         Poller::Direction d = r ?
             (w ? Poller::INOUT : Poller::IN) :
             Poller::OUT;
@@ -112,42 +117,43 @@
             ACTIVE_W;
         break;
         }
-    case DispatchHandle::DELAYED_RW:
-    case DispatchHandle::ACTIVE_RW:
+    case DELAYED_RW:
+    case ACTIVE_RW:
         // Don't need to do anything already waiting for readable/writable
         break;
     }
 }
 
 void DispatchHandle::rewatchRead() {
-    assert(poller);
     if (!readableCallback) {
         return;
     }
     
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
-    case DispatchHandle::IDLE:
-        assert(false);
+    case IDLE:
         break;
-    case DispatchHandle::DELAYED_R:
-    case DispatchHandle::DELAYED_RW:
+    case DELAYED_R:
+    case DELAYED_RW:
+    case DELAYED_DELETE:
         break;
-    case DispatchHandle::DELAYED_W:
+    case DELAYED_W:
         state = DELAYED_RW;
         break;
-    case DispatchHandle::CALLBACK:
+    case CALLBACK:
         state = DELAYED_R;
         break;
-    case DispatchHandle::ACTIVE_R:
-    case DispatchHandle::ACTIVE_RW:
-        // Nothing to do: already wating for readable
+    case ACTIVE_R:
+    case ACTIVE_RW:
+        // Nothing to do: already waiting for readable
         break;
-    case DispatchHandle::INACTIVE:
+    case INACTIVE:
+        assert(poller);
         poller->modFd(*this, Poller::IN);
         state = ACTIVE_R;
         break;
-    case DispatchHandle::ACTIVE_W:
+    case ACTIVE_W:
+        assert(poller);
         poller->modFd(*this, Poller::INOUT);
         state = ACTIVE_RW;
         break;
@@ -155,101 +161,245 @@
 }
 
 void DispatchHandle::rewatchWrite() {
-    assert(poller);
     if (!writableCallback) {
         return;
     }
     
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
-    case DispatchHandle::IDLE:
-        assert(false);
+    case IDLE:
         break;
-    case DispatchHandle::DELAYED_W:
-    case DispatchHandle::DELAYED_RW:
+    case DELAYED_W:
+    case DELAYED_RW:
+    case DELAYED_DELETE:
         break;
-    case DispatchHandle::DELAYED_R:
+    case DELAYED_R:
         state = DELAYED_RW;
         break;
-    case DispatchHandle::CALLBACK:
+    case CALLBACK:
         state = DELAYED_W;
         break;
-    case DispatchHandle::INACTIVE:
+    case INACTIVE:
+        assert(poller);
         poller->modFd(*this, Poller::OUT);
         state = ACTIVE_W;
         break;
-    case DispatchHandle::ACTIVE_R:
+    case ACTIVE_R:
+        assert(poller);
         poller->modFd(*this, Poller::INOUT);
         state = ACTIVE_RW;
         break;
-    case DispatchHandle::ACTIVE_W:
-    case DispatchHandle::ACTIVE_RW:
+    case ACTIVE_W:
+    case ACTIVE_RW:
         // Nothing to do: already waiting for writable
         break;
    }
 }
 
+void DispatchHandle::unwatchRead() {
+    if (!readableCallback) {
+        return;
+    }
+    
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case IDLE:
+        break;
+    case DELAYED_R:
+        state = CALLBACK;
+        break;
+    case DELAYED_RW:
+        state = DELAYED_W;    
+        break;
+    case DELAYED_W:
+    case CALLBACK:
+    case DELAYED_DELETE:
+        break;
+    case ACTIVE_R:
+        assert(poller);
+        poller->modFd(*this, Poller::NONE);
+        state = INACTIVE;
+        break;
+    case ACTIVE_RW:
+        assert(poller);
+        poller->modFd(*this, Poller::OUT);
+        state = ACTIVE_W;
+        break;
+    case ACTIVE_W:
+    case INACTIVE:
+        break;
+    }
+}
+
+void DispatchHandle::unwatchWrite() {
+    if (!writableCallback) {
+        return;
+    }
+    
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case IDLE:
+        break;
+    case DELAYED_W:
+        state = CALLBACK;
+        break;
+    case DELAYED_RW:
+        state = DELAYED_R;
+        break;
+    case DELAYED_R:
+    case CALLBACK:
+    case DELAYED_DELETE:
+        break;
+    case ACTIVE_W:
+        assert(poller);
+        poller->modFd(*this, Poller::NONE);
+        state = INACTIVE;
+        break;
+    case ACTIVE_RW:
+        assert(poller);
+        poller->modFd(*this, Poller::IN);
+        state = ACTIVE_R;
+        break;
+    case ACTIVE_R:
+    case INACTIVE:
+        break;
+   }
+}
+
 void DispatchHandle::unwatch() {
-    assert(poller);
     ScopedLock<Mutex> lock(stateLock);
+    switch (state) {
+    case IDLE:
+        break;
+    case DELAYED_R:
+    case DELAYED_W:
+    case DELAYED_RW:
+    case CALLBACK:
+        state = CALLBACK;
+        break;
+    case DELAYED_DELETE:
+        break;
+    default:
+        assert(poller);
+        poller->modFd(*this, Poller::NONE);
+        state = INACTIVE;
+        break;
+    }            
+}
+
+void DispatchHandle::stopWatch() {
+    ScopedLock<Mutex> lock(stateLock);
+    if ( state == IDLE) {
+       return;
+    }
+    assert(poller);
     poller->delFd(*this);
     poller.reset();
     state = IDLE;
 }
 
-void DispatchHandle::dispatchCallbacks(Poller::Direction dir) {
-    // Note that we are now doing the callbacks
+// The slightly strange switch structure
+// is to ensure that the lock is released before
+// we do the delete
+void DispatchHandle::doDelete() {
+    // If we're in the middle of a callback defer the delete
     {
     ScopedLock<Mutex> lock(stateLock);
-    assert(
-        state == ACTIVE_R ||
-        state == ACTIVE_W ||
-        state == ACTIVE_RW);
+    switch (state) {
+    case DELAYED_R:
+    case DELAYED_W:
+    case DELAYED_RW:
+    case CALLBACK:
+    case DELAYED_DELETE:
+        state = DELAYED_DELETE;
+        return;
+    default:
+        break;
+    }
+    }
+    // If we're not then do it right away
+    delete this;
+}
 
-    state = CALLBACK;
+void DispatchHandle::dispatchCallbacks(Poller::EventType type) {
+    // Note that we are now doing the callbacks
+    {
+    ScopedLock<Mutex> lock(stateLock);
+    
+    // Set up to wait for same events next time unless reset
+    switch(state) {
+    case ACTIVE_R:
+        state = DELAYED_R;
+        break;
+    case ACTIVE_W:
+        state = DELAYED_W;
+        break;
+    case ACTIVE_RW:
+        state = DELAYED_RW;
+        break;
+    default:
+        assert(false);
+    }
     }
 
     // Do callbacks - whilst we are doing the callbacks we are prevented from 
processing
     // the same handle until we re-enable it. To avoid rentering the callbacks 
for a single
     // handle re-enabling in the callbacks is actually deferred until they are 
complete.
-    switch (dir) {
-    case Poller::IN:
+    switch (type) {
+    case Poller::READABLE:
         readableCallback(*this);
         break;
-    case Poller::OUT:
+    case Poller::WRITABLE:
         writableCallback(*this);
         break;
-    case Poller::INOUT:
+    case Poller::READ_WRITABLE:
         readableCallback(*this);
         writableCallback(*this);
         break;
+    case Poller::DISCONNECTED:
+        {
+        ScopedLock<Mutex> lock(stateLock);
+        state = CALLBACK;
+        }
+        if (disconnectedCallback) {
+            disconnectedCallback(*this);
+        }
+        break;
     default:
         assert(false);
     }
 
     // If any of the callbacks re-enabled reading/writing then actually
     // do it now
+    {
     ScopedLock<Mutex> lock(stateLock);
     switch (state) {
     case DELAYED_R:
         poller->modFd(*this, Poller::IN);
         state = ACTIVE_R;
-        break;
+        return;
     case DELAYED_W:
         poller->modFd(*this, Poller::OUT);
         state = ACTIVE_W;
-        break;
+        return;
     case DELAYED_RW:
         poller->modFd(*this, Poller::INOUT);
         state = ACTIVE_RW;
-        break;
+        return;
     case CALLBACK:
         state = INACTIVE;
-        break;
+        return;
+    case IDLE:
+       return;
     default:
         // This should be impossible
         assert(false);
+        return;
+    case DELAYED_DELETE:
+        break;
+    }
     }            
+    delete this;
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?view=diff&rev=555455&r1=555454&r2=555455
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Wed Jul 11 18:48:13 
2007
@@ -44,26 +44,39 @@
 private:
     Callback readableCallback;
     Callback writableCallback;
+    Callback disconnectedCallback;
     Poller::shared_ptr poller;
     Mutex stateLock;
-    enum { IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, CALLBACK, DELAYED_R, 
DELAYED_W, DELAYED_RW} state;
+    enum {
+        IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
+        CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW, DELAYED_DELETE
+    } state;
 
 public:
-    DispatchHandle(int fd, Callback rCb, Callback wCb) :
+    DispatchHandle(int fd, Callback rCb, Callback wCb, Callback dCb) :
       PollerHandle(fd),
       readableCallback(rCb),
       writableCallback(wCb),
+      disconnectedCallback(dCb),
       state(IDLE)
     {}
 
-    void watch(Poller::shared_ptr poller);
+    ~DispatchHandle();
+
+    void startWatch(Poller::shared_ptr poller);
     void rewatch();
     void rewatchRead();
     void rewatchWrite();
     void unwatch();
+    void unwatchRead();
+    void unwatchWrite();
+    void stopWatch();
+    
+protected:
+    void doDelete();
 
 private:
-    void dispatchCallbacks(Poller::Direction dir);
+    void dispatchCallbacks(Poller::EventType dir);
 };
 
 class Dispatcher : public Runnable {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?view=diff&rev=555455&r1=555454&r2=555455
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Wed Jul 11 18:48:13 2007
@@ -61,20 +61,29 @@
     typedef boost::shared_ptr<Poller> shared_ptr;
 
     enum Direction {
-        NONE,
+        NONE = 0,
         IN,
         OUT,
-        INOUT,
-        SHUTDOWN
+        INOUT
+    };
+
+    enum EventType {
+        INVALID = 0,
+        READABLE,
+        WRITABLE,
+        READ_WRITABLE,
+        DISCONNECTED,
+        SHUTDOWN,
+        TIMEOUT
     };
 
     struct Event {
         PollerHandle* handle;
-        Direction dir;
+        EventType type;
         
-        Event(PollerHandle* handle0, Direction dir0) :
+        Event(PollerHandle* handle0, EventType type0) :
           handle(handle0),
-          dir(dir0) {
+          type(type0) {
         }
     };
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?view=diff&rev=555455&r1=555454&r2=555455
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Wed Jul 11 
18:48:13 2007
@@ -40,7 +40,9 @@
     enum FDStat {
         ABSENT,
         MONITORED,
-        INACTIVE
+        INACTIVE,
+        HUNGUP,
+        MONITORED_HUNGUP
     };
 
     ::__uint32_t events;
@@ -51,6 +53,39 @@
       events(0),
       stat(ABSENT) {
     }
+    
+    bool isActive() const {
+        return stat == MONITORED || stat == MONITORED_HUNGUP;
+    }
+
+    void setActive() {
+        stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED;
+    }
+
+    bool isInactive() const {
+        return stat == INACTIVE || stat == HUNGUP;
+    }
+
+    void setInactive() {
+        stat = INACTIVE;
+    }
+
+    bool isIdle() const {
+        return stat == ABSENT;
+    }
+
+    void setIdle() {
+        stat = ABSENT;
+    }
+
+    bool isHungup() const {
+        return stat == MONITORED_HUNGUP || stat == HUNGUP;
+    }
+
+    void setHungup() {
+        assert(stat == MONITORED);
+        stat = HUNGUP;
+    }
 };
 
 PollerHandle::PollerHandle(int fd0) :
@@ -108,13 +143,16 @@
         }
     }
     
-    static Poller::Direction epollToDirection(::__uint32_t events) {
+    static Poller::EventType epollToDirection(::__uint32_t events) {
+        // POLLOUT & POLLHUP are mutually exclusive really, but at least 
socketpairs
+        // can give you both!
+        events = (events & ::EPOLLHUP) ? events & ~::EPOLLOUT : events;
         ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT);
         switch (e) {
-            case ::EPOLLIN: return Poller::IN;
-            case ::EPOLLOUT: return Poller::OUT;
-            case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT;
-            default: return Poller::NONE;
+            case ::EPOLLIN: return Poller::READABLE;
+            case ::EPOLLOUT: return Poller::WRITABLE;
+            case ::EPOLLIN | ::EPOLLOUT: return Poller::READ_WRITABLE;
+            default: return (events & ::EPOLLHUP) ? Poller::DISCONNECTED : 
Poller::INVALID;
         }
     }
 
@@ -138,11 +176,11 @@
     ::epoll_event epe;
     int op;
     
-    if (eh.stat == PollerHandlePrivate::ABSENT) {
+    if (eh.isIdle()) {
         op = EPOLL_CTL_ADD;
         epe.events = PollerPrivate::directionToEpollEvent(dir) | 
::EPOLLONESHOT;
     } else {
-        assert(eh.stat == PollerHandlePrivate::MONITORED);
+        assert(eh.isActive());
         op = EPOLL_CTL_MOD;
         epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
     }
@@ -152,22 +190,27 @@
     
     // Record monitoring state of this fd
     eh.events = epe.events;
-    eh.stat = PollerHandlePrivate::MONITORED;
+    eh.setActive();
 }
 
 void Poller::delFd(PollerHandle& handle) {
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);
-    assert(eh.stat != PollerHandlePrivate::ABSENT);
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 
0));
-    eh.stat = PollerHandlePrivate::ABSENT;
+    assert(!eh.isIdle());
+    int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0);
+    // Ignore EBADF since deleting a nonexistent fd has the overall required 
result!
+    // And allows the case where a sloppy program closes the fd and then does 
the delFd()
+    if (rc == -1 && errno != EBADF) {
+           QPID_POSIX_CHECK(rc);
+    }
+    eh.setIdle();
 }
 
 // modFd is equivalent to delFd followed by addFd
 void Poller::modFd(PollerHandle& handle, Direction dir) {
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);
-    assert(eh.stat != PollerHandlePrivate::ABSENT);
+    assert(!eh.isIdle());
     
     ::epoll_event epe;
     epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
@@ -177,13 +220,13 @@
     
     // Record monitoring state of this fd
     eh.events = epe.events;
-    eh.stat = PollerHandlePrivate::MONITORED;
+    eh.setActive();
 }
 
 void Poller::rearmFd(PollerHandle& handle) {
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);
-    assert(eh.stat == PollerHandlePrivate::INACTIVE);
+    assert(eh.isInactive());
 
     ::epoll_event epe;
     epe.events = eh.events;        
@@ -191,7 +234,7 @@
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), 
&epe));
 
-    eh.stat = PollerHandlePrivate::MONITORED;
+    eh.setActive();
 }
 
 void Poller::shutdown() {
@@ -229,8 +272,17 @@
             ScopedLock<Mutex> l(eh.lock);
             
             // the handle could have gone inactive since we left the epoll_wait
-            if (eh.stat == PollerHandlePrivate::MONITORED) {
-                eh.stat = PollerHandlePrivate::INACTIVE;
+            if (eh.isActive()) {
+                // If the connection has been hungup we could still be readable
+                // (just not writable), allow us to readable until we get here 
again
+                if (epe.events & ::EPOLLHUP) {
+                    if (eh.isHungup()) {
+                        return Event(handle, DISCONNECTED);
+                    }
+                    eh.setHungup();
+                } else {
+                    eh.setInactive();
+                }
                 return Event(handle, 
PollerPrivate::epollToDirection(epe.events));
             }
         }
@@ -245,8 +297,8 @@
         // If the wait wasn't indefinite, but we were interrupted then we have 
to return
         // with a timeout as we don't know how long we've waited so far and so 
we can't
         // continue the wait.
-        if (rc == 0 || timeoutMs == -1) {
-            return Event(0, NONE);
+        if (rc == 0 || timeoutMs != -1) {
+            return Event(0, TIMEOUT);
         }
     } while (true);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?view=diff&rev=555455&r1=555454&r2=555455
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Wed Jul 11 
18:48:13 2007
@@ -27,6 +27,7 @@
 #include <fcntl.h>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <signal.h>
 #include <errno.h>
 
 #include <boost/bind.hpp>
@@ -42,6 +43,14 @@
     QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
 }
 
+/*
+ * Make *process* not generate SIGPIPE when writing to closed
+ * pipe/socket (necessary as default action is to terminate process)
+ */
+void ignoreSigpipe() {
+       ::signal(SIGPIPE, SIG_IGN);
+}
+
 }
 
 /*
@@ -50,13 +59,14 @@
 
 AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
     acceptedCallback(callback),
-    handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0) {
+    handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
 
     nonblocking(fd);
+    ignoreSigpipe();
 }
 
 void AsynchAcceptor::start(Poller::shared_ptr poller) {
-    handle.watch(poller);
+    handle.startWatch(poller);
 }
 
 /*
@@ -84,28 +94,50 @@
 /*
  * Asynch reader/writer
  */
-AsynchIO::AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, 
BuffersEmptyCallback eCb, IdleCallback iCb) :
+AsynchIO::AsynchIO(int fd,
+    ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+    BuffersEmptyCallback eCb, IdleCallback iCb) :
+
+    DispatchHandle(fd, 
+        boost::bind(&AsynchIO::readable, this, _1),
+        boost::bind(&AsynchIO::writeable, this, _1),
+        boost::bind(&AsynchIO::disconnected, this, _1)),
     readCallback(rCb),
     eofCallback(eofCb),
+    disCallback(disCb),
     emptyCallback(eCb),
-    idleCallback(iCb),
-    handle(fd, boost::bind(&AsynchIO::readable, this, _1), 
boost::bind(&AsynchIO::writeable, this, _1)) {
+    idleCallback(iCb) {
 
     nonblocking(fd);
 }
 
+struct deleter
+{
+  template <typename T>
+  void operator()(T *ptr){ delete ptr;}
+};
+
+AsynchIO::~AsynchIO() {
+    std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
+    std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
+}
+
+void AsynchIO::queueForDeletion() {
+    DispatchHandle::doDelete();
+}
+
 void AsynchIO::start(Poller::shared_ptr poller) {
-    handle.watch(poller);
+    DispatchHandle::startWatch(poller);
 }
 
-void AsynchIO::QueueReadBuffer(const Buffer& buff) {
+void AsynchIO::queueReadBuffer(Buffer* buff) {
     bufferQueue.push_front(buff);
-    handle.rewatchRead();
+    DispatchHandle::rewatchRead();
 }
 
-void AsynchIO::QueueWrite(const Buffer& buff) {
+void AsynchIO::queueWrite(Buffer* buff) {
     writeQueue.push_front(buff);
-    handle.rewatchWrite();
+    DispatchHandle::rewatchWrite();
 }
 
 /*
@@ -117,22 +149,34 @@
         // (Try to) get a buffer
         if (!bufferQueue.empty()) {
             // Read into buffer
-            Buffer buff = bufferQueue.back();
+            Buffer* buff = bufferQueue.back();
             bufferQueue.pop_back();
             errno = 0;
-            int rc = ::read(h.getFD(), buff.bytes, buff.byteCount);
+            int rc = ::read(h.getFD(), buff->bytes, buff->byteCount);
             if (rc == 0) {
-                eofCallback();    
+                eofCallback(*this);
+                h.unwatchRead();
+                return;
             } else if (rc > 0) {
-                readCallback(buff, rc);
+                buff->dataStart = 0;
+                buff->dataCount = rc;
+                readCallback(*this, buff);
+                if (rc != buff->byteCount) {
+                    // If we didn't fill the read buffer then time to stop 
reading
+                    return;
+                }
             } else {
                 // Put buffer back
                 bufferQueue.push_back(buff);
                 
-                if (errno == EAGAIN) {
-                    // We must have just put a buffer back so we know
-                    // we can do this
-                    h.rewatchRead();
+                // This is effectively the same as eof
+                if (errno == ECONNRESET) {
+                    eofCallback(*this);
+                    h.unwatchRead();
+                    return;
+                } else if (errno == EAGAIN) {
+                    // We have just put a buffer back so we know
+                    // we can carry on watching for reads
                     return;
                 } else {
                     QPID_POSIX_CHECK(rc);
@@ -141,10 +185,11 @@
         } else {
             // Something to read but no buffer
             if (emptyCallback) {
-                emptyCallback();
+                emptyCallback(*this);
             }
             // If we still have no buffers we can't do anything more
             if (bufferQueue.empty()) {
+                h.unwatchRead();
                 return;
             }
             
@@ -160,36 +205,55 @@
         // See if we've got something to write
         if (!writeQueue.empty()) {
             // Write buffer
-            Buffer buff = writeQueue.back();
+            Buffer* buff = writeQueue.back();
             writeQueue.pop_back();
             errno = 0;
-            int rc = ::write(h.getFD(), buff.bytes, buff.byteCount);
+            assert(buff->dataStart+buff->dataCount <= buff->byteCount);
+            int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, 
buff->dataCount);
             if (rc >= 0) {
+                // If we didn't write full buffer put rest back
+                if (rc != buff->dataCount) {
+                    buff->dataStart += rc;
+                    buff->dataCount -= rc;
+                    writeQueue.push_back(buff);
+                    return;
+                }
+                
                 // Recycle the buffer
-                QueueReadBuffer(buff);
+                queueReadBuffer(buff);
             } else {
                 // Put buffer back
                 writeQueue.push_back(buff);
-                
-                if (errno == EAGAIN) {
+                if (errno == ECONNRESET || errno == EPIPE) {
+                    // Just stop watching for write here - we'll get a
+                    // disconnect callback soon enough
+                    h.unwatchWrite();
+                    return;
+                } else if (errno == EAGAIN) {
                     // We have just put a buffer back so we know
-                    // we can do this
-                    h.rewatchWrite();
+                    // we can carry on watching for writes
                     return;
                 } else {
                     QPID_POSIX_CHECK(rc);
                 }
             }
         } else {
-            // Something to read but no buffer
+            // Fd is writable, but nothing to write
             if (idleCallback) {
-                idleCallback(h.getFD());
+                idleCallback(*this);
             }
             // If we still have no buffers to write we can't do anything more
             if (writeQueue.empty()) {
+                h.unwatchWrite();
                 return;
             }
         }
     } while (true);
 }
         
+void AsynchIO::disconnected(DispatchHandle& h) {
+    if (disCallback) {
+        disCallback(*this);
+        h.unwatch();
+    }
+}


Reply via email to