Author: shuston
Date: Wed Dec 10 15:37:01 2008
New Revision: 725486

URL: http://svn.apache.org/viewvc?rev=725486&view=rev
Log:
Add startReading() method required by sys::AsynchIO. Fixes QPID-1525.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=725486&r1=725485&r2=725486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Wed Dec 10 
15:37:01 2008
@@ -185,8 +185,8 @@
 }
 
 void AsynchAcceptResult::failure(int status) {
-  if (status != WSA_OPERATION_ABORTED)
-    ;
+  //if (status != WSA_OPERATION_ABORTED)
+  // Can there be anything else?  ;
   delete this;
 }
 
@@ -283,6 +283,7 @@
     virtual void notifyPendingWrite();
     virtual void queueWriteClose();
     virtual bool writeQueueEmpty();
+    virtual void startReading();
 
     /**
      * getQueuedBuffer returns a buffer from the buffer queue, if one is
@@ -320,7 +321,6 @@
 
 private:
     // Dispatch events that have completed.
-    void dispatchReadComplete(AsynchIO::BufferBase *buffer);
     void notifyEof(void);
     void notifyDisconnect(void);
     void notifyClosed(void);
@@ -328,12 +328,6 @@
     void notifyIdle(void);
 
     /**
-     * Initiate a read operation. AsynchIO::dispatchReadComplete() will be
-     * called when the read is complete and data is available.
-     */
-    void startRead(void);
-
-    /**
      * Initiate a write of the specified buffer. There's no callback for
      * write completion to the AsynchIO object.
      */
@@ -431,7 +425,7 @@
     poller->addFd(PollerHandle(socket), Poller::INPUT);
     if (writeQueue.size() > 0)  // Already have data queued for write
         notifyPendingWrite();
-    startRead();
+    startReading();
 }
 
 void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
@@ -487,56 +481,11 @@
     return writeQueue.size() == 0;
 }
 
-/**
- * Return a queued buffer if there are enough to spare.
- */
-AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
-    QLock l(bufferQueueLock);
-    // Always keep at least one buffer (it might have data that was
-    // "unread" in it).
-    if (bufferQueue.size() <= 1)
-        return 0;
-    BufferBase* buff = bufferQueue.back();
-    assert(buff);
-    bufferQueue.pop_back();
-    return buff;
-}
-
-void AsynchIO::dispatchReadComplete(AsynchIO::BufferBase *buffer) {
-    if (readCallback)
-        readCallback(*this, buffer);
-}
-
-void AsynchIO::notifyEof(void) {
-    if (eofCallback)
-        eofCallback(*this);
-}
-
-void AsynchIO::notifyDisconnect(void) {
-    if (disCallback)
-        disCallback(*this);
-}
-
-void AsynchIO::notifyClosed(void) {
-    if (closedCallback)
-        closedCallback(*this, socket);
-}
-
-void AsynchIO::notifyBuffersEmpty(void) {
-    if (emptyCallback)
-        emptyCallback(*this);
-}
-
-void AsynchIO::notifyIdle(void) {
-    if (idleCallback)
-        idleCallback(*this);
-}
-
 /*
- * Asynch reader/writer using overlapped I/O
+ * Initiate a read operation. AsynchIO::readComplete() will be
+ * called when the read is complete and data is available.
  */
-
-void AsynchIO::startRead(void) {
+void AsynchIO::startReading() {
     if (queuedDelete)
         return;
 
@@ -582,6 +531,50 @@
     return;
 }
 
+/**
+ * Return a queued buffer if there are enough to spare.
+ */
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
+    QLock l(bufferQueueLock);
+    // Always keep at least one buffer (it might have data that was
+    // "unread" in it).
+    if (bufferQueue.size() <= 1)
+        return 0;
+    BufferBase* buff = bufferQueue.back();
+    assert(buff);
+    bufferQueue.pop_back();
+    return buff;
+}
+
+void AsynchIO::notifyEof(void) {
+    if (eofCallback)
+        eofCallback(*this);
+}
+
+void AsynchIO::notifyDisconnect(void) {
+    if (disCallback)
+        disCallback(*this);
+}
+
+void AsynchIO::notifyClosed(void) {
+    if (closedCallback)
+        closedCallback(*this, socket);
+}
+
+void AsynchIO::notifyBuffersEmpty(void) {
+    if (emptyCallback)
+        emptyCallback(*this);
+}
+
+void AsynchIO::notifyIdle(void) {
+    if (idleCallback)
+        idleCallback(*this);
+}
+
+/*
+ * Asynch reader/writer using overlapped I/O
+ */
+
 void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
     writeInProgress = true;
     InterlockedIncrement(&opsInProgress);
@@ -622,9 +615,12 @@
     int status = result->getStatus();
     size_t bytes = result->getTransferred();
     if (status == 0 && bytes > 0) {
+        bool restartRead = true;     // May not if receiver doesn't want more
         threadReadTotal += bytes;
-        dispatchReadComplete(result->getBuff());
-        startRead();
+        if (readCallback)
+            restartRead = readCallback(*this, result->getBuff());
+        if (restartRead)
+            startReading();
     }
     else {
         // No data read, so put the buffer back. It may be partially filled,


Reply via email to