Author: gsim
Date: Tue Aug 14 05:20:14 2007
New Revision: 565725

URL: http://svn.apache.org/viewvc?view=rev&rev=565725
Log:
Undoing change to asyncio as incomplete writes aren't handled.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?view=diff&rev=565725&r1=565724&r2=565725
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Aug 14 05:20:14 2007
@@ -25,12 +25,6 @@
 #include "qpid/framing/AMQFrame.h"
 #include "Connector.h"
 
-#include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Poller.h"
-
-#include <boost/bind.hpp>
-
 namespace qpid {
 namespace client {
 
@@ -49,6 +43,7 @@
     idleIn(0), idleOut(0), 
     timeoutHandler(0),
     shutdownHandler(0),
+    inbuf(receive_buffer_size), 
     outbuf(send_buffer_size)
 { }
 
@@ -61,7 +56,6 @@
 void Connector::connect(const std::string& host, int port){
     socket.connect(host, port);
     closed = false;
-    poller = Poller::shared_ptr(new Poller);
     receiver = Thread(this);
 }
 
@@ -74,7 +68,7 @@
 bool Connector::closeInternal() {
     Mutex::ScopedLock l(closedLock);
     if (!closed) {
-        poller->shutdown();
+        socket.close();
         closed = true;
         return true;
     }
@@ -97,8 +91,6 @@
     return this; 
 }
 
-// TODO: astitcher 20070908: Writing still needs to be transferred to the aynchronous IO
-// framework.
 void Connector::send(AMQFrame& frame){
     writeBlock(&frame);
     QPID_LOG(trace, "SENT: " << frame);
@@ -129,10 +121,6 @@
         shutdownHandler->shutdown();
 }
 
-// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing
-// can never be called. The timeut processing needs to be added into the underlying Dispatcher code
-//
-// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof
 void Connector::checkIdle(ssize_t status){
     if(timeoutHandler){
         AbsTime t = now();
@@ -178,65 +166,33 @@
     timeoutHandler = handler;
 }
 
-
-// Buffer definition
-struct Buff : public AsynchIO::Buffer {
-    Buff() :
-        AsynchIO::Buffer(new char[65536], 65536)
-    {}
-    ~Buff()
-    { delete [] bytes;}
-};
-
-void Connector::readbuff(AsynchIO& aio, AsynchIO::Buffer* buff) {
-    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
-
-       AMQFrame frame(version);
-       while(frame.decode(in)){
-           QPID_LOG(trace, "RECV: " << frame);
-               input->received(frame);
-       }
-       // TODO: unreading needs to go away, and when we can cope
-       // with multiple sub-buffers in the general buffer scheme, it will
-       if (in.available() != 0) {
-               // Adjust buffer for used bytes and then "unread them"
-               buff->dataStart += buff->dataCount-in.available();
-               buff->dataCount = in.available();
-               aio.unread(buff);
-       } else {
-               // Give whole buffer back to aio subsystem
-               aio.queueReadBuffer(buff);
-       }
-}
-
-void Connector::eof(AsynchIO&) {
-       handleClosed();
-}
-
-// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
-// will never be called
 void Connector::run(){
-       try {
-           Dispatcher d(poller);
-       
-           AsynchIO* aio = new AsynchIO(socket,
-               boost::bind(&Connector::readbuff, this, _1, _2),
-               boost::bind(&Connector::eof, this, _1),
-                boost::bind(&Connector::eof, this, _1));
-           
-           for (int i = 0; i < 32; i++) {
-               aio->queueReadBuffer(new Buff);
+    try{
+       while(!closed){
+            ssize_t available = inbuf.available();
+            if(available < 1){
+                THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+            }
+            ssize_t received = socket.recv(inbuf.start(), available);
+           checkIdle(received);
+
+           if(!closed && received > 0){
+               inbuf.move(received);
+               inbuf.flip();//position = 0, limit = total data read
+               
+               AMQFrame frame(version);
+               while(frame.decode(inbuf)){
+                    QPID_LOG(trace, "RECV: " << frame);
+                   input->received(frame);
+               }
+                //need to compact buffer to preserve any 'extra' data
+                inbuf.compact();
            }
-       
-           aio->start(poller);
-           d.run();
-        aio->queueForDeletion();
-        socket.close();
-       } catch (const std::exception& e) {
+       }
+    } catch (const std::exception& e) {
         QPID_LOG(error, e.what());
         handleClosed();
     }
 }
-
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?view=diff&rev=565725&r1=565724&r2=565725
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Tue Aug 14 05:20:14 2007
@@ -34,10 +34,9 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Socket.h"
 #include "qpid/sys/Time.h"
-#include "qpid/sys/AsynchIO.h"
 
 namespace qpid {
-       
+
 namespace client {
 
 class Connector : public framing::OutputHandler, 
@@ -63,6 +62,7 @@
     framing::InitiationHandler* initialiser;
     framing::OutputHandler* output;
        
+    framing::Buffer inbuf;
     framing::Buffer outbuf;
 
     sys::Mutex writeLock;
@@ -70,8 +70,6 @@
 
     sys::Socket socket;
 
-    sys::Poller::shared_ptr poller;
-
     void checkIdle(ssize_t status);
     void writeBlock(framing::AMQDataBlock* data);
     void writeToSocket(char* data, size_t available);
@@ -80,10 +78,7 @@
     void run();
     void handleClosed();
     bool closeInternal();
-    
-    void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::Buffer*);
-    void eof(qpid::sys::AsynchIO&);
-    
+
   friend class Channel;
   public:
     Connector(framing::ProtocolVersion pVersion,


Reply via email to