Author: astitcher
Date: Fri Aug 31 11:20:29 2007
New Revision: 571529

URL: http://svn.apache.org/viewvc?rev=571529&view=rev
Log:
* Changes to make C++ client code use the asynchronous network IO
* Fixed up the test for buffer changes
* Removed unused buffer operations

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Aug 31 
11:20:29 2007
@@ -149,14 +149,17 @@
     for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); 
i++) {
         size += i->size() + 1/*shortstr size*/;        
     }
-    Buffer buffer(size + 4/*longstr size*/);
-    buffer.putLong(size);
+
+    char* bytes = static_cast<char*>(::alloca(size + 4/*longstr size*/));
+    Buffer wbuffer(bytes, size + 4/*longstr size*/);
+    wbuffer.putLong(size);
     for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); 
i++) {
-        buffer.putShortString(*i);
+        wbuffer.putShortString(*i);
     }
-    buffer.flip();
+
+    Buffer rbuffer(bytes, size + 4/*longstr size*/);
     string data;
-    buffer.getLongString(data);
+    rbuffer.getLongString(data);
 
     FieldTable response;
     response.setString("xids", data);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Fri Aug 31 
11:20:29 2007
@@ -25,6 +25,12 @@
 #include "qpid/framing/AMQFrame.h"
 #include "Connector.h"
 
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+
+#include <boost/bind.hpp>
+
 namespace qpid {
 namespace client {
 
@@ -43,9 +49,9 @@
     idleIn(0), idleOut(0), 
     timeoutHandler(0),
     shutdownHandler(0),
-    inbuf(receive_buffer_size), 
-    outbuf(send_buffer_size)
-{ }
+    aio(0)
+{
+}
 
 Connector::~Connector(){
     if (receiver.id()) {
@@ -56,19 +62,28 @@
 void Connector::connect(const std::string& host, int port){
     socket.connect(host, port);
     closed = false;
-    receiver = Thread(this);
+    poller = Poller::shared_ptr(new Poller);
+    aio = new AsynchIO(socket,
+        boost::bind(&Connector::readbuff, this, _1, _2),
+        boost::bind(&Connector::eof, this, _1),
+        boost::bind(&Connector::eof, this, _1),
+        0, // closed
+        0, // nobuffs
+        boost::bind(&Connector::writebuff, this, _1));
 }
 
 void Connector::init(){
     ProtocolInitiation init(version);
-    writeBlock(&init);
+
+    writeDataBlock(init);
+    receiver = Thread(this);
 }
 
 // Call with closedLock held
 bool Connector::closeInternal() {
     Mutex::ScopedLock l(closedLock);
     if (!closed) {
-        socket.close();
+        poller->shutdown();
         closed = true;
         return true;
     }
@@ -92,28 +107,11 @@
 }
 
 void Connector::send(AMQFrame& frame){
-    writeBlock(&frame);
-    QPID_LOG(trace, "SENT: " << frame);
-}
-
-void Connector::writeBlock(AMQDataBlock* data){
     Mutex::ScopedLock l(writeLock);
-    data->encode(outbuf);
-    //transfer data to wire
-    outbuf.flip();
-    writeToSocket(outbuf.start(), outbuf.available());
-    outbuf.clear();
-}
+    writeFrameQueue.push(frame);
+    aio->queueWrite();
 
-void Connector::writeToSocket(char* data, size_t available){
-    size_t written = 0;
-    while(written < available && !closed){
-       ssize_t sent = socket.send(data + written, available-written);
-        if(sent > 0) {
-            lastOut = now();
-            written += sent;
-        }
-    }
+    QPID_LOG(trace, "SENT: " << frame);
 }
 
 void Connector::handleClosed() {
@@ -121,6 +119,10 @@
         shutdownHandler->shutdown();
 }
 
+// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing
+// can never be called. The timeout processing needs to be added into the underlying Dispatcher code
+//
+// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof
 void Connector::checkIdle(ssize_t status){
     if(timeoutHandler){
         AbsTime t = now();
@@ -166,33 +168,103 @@
     timeoutHandler = handler;
 }
 
-void Connector::run(){
-    try{
-       while(!closed){
-            ssize_t available = inbuf.available();
-            if(available < 1){
-                THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
-            }
-            ssize_t received = socket.recv(inbuf.start(), available);
-           checkIdle(received);
 
-           if(!closed && received > 0){
-               inbuf.move(received);
-               inbuf.flip();//position = 0, limit = total data read
-               
-               AMQFrame frame;
-               while(frame.decode(inbuf)){
-                    QPID_LOG(trace, "RECV: " << frame);
-                   input->received(frame);
+// Buffer definition
+struct Buff : public AsynchIO::BufferBase {
+    Buff() :
+        AsynchIO::BufferBase(new char[65536], 65536)
+    {}
+    ~Buff()
+    { delete [] bytes;}
+};
+
+void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
+    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+
+       AMQFrame frame;
+       while(frame.decode(in)){
+           QPID_LOG(trace, "RECV: " << frame);
+               input->received(frame);
+       }
+       // TODO: unreading needs to go away, and when we can cope
+       // with multiple sub-buffers in the general buffer scheme, it will
+       if (in.available() != 0) {
+               // Adjust buffer for used bytes and then "unread them"
+               buff->dataStart += buff->dataCount-in.available();
+               buff->dataCount = in.available();
+               aio.unread(buff);
+       } else {
+               // Give whole buffer back to aio subsystem
+               aio.queueReadBuffer(buff);
+       }
+}
+
+void Connector::writebuff(AsynchIO& aio) {
+    Mutex::ScopedLock l(writeLock);
+    
+    if (writeFrameQueue.empty()) {
+       return;
+    }
+
+       do {
+               // Try and get a queued buffer if not then construct new one
+               AsynchIO::BufferBase* buff = aio.getQueuedBuffer();
+               if (!buff)
+                       buff = new Buff;
+               framing::Buffer out(buff->bytes, buff->byteCount);
+               int buffUsed = 0;
+       
+               framing::AMQFrame frame = writeFrameQueue.front();
+               int frameSize = frame.size();
+               while (frameSize <= int(out.available())) {
+       
+                       // Encode output frame  
+                       frame.encode(out);
+                       buffUsed += frameSize;
+                       
+                       writeFrameQueue.pop();
+                       if (writeFrameQueue.empty())
+                               break;
+                       frame = writeFrameQueue.front();
+                       frameSize = frame.size();
                }
-                //need to compact buffer to preserve any 'extra' data
-                inbuf.compact();
+       
+               buff->dataCount = buffUsed;
+               aio.queueWrite(buff);
+       } while (!writeFrameQueue.empty());
+}
+
+void Connector::writeDataBlock(const AMQDataBlock& data) {
+    AsynchIO::BufferBase* buff = new Buff;
+    framing::Buffer out(buff->bytes, buff->byteCount);
+    data.encode(out);
+    buff->dataCount = data.size();
+    aio->queueWrite(buff);
+}
+
+void Connector::eof(AsynchIO&) {
+       handleClosed();
+}
+
+// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing
+// will never be called
+void Connector::run(){
+       try {
+           Dispatcher d(poller);
+       
+           for (int i = 0; i < 32; i++) {
+               aio->queueReadBuffer(new Buff);
            }
-       }
-    } catch (const std::exception& e) {
+       
+           aio->start(poller);
+           d.run();
+        aio->queueForDeletion();
+        socket.close();
+       } catch (const std::exception& e) {
         QPID_LOG(error, e.what());
         handleClosed();
     }
 }
+
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Fri Aug 31 
11:20:29 2007
@@ -34,9 +34,12 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Socket.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/AsynchIO.h"
 
-namespace qpid {
+#include <queue>
 
+namespace qpid {
+       
 namespace client {
 
 class Connector : public framing::OutputHandler, 
@@ -61,24 +64,29 @@
     framing::InputHandler* input;
     framing::InitiationHandler* initialiser;
     framing::OutputHandler* output;
-       
-    framing::Buffer inbuf;
-    framing::Buffer outbuf;
 
     sys::Mutex writeLock;
+    std::queue<framing::AMQFrame> writeFrameQueue;
+    
     sys::Thread receiver;
 
     sys::Socket socket;
 
+    sys::AsynchIO* aio;
+    sys::Poller::shared_ptr poller;
+
     void checkIdle(ssize_t status);
-    void writeBlock(framing::AMQDataBlock* data);
-    void writeToSocket(char* data, size_t available);
     void setSocketTimeout();
 
     void run();
     void handleClosed();
     bool closeInternal();
-
+    
+    void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*);
+    void writebuff(qpid::sys::AsynchIO&);
+    void writeDataBlock(const framing::AMQDataBlock& data);
+    void eof(qpid::sys::AsynchIO&);
+    
   friend class Channel;
   public:
     Connector(framing::ProtocolVersion pVersion,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp Fri Aug 31 
11:20:29 2007
@@ -22,9 +22,9 @@
 #include "FramingContent.h" 
 #include "FieldTable.h" 
 
-qpid::framing::Buffer::Buffer(uint32_t _size) : size(_size), owner(true), 
position(0), limit(_size){
-    data = new char[size];
-}
+//qpid::framing::Buffer::Buffer(uint32_t _size) : size(_size), owner(true), 
position(0), limit(_size){
+//    data = new char[size];
+//}
 
 qpid::framing::Buffer::Buffer(char* _data, uint32_t _size) : size(_size), 
owner(false), data(_data), position(0), limit(_size){
 }
@@ -33,23 +33,23 @@
     if(owner) delete[] data;
 }
 
-void qpid::framing::Buffer::flip(){
-    limit = position;
-    position = 0;
-}
-
-void qpid::framing::Buffer::clear(){
-    limit = size;
-    position = 0;
-}
-
-void qpid::framing::Buffer::compact(){
-    uint32_t p = limit - position;
-    //copy p chars from position to 0
-    memmove(data, data + position, p);
-    limit = size;
-    position = p;
-}
+//void qpid::framing::Buffer::flip(){
+//    limit = position;
+//    position = 0;
+//}
+
+//void qpid::framing::Buffer::clear(){
+//    limit = size;
+//    position = 0;
+//}
+
+//void qpid::framing::Buffer::compact(){
+//    uint32_t p = limit - position;
+//    //copy p chars from position to 0
+//    memmove(data, data + position, p);
+//    limit = size;
+//    position = p;
+//}
 
 void qpid::framing::Buffer::record(){
     r_position = position;
@@ -65,13 +65,13 @@
     return limit - position;
 }
 
-char* qpid::framing::Buffer::start(){
-    return data + position;
-}
-
-void qpid::framing::Buffer::move(uint32_t bytes){
-    position += bytes;
-}
+//char* qpid::framing::Buffer::start(){
+//    return data + position;
+//}
+
+//void qpid::framing::Buffer::move(uint32_t bytes){
+//    position += bytes;
+//}
     
 void qpid::framing::Buffer::putOctet(uint8_t i){
     data[position++] = i;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h Fri Aug 31 11:20:29 
2007
@@ -41,18 +41,18 @@
 
 public:
 
-    Buffer(uint32_t size);
+    //Buffer(uint32_t size);
     Buffer(char* data, uint32_t size);
     ~Buffer();
 
-    void flip();
-    void clear();
-    void compact();
+    //void flip();
+    //void clear();
+    //void compact();
     void record();
     void restore();
     uint32_t available();
-    char* start();
-    void move(uint32_t bytes);
+    //char* start();
+    //void move(uint32_t bytes);
     
     void putOctet(uint8_t i);
     void putShort(uint16_t i);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h Fri Aug 31 
11:20:29 2007
@@ -24,6 +24,8 @@
 #include "qpid/Exception.h"
 #include "Buffer.h"
 
+#include <stdlib.h> // For alloca
+
 namespace qpid {
 namespace framing {
 
@@ -33,20 +35,24 @@
 
     template <class T> void encode(const T t, std::string& data) {
         uint32_t size = t.size() + 2/*type*/;
-        Buffer buffer(size);
-        buffer.putShort(T::TYPE);
-        t.encode(buffer);
-        buffer.flip();
-        buffer.getRawData(data, size);        
+        char* bytes = static_cast<char*>(::alloca(size));
+        Buffer wbuffer(bytes, size);
+        wbuffer.putShort(T::TYPE);
+        t.encode(wbuffer);
+        
+        Buffer rbuffer(bytes, size);
+        rbuffer.getRawData(data, size);        
     }
 
     template <class T> void decode(T t, std::string& data) {
-        Buffer buffer(data.length());
-        buffer.putRawData(data);        
-        buffer.flip();
-        uint16_t type = buffer.getShort();
+        char* bytes = static_cast<char*>(::alloca(data.length()));
+        Buffer wbuffer(bytes, data.length());
+        wbuffer.putRawData(data);        
+
+        Buffer rbuffer(bytes, data.length());
+        uint16_t type = rbuffer.getShort();
         if (type == T::TYPE) {
-            t.decode(buffer);
+            t.decode(rbuffer);
         } else {
             throw Exception("Type code does not match");
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Fri Aug 31 11:20:29 
2007
@@ -63,26 +63,24 @@
  */
 class AsynchIO : private DispatchHandle {
 public:
-    struct Buffer {
-        typedef boost::function1<void, const Buffer&> RecycleStorage;
-        
+    struct BufferBase {
         char* const bytes;
         const int32_t byteCount;
         int32_t dataStart;
         int32_t dataCount;
         
-        Buffer(char* const b, const int32_t s) :
+        BufferBase(char* const b, const int32_t s) :
             bytes(b),
             byteCount(s),
             dataStart(0),
             dataCount(0)
         {}
         
-        virtual ~Buffer()
+        virtual ~BufferBase()
         {}
     };
 
-    typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
+    typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
     typedef boost::function1<void, AsynchIO&> EofCallback;
     typedef boost::function1<void, AsynchIO&> DisconnectCallback;
     typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
@@ -96,8 +94,8 @@
     ClosedCallback closedCallback;
     BuffersEmptyCallback emptyCallback;
     IdleCallback idleCallback;
-    std::deque<Buffer*> bufferQueue;
-    std::deque<Buffer*> writeQueue;
+    std::deque<BufferBase*> bufferQueue;
+    std::deque<BufferBase*> writeQueue;
     bool queuedClose;
 
 public:
@@ -107,11 +105,11 @@
     void queueForDeletion();
 
     void start(Poller::shared_ptr poller);
-    void queueReadBuffer(Buffer* buff);
-    void queueWrite(Buffer* buff = 0);
-    void unread(Buffer* buff);
+    void queueReadBuffer(BufferBase* buff);
+    void queueWrite(BufferBase* buff = 0);
+    void unread(BufferBase* buff);
     void queueWriteClose();
-    Buffer* getQueuedBuffer();
+    BufferBase* getQueuedBuffer();
     const Socket& getSocket() const { return DispatchHandle::getSocket(); }
 
 private:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Fri Aug 31 
11:20:29 2007
@@ -74,9 +74,9 @@
 {}
 
 // Buffer definition
-struct Buff : public AsynchIO::Buffer {
+struct Buff : public AsynchIO::BufferBase {
     Buff() :
-        AsynchIO::Buffer(new char[65536], 65536)
+        AsynchIO::BufferBase(new char[65536], 65536)
     {}
     ~Buff()
     { delete [] bytes;}
@@ -113,7 +113,7 @@
        void close();
 
        // Input side
-       void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff);
+       void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
        void eof(AsynchIO& aio);
        void disconnect(AsynchIO& aio);
        
@@ -200,7 +200,7 @@
 }
 
 // Input side
-void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::Buffer* buff) {
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
        framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
     if(initiated){
         framing::AMQFrame frame;
@@ -264,7 +264,7 @@
        
        do {
                // Try and get a queued buffer if not then construct new one
-               AsynchIO::Buffer* buff = aio->getQueuedBuffer();
+               AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
                if (!buff)
                        buff = new Buff;
                framing::Buffer out(buff->bytes, buff->byteCount);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Aug 31 
11:20:29 2007
@@ -121,7 +121,7 @@
     DispatchHandle::startWatch(poller);
 }
 
-void AsynchIO::queueReadBuffer(Buffer* buff) {
+void AsynchIO::queueReadBuffer(BufferBase* buff) {
        assert(buff);
     buff->dataStart = 0;
     buff->dataCount = 0;
@@ -129,7 +129,7 @@
     DispatchHandle::rewatchRead();
 }
 
-void AsynchIO::unread(Buffer* buff) {
+void AsynchIO::unread(BufferBase* buff) {
        assert(buff);
        if (buff->dataStart != 0) {
                memmove(buff->bytes, buff->bytes+buff->dataStart, 
buff->dataCount);
@@ -141,7 +141,7 @@
 
 // Either queue for writing or announce that there is something to write
 // and we should ask for it
-void AsynchIO::queueWrite(Buffer* buff) {
+void AsynchIO::queueWrite(BufferBase* buff) {
        // If no buffer then don't queue anything
        // (but still wake up for writing) 
        if (buff) {
@@ -163,11 +163,11 @@
 /** Return a queued buffer if there are enough
  * to spare
  */
-AsynchIO::Buffer* AsynchIO::getQueuedBuffer() {
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
        // Always keep at least one buffer (it might have data that was 
"unread" in it)
        if (bufferQueue.size()<=1)
                return 0;
-       Buffer* buff = bufferQueue.back();
+       BufferBase* buff = bufferQueue.back();
        buff->dataStart = 0;
        buff->dataCount = 0;
        bufferQueue.pop_back();
@@ -183,7 +183,7 @@
         // (Try to) get a buffer
         if (!bufferQueue.empty()) {
             // Read into buffer
-            Buffer* buff = bufferQueue.front();
+            BufferBase* buff = bufferQueue.front();
             bufferQueue.pop_front();
             errno = 0;
             int readCount = buff->byteCount-buff->dataCount;
@@ -239,7 +239,7 @@
         // See if we've got something to write
         if (!writeQueue.empty()) {
             // Write buffer
-            Buffer* buff = writeQueue.back();
+            BufferBase* buff = writeQueue.back();
             writeQueue.pop_back();
             errno = 0;
             assert(buff->dataStart+buff->dataCount <= buff->byteCount);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp Fri Aug 31 
11:20:29 2007
@@ -39,11 +39,13 @@
         ft.setString("A", "BCDE");
         CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft.getString("A"));
 
-        Buffer buffer(100);
-        buffer.putFieldTable(ft);
-        buffer.flip();     
+        char buff[100];
+        Buffer wbuffer(buff, 100);
+        wbuffer.putFieldTable(ft);
+
+        Buffer rbuffer(buff, 100);
         FieldTable ft2;
-        buffer.getFieldTable(ft2);
+        rbuffer.getFieldTable(ft2);
         CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft2.getString("A"));
 
     }
@@ -68,10 +70,12 @@
             FieldTable c;
             c = a;
             
-            Buffer buffer(c.size());
-            buffer.putFieldTable(c);
-            buffer.flip();     
-            buffer.getFieldTable(d);
+            char* buff = static_cast<char*>(::alloca(c.size()));
+            Buffer wbuffer(buff, c.size());
+            wbuffer.putFieldTable(c);
+
+            Buffer rbuffer(buff, c.size());
+            rbuffer.getFieldTable(d);
             CPPUNIT_ASSERT_EQUAL(c, d);
             CPPUNIT_ASSERT_EQUAL(std::string("CCCC"), c.getString("A"));
             CPPUNIT_ASSERT_EQUAL(1234, c.getInt("B"));

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp Fri Aug 31 11:20:29 
2007
@@ -68,114 +68,130 @@
     CPPUNIT_TEST_SUITE_END();
 
   private:
-    Buffer buffer;
+    char buffer[1024];
     ProtocolVersion version;
     
   public:
 
-    FramingTest() : buffer(1024), version(highestProtocolVersion) {}
+    FramingTest() : version(highestProtocolVersion) {}
 
     void testBasicQosBody() 
     {
+        Buffer wbuff(buffer, sizeof(buffer));
         BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true);
-        in.encode(buffer);
-        buffer.flip(); 
+        in.encode(wbuff);
+
+        Buffer rbuff(buffer, sizeof(buffer));
         BasicQosBody out(version);
-        out.decode(buffer);
+        out.decode(rbuff);
         CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
     }
     
     void testConnectionSecureBody() 
     {
+        Buffer wbuff(buffer, sizeof(buffer));
         std::string s = "security credential";
         ConnectionSecureBody in(version, s);
-        in.encode(buffer);
-        buffer.flip(); 
+        in.encode(wbuff);
+
+        Buffer rbuff(buffer, sizeof(buffer));
         ConnectionSecureBody out(version);
-        out.decode(buffer);
+        out.decode(rbuff);
         CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
     }
 
     void testConnectionRedirectBody()
     {
+        Buffer wbuff(buffer, sizeof(buffer));
         std::string a = "hostA";
         std::string b = "hostB";
         ConnectionRedirectBody in(version, a, b);
-        in.encode(buffer);
-        buffer.flip(); 
+        in.encode(wbuff);
+        
+        Buffer rbuff(buffer, sizeof(buffer));
         ConnectionRedirectBody out(version);
-        out.decode(buffer);
+        out.decode(rbuff);
         CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
     }
 
     void testAccessRequestBody()
     {
+        Buffer wbuff(buffer, sizeof(buffer));
         std::string s = "text";
         AccessRequestBody in(version, s, true, false, true, false, true);
-        in.encode(buffer);
-        buffer.flip(); 
+        in.encode(wbuff);
+
+        Buffer rbuff(buffer, sizeof(buffer));
         AccessRequestBody out(version);
-        out.decode(buffer);
+        out.decode(rbuff);
         CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
     }
 
     void testBasicConsumeBody()
     {
+        Buffer wbuff(buffer, sizeof(buffer));
         std::string q = "queue";
         std::string t = "tag";
         BasicConsumeBody in(version, 0, q, t, false, true, false, false,
                             FieldTable());
-        in.encode(buffer);
-        buffer.flip(); 
+        in.encode(wbuff);
+
+        Buffer rbuff(buffer, sizeof(buffer));
         BasicConsumeBody out(version);
-        out.decode(buffer);
+        out.decode(rbuff);
         CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
     }
     
 
     void testConnectionRedirectBodyFrame()
     {
+        Buffer wbuff(buffer, sizeof(buffer));
         std::string a = "hostA";
         std::string b = "hostB";
         AMQFrame in(999, ConnectionRedirectBody(version, a, b));
-        in.encode(buffer);
-        buffer.flip(); 
+        in.encode(wbuff);
+
+        Buffer rbuff(buffer, sizeof(buffer));
         AMQFrame out;
-        out.decode(buffer);
+        out.decode(rbuff);
         CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
     }
 
     void testBasicConsumeOkBodyFrame()
     {
+        Buffer wbuff(buffer, sizeof(buffer));
         std::string s = "hostA";
         AMQFrame in(999, BasicConsumeOkBody(version, s));
-        in.encode(buffer);
-        buffer.flip(); 
+        in.encode(wbuff);
+
+        Buffer rbuff(buffer, sizeof(buffer));
         AMQFrame out;
-        for(int i = 0; i < 5; i++){
-            out.decode(buffer);
-            CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
-        }
+        out.decode(rbuff);
+        CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
     }
 
     void testInlineContent() {        
+        Buffer wbuff(buffer, sizeof(buffer));
         Content content(INLINE, "MyData");
         CPPUNIT_ASSERT(content.isInline());
-        content.encode(buffer);
-        buffer.flip();
+        content.encode(wbuff);
+
+        Buffer rbuff(buffer, sizeof(buffer));
         Content recovered;
-        recovered.decode(buffer);
+        recovered.decode(rbuff);
         CPPUNIT_ASSERT(recovered.isInline());
         CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue());
     }
 
     void testContentReference() {        
+        Buffer wbuff(buffer, sizeof(buffer));
         Content content(REFERENCE, "MyRef");
         CPPUNIT_ASSERT(content.isReference());
-        content.encode(buffer);
-        buffer.flip();
+        content.encode(wbuff);
+
+        Buffer rbuff(buffer, sizeof(buffer));
         Content recovered;
-        recovered.decode(buffer);
+        recovered.decode(rbuff);
         CPPUNIT_ASSERT(recovered.isReference());
         CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue());
     }
@@ -198,11 +214,13 @@
         }
         
         try {
-            buffer.putOctet(2);
-            buffer.putLongString("blah, blah");
-            buffer.flip();
+            Buffer wbuff(buffer, sizeof(buffer));
+            wbuff.putOctet(2);
+            wbuff.putLongString("blah, blah");
+            
+            Buffer rbuff(buffer, sizeof(buffer));
             Content content;
-            content.decode(buffer);
+            content.decode(rbuff);
             CPPUNIT_ASSERT(false);//fail, expected exception
         } catch (QpidError& e) {
             CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp Fri Aug 31 11:20:29 
2007
@@ -38,12 +38,13 @@
     {
         AMQHeaderBody body;
         body.get<BasicHeaderProperties>(true)->getHeaders().setString("A", 
"BCDE");
-        Buffer buffer(100);
+        char buff[100];
+        Buffer wbuffer(buff, 100);
+        body.encode(wbuffer);
 
-        body.encode(buffer);
-        buffer.flip();     
+        Buffer rbuffer(buff, 100);
         AMQHeaderBody body2;
-        body2.decode(buffer, body.size());
+        body2.decode(rbuffer, body.size());
         BasicHeaderProperties* props =
             body2.get<BasicHeaderProperties>(true);
         CPPUNIT_ASSERT_EQUAL(std::string("BCDE"),
@@ -84,11 +85,13 @@
         properties->setClusterId(clusterId);
         properties->setContentLength(contentLength);
 
-        Buffer buffer(10000);
-        out.encode(buffer);
-        buffer.flip();     
+        char buff[10000];
+        Buffer wbuffer(buff, 10000);
+        out.encode(wbuffer);
+
+        Buffer rbuffer(buff, 10000);
         AMQFrame in;
-        in.decode(buffer);
+        in.decode(rbuffer);
         properties = 
in.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
 
         CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
@@ -123,11 +126,13 @@
         properties->setExpiration(expiration);
         properties->setTimestamp(timestamp);
 
-        Buffer buffer(100);
-        body.encode(buffer);
-        buffer.flip();     
+        char buff[100];
+        Buffer wbuffer(buff, 100);
+        body.encode(wbuffer);
+
+        Buffer rbuffer(buff, 100);
         AMQHeaderBody temp;
-        temp.decode(buffer, body.size());
+        temp.decode(rbuffer, body.size());
         properties = temp.get<BasicHeaderProperties>(true);
 
         CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Aug 31 11:20:29 2007
@@ -84,7 +84,6 @@
   DtxWorkRecordTest     \
   ExchangeTest         \
   HeadersExchangeTest  \
-  MessageBuilderTest   \
   MessageTest          \
   QueueRegistryTest    \
   QueueTest            \
@@ -96,6 +95,7 @@
   TxPublishTest                \
   ValueTest            \
   MessageHandlerTest
+#  MessageBuilderTest
 
 #client_unit_tests =   \
   ClientChannelTest

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp Fri Aug 31 11:20:29 
2007
@@ -68,14 +68,14 @@
         dProps->setDeliveryMode(PERSISTENT);
         CPPUNIT_ASSERT(msg->isPersistent());
 
-
-        Buffer buffer(msg->encodedSize());
-        msg->encode(buffer);
-        buffer.flip();        
+        char* buff = static_cast<char*>(::alloca(msg->encodedSize()));
+        Buffer wbuffer(buff, msg->encodedSize());
+        msg->encode(wbuffer);
+        
+        Buffer rbuffer(buff, msg->encodedSize());
         msg.reset(new Message());
-        msg->decodeHeader(buffer);
-        msg->decodeContent(buffer);
-
+        msg->decodeHeader(rbuffer);
+        msg->decodeContent(rbuffer);
         CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchangeName());
         CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
         CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), 
msg->contentSize());

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp?rev=571529&r1=571528&r2=571529&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp Fri Aug 31 11:20:29 2007
@@ -62,12 +62,14 @@
 }
 
 BOOST_AUTO_TEST_CASE(testUuidEncodeDecode) {
-    Buffer buf(Uuid::size());
+    char* buff = static_cast<char*>(::alloca(Uuid::size()));
+    Buffer wbuf(buff, Uuid::size());
     Uuid uuid(sample.c_array());
-    uuid.encode(buf);
-    buf.flip();
+    uuid.encode(wbuf);
+
+    Buffer rbuf(buff, Uuid::size());
     Uuid decoded;
-    decoded.decode(buf);
+    decoded.decode(rbuf);
     BOOST_CHECK_EQUAL(string(sample.begin(), sample.end()),
                       string(decoded.begin(), decoded.end()));
 }


Reply via email to