Author: aconway
Date: Thu Jan 10 14:50:23 2008
New Revision: 610972

URL: http://svn.apache.org/viewvc?rev=610972&view=rev
Log:

Client always collects at least an entire frameset into a single buffer
when possible. Based on patch from Gordon Sim.

 - Refactor Connector::writebuff, ::send as Connector::Writer
 - Collect frames up to EOF before notifying AIO of a pending write.
 - Encode all available complete framesets into buffers as compactly as 
possible.
 - Log buffer size and number of frames encoded per write, for both client and broker.
 - framing::Buffer: added getPosition(), getSize(), a default ctor (via default arguments), and copy assignment (size is no longer const).

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=610972&r1=610971&r2=610972&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Jan 10 
14:50:23 2008
@@ -48,8 +48,7 @@
     timeoutHandler(0),
     shutdownHandler(0),
     aio(0)
-{
-}
+{}
 
 Connector::~Connector() {
     close();
@@ -62,12 +61,13 @@
     closed = false;
     poller = Poller::shared_ptr(new Poller);
     aio = new AsynchIO(socket,
-        boost::bind(&Connector::readbuff, this, _1, _2),
-        boost::bind(&Connector::eof, this, _1),
-        boost::bind(&Connector::eof, this, _1),
-        0, // closed
-        0, // nobuffs
-        boost::bind(&Connector::writebuff, this, _1));
+                       boost::bind(&Connector::readbuff, this, _1, _2),
+                       boost::bind(&Connector::eof, this, _1),
+                       boost::bind(&Connector::eof, this, _1),
+                       0, // closed
+                       0, // nobuffs
+                       boost::bind(&Connector::writebuff, this, _1));
+    writer.setAio(aio);
 }
 
 void Connector::init(){
@@ -103,12 +103,8 @@
     return this; 
 }
 
-void Connector::send(AMQFrame& frame){
-    Mutex::ScopedLock l(writeLock);
-    writeFrameQueue.push(frame);
-    aio->notifyPendingWrite();
-
-    QPID_LOG(trace, "SENT [" << this << "]: " << frame);
+void Connector::send(AMQFrame& frame) {
+    writer.handle(frame);
 }
 
 void Connector::handleClosed() {
@@ -165,70 +161,89 @@
     timeoutHandler = handler;
 }
 
-
-// Buffer definition
-struct Buff : public AsynchIO::BufferBase {
-    Buff() :
-        AsynchIO::BufferBase(new char[65536], 65536)
-    {}
-    ~Buff()
-    { delete [] bytes;}
+struct Connector::Buff : public AsynchIO::BufferBase {
+    Buff() : AsynchIO::BufferBase(new char[65536], 65536) {}    
+    ~Buff() { delete [] bytes;}
 };
 
+Connector::Writer::Writer() : aio(0), buffer(0), lastEof(frames.begin()) {}
+
+Connector::Writer::~Writer() { delete buffer; }
+
+void Connector::Writer::setAio(sys::AsynchIO* a) {
+    Mutex::ScopedLock l(lock);
+    aio = a;
+    newBuffer(l);
+}
+
+void Connector::Writer::handle(framing::AMQFrame& frame) { 
+    Mutex::ScopedLock l(lock);
+    frames.push_back(frame);
+    if (frame.getEof()) {
+        lastEof = frames.end();
+        aio->notifyPendingWrite();
+    }
+    QPID_LOG(trace, "SENT [" << this << "]: " << frame);
+}
+
+void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
+    assert(buffer);
+    QPID_LOG(trace, "Write buffer " << encode.getPosition()
+             << " bytes " << framesEncoded << " frames ");    
+    framesEncoded = 0;
+
+    buffer->dataStart = 0;
+    buffer->dataCount = encode.getPosition();
+    aio->queueWrite(buffer);
+    newBuffer(l);
+}
+
+void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
+    buffer = aio->getQueuedBuffer();
+    if (!buffer) buffer = new Buff();
+    encode = framing::Buffer(buffer->bytes, buffer->byteCount);
+    framesEncoded = 0;
+}
+
+// Called in IO thread.
+void Connector::Writer::write(sys::AsynchIO& aio_) {
+    Mutex::ScopedLock l(lock);
+    assert(&aio_ == aio);
+    assert(buffer);
+    for (Frames::iterator i = frames.begin(); i != lastEof; ++i) {
+        if (i->size() > encode.available()) writeOne(l);
+        assert(i->size() <= encode.available());
+        i->encode(encode);
+        ++framesEncoded;
+    }
+    frames.erase(frames.begin(), lastEof);
+    lastEof = frames.begin();
+    if (encode.getPosition() > 0) writeOne(l);
+}
+
 void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
     framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
 
-       AMQFrame frame;
-       while(frame.decode(in)){
-           QPID_LOG(trace, "RECV [" << this << "]: " << frame);
-            input->received(frame);
-       }
-       // TODO: unreading needs to go away, and when we can cope
-       // with multiple sub-buffers in the general buffer scheme, it will
-       if (in.available() != 0) {
-               // Adjust buffer for used bytes and then "unread them"
-               buff->dataStart += buff->dataCount-in.available();
-               buff->dataCount = in.available();
-               aio.unread(buff);
-       } else {
-               // Give whole buffer back to aio subsystem
-               aio.queueReadBuffer(buff);
-       }
-}
-
-void Connector::writebuff(AsynchIO& aio) {
-    Mutex::ScopedLock l(writeLock);
-    
-    if (writeFrameQueue.empty()) {
-       return;
-    }
-
-       do {
-               // Try and get a queued buffer if not then construct new one
-               AsynchIO::BufferBase* buff = aio.getQueuedBuffer();
-               if (!buff)
-                       buff = new Buff;
-               framing::Buffer out(buff->bytes, buff->byteCount);
-               int buffUsed = 0;
-       
-               framing::AMQFrame frame = writeFrameQueue.front();
-               int frameSize = frame.size();
-               while (frameSize <= int(out.available())) {
-       
-                       // Encode output frame  
-                       frame.encode(out);
-                       buffUsed += frameSize;
-                       
-                       writeFrameQueue.pop();
-                       if (writeFrameQueue.empty())
-                               break;
-                       frame = writeFrameQueue.front();
-                       frameSize = frame.size();
-               }
-       
-               buff->dataCount = buffUsed;
-               aio.queueWrite(buff);
-       } while (!writeFrameQueue.empty());
+    AMQFrame frame;
+    while(frame.decode(in)){
+        QPID_LOG(trace, "RECV [" << this << "]: " << frame);
+        input->received(frame);
+    }
+    // TODO: unreading needs to go away, and when we can cope
+    // with multiple sub-buffers in the general buffer scheme, it will
+    if (in.available() != 0) {
+        // Adjust buffer for used bytes and then "unread them"
+        buff->dataStart += buff->dataCount-in.available();
+        buff->dataCount = in.available();
+        aio.unread(buff);
+    } else {
+        // Give whole buffer back to aio subsystem
+        aio.queueReadBuffer(buff);
+    }
+}
+
+void Connector::writebuff(AsynchIO& aio_) {
+    writer.write(aio_);
 }
 
 void Connector::writeDataBlock(const AMQDataBlock& data) {
@@ -240,24 +255,24 @@
 }
 
 void Connector::eof(AsynchIO&) {
-       handleClosed();
+    handleClosed();
 }
 
 // TODO: astitcher 20070908 This version of the code can never time out, so 
the idle processing
 // will never be called
 void Connector::run(){
-       try {
-           Dispatcher d(poller);
+    try {
+        Dispatcher d(poller);
        
-           for (int i = 0; i < 32; i++) {
-               aio->queueReadBuffer(new Buff);
-           }
+        for (int i = 0; i < 32; i++) {
+            aio->queueReadBuffer(new Buff);
+        }
        
-           aio->start(poller);
-           d.run();
+        aio->start(poller);
+        d.run();
         aio->queueForDeletion();
         socket.close();
-       } catch (const std::exception& e) {
+    } catch (const std::exception& e) {
         QPID_LOG(error, e.what());
         handleClosed();
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=610972&r1=610971&r2=610972&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Thu Jan 10 
14:50:23 2008
@@ -45,6 +45,33 @@
 class Connector : public framing::OutputHandler, 
                   private sys::Runnable
 {
+    struct Buff;
+
+    /** Batch up frames for writing to aio. */
+    class Writer : public framing::FrameHandler {
+        typedef sys::AsynchIO::BufferBase BufferBase;
+        typedef std::vector<framing::AMQFrame> Frames;
+
+        sys::Mutex lock;
+        sys::AsynchIO* aio;
+        BufferBase* buffer;
+        Frames frames;
+        Frames::iterator lastEof; // Points after last EOF in frames
+        framing::Buffer encode;
+        size_t framesEncoded;
+        
+        void writeOne(const sys::Mutex::ScopedLock&);
+        void newBuffer(const sys::Mutex::ScopedLock&);
+
+      public:
+        
+        Writer();
+        ~Writer();
+        void setAio(sys::AsynchIO*);
+        void handle(framing::AMQFrame&);
+        void write(sys::AsynchIO&);
+    };
+    
     const bool debug;
     const int receive_buffer_size;
     const int send_buffer_size;
@@ -65,8 +92,7 @@
     framing::InitiationHandler* initialiser;
     framing::OutputHandler* output;
 
-    sys::Mutex writeLock;
-    std::queue<framing::AMQFrame> writeFrameQueue;
+    Writer writer;
     
     sys::Thread receiver;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=610972&r1=610971&r2=610972&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp Thu Jan 10 
14:50:23 2008
@@ -47,10 +47,6 @@
     position = 0;
 }
 
-uint32_t Buffer::available(){
-    return size - position;
-}
-
 ///////////////////////////////////////////////////
 
 void Buffer::putOctet(uint8_t i){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h?rev=610972&r1=610971&r2=610972&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h Thu Jan 10 14:50:23 
2008
@@ -34,7 +34,7 @@
 
 class Buffer
 {
-    const uint32_t size;
+    uint32_t size;
     char* data;
     uint32_t position;
     uint32_t r_position;
@@ -43,13 +43,16 @@
 
 public:
 
-    Buffer(char* data, uint32_t size);
+    Buffer(char* data=0, uint32_t size=0);
 
     void record();
     void restore(bool reRecord = false);
     void reset();
-    uint32_t available();
-    
+
+    uint32_t available() { return size - position; }
+    uint32_t getSize() { return size; }
+    uint32_t getPosition() { return position; }
+        
     void putOctet(uint8_t i);
     void putShort(uint16_t i);
     void putLong(uint32_t i);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=610972&r1=610971&r2=610972&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Thu Jan 10 
14:50:23 2008
@@ -44,12 +44,12 @@
 namespace sys {
 
 class AsynchIOAcceptor : public Acceptor {
-       Poller::shared_ptr poller;
-       Socket listener;
-       int numIOThreads;
-       const uint16_t listeningPort;
+    Poller::shared_ptr poller;
+    Socket listener;
+    int numIOThreads;
+    const uint16_t listeningPort;
 
-public:
+  public:
     AsynchIOAcceptor(int16_t port, int backlog, int threads);
     ~AsynchIOAcceptor() {}
     void run(ConnectionInputHandlerFactory* factory);
@@ -58,7 +58,7 @@
     uint16_t getPort() const;
     std::string getHost() const;
 
-private:
+  private:
     void accepted(Poller::shared_ptr, const Socket&, 
ConnectionInputHandlerFactory*);
 };
 
@@ -69,9 +69,9 @@
 }
 
 AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
-       poller(new Poller),
-       numIOThreads(threads),
-       listeningPort(listener.listen(port, backlog))
+    poller(new Poller),
+    numIOThreads(threads),
+    listeningPort(listener.listen(port, backlog))
 {}
 
 // Buffer definition
@@ -93,53 +93,53 @@
     bool readError;
     std::string identifier;
 
-public:
-       AsynchIOHandler() :
-               inputHandler(0),
-               frameQueueClosed(false),
-               initiated(false),
-                readError(false)
-       {}
+  public:
+    AsynchIOHandler() :
+        inputHandler(0),
+        frameQueueClosed(false),
+        initiated(false),
+        readError(false)
+    {}
        
-       ~AsynchIOHandler() {
-               if (inputHandler)
-                       inputHandler->closed();
-               delete inputHandler;
-       }
-
-       void init(AsynchIO* a, ConnectionInputHandler* h) {
-               aio = a;
-               inputHandler = h;
-       }
-
-       // Output side
-       void send(framing::AMQFrame&);
-       void close();
-        void activateOutput();
-
-       // Input side
-       void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
-       void eof(AsynchIO& aio);
-       void disconnect(AsynchIO& aio);
+    ~AsynchIOHandler() {
+        if (inputHandler)
+            inputHandler->closed();
+        delete inputHandler;
+    }
+
+    void init(AsynchIO* a, ConnectionInputHandler* h) {
+        aio = a;
+        inputHandler = h;
+    }
+
+    // Output side
+    void send(framing::AMQFrame&);
+    void close();
+    void activateOutput();
+
+    // Input side
+    void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
+    void eof(AsynchIO& aio);
+    void disconnect(AsynchIO& aio);
        
-       // Notifications
-       void nobuffs(AsynchIO& aio);
-       void idle(AsynchIO& aio);
-       void closedSocket(AsynchIO& aio, const Socket& s);
+    // Notifications
+    void nobuffs(AsynchIO& aio);
+    void idle(AsynchIO& aio);
+    void closedSocket(AsynchIO& aio, const Socket& s);
 };
 
 void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, 
ConnectionInputHandlerFactory* f) {
 
-       AsynchIOHandler* async = new AsynchIOHandler; 
-       ConnectionInputHandler* handler = f->create(async, s);
+    AsynchIOHandler* async = new AsynchIOHandler; 
+    ConnectionInputHandler* handler = f->create(async, s);
     AsynchIO* aio = new AsynchIO(s,
-       boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-       boost::bind(&AsynchIOHandler::eof, async, _1),
-       boost::bind(&AsynchIOHandler::disconnect, async, _1),
-        boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-       boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-       boost::bind(&AsynchIOHandler::idle, async, _1));
-       async->init(aio, handler);
+                                 boost::bind(&AsynchIOHandler::readbuff, 
async, _1, _2),
+                                 boost::bind(&AsynchIOHandler::eof, async, _1),
+                                 boost::bind(&AsynchIOHandler::disconnect, 
async, _1),
+                                 boost::bind(&AsynchIOHandler::closedSocket, 
async, _1, _2),
+                                 boost::bind(&AsynchIOHandler::nobuffs, async, 
_1),
+                                 boost::bind(&AsynchIOHandler::idle, async, 
_1));
+    async->init(aio, handler);
 
     // Give connection some buffers to use
     for (int i = 0; i < 4; i++) {
@@ -158,50 +158,50 @@
 }
 
 void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
-       Dispatcher d(poller);
-       AsynchAcceptor
-               acceptor(listener,
-                       boost::bind(&AsynchIOAcceptor::accepted, this, poller, 
_1, fact));
-       acceptor.start(poller);
+    Dispatcher d(poller);
+    AsynchAcceptor
+        acceptor(listener,
+                 boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, 
fact));
+    acceptor.start(poller);
        
-       std::vector<Thread> t(numIOThreads-1);
+    std::vector<Thread> t(numIOThreads-1);
 
-       // Run n-1 io threads
-       for (int i=0; i<numIOThreads-1; ++i)
-               t[i] = Thread(d);
+    // Run n-1 io threads
+    for (int i=0; i<numIOThreads-1; ++i)
+        t[i] = Thread(d);
 
-       // Run final thread
-       d.run();
+    // Run final thread
+    d.run();
        
-       // Now wait for n-1 io threads to exit
-       for (int i=0; i<numIOThreads-1; ++i) {
-               t[i].join();
-       }
+    // Now wait for n-1 io threads to exit
+    for (int i=0; i<numIOThreads-1; ++i) {
+        t[i].join();
+    }
 }
 
 void AsynchIOAcceptor::shutdown() {
-       poller->shutdown();
+    poller->shutdown();
 }
 
 // Output side
 void AsynchIOHandler::send(framing::AMQFrame& frame) {
-       // TODO: Need to find out if we are in the callback context,
-       // in the callback thread if so we can go further than just queuing the 
frame
-       // to be handled later
-       {
+    // TODO: Need to find out if we are in the callback context,
+    // in the callback thread if so we can go further than just queuing the 
frame
+    // to be handled later
+    {
        ScopedLock<Mutex> l(frameQueueLock);
        // Ignore anything seen after closing
        if (!frameQueueClosed)
-               frameQueue.push(frame);
-       }
+            frameQueue.push(frame);
+    }
 
-       // Activate aio for writing here
-       aio->notifyPendingWrite();
+    // Activate aio for writing here
+    aio->notifyPendingWrite();
 }
 
 void AsynchIOHandler::close() {
-       ScopedLock<Mutex> l(frameQueueLock);
-       frameQueueClosed = true;
+    ScopedLock<Mutex> l(frameQueueLock);
+    frameQueueClosed = true;
 }
 
 void AsynchIOHandler::activateOutput() {
@@ -218,7 +218,7 @@
         framing::AMQFrame frame;
         try{
             while(frame.decode(in)) {
-                QPID_LOG(debug, "RECV [" << identifier << "]: " << frame);
+                QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
                 inputHandler->received(frame);
             }
         }catch(const std::exception& e){
@@ -249,9 +249,9 @@
 }
 
 void AsynchIOHandler::eof(AsynchIO&) {
-        QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
-       inputHandler->closed();
-       aio->queueWriteClose();
+    QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
+    inputHandler->closed();
+    aio->queueWriteClose();
 }
 
 void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
@@ -259,14 +259,14 @@
     if (!aio->writeQueueEmpty()) {
         QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data 
(probably due to client disconnect)");
     }
-       delete &s;
-       aio->queueForDeletion();
-       delete this;
+    delete &s;
+    aio->queueForDeletion();
+    delete this;
 }
 
 void AsynchIOHandler::disconnect(AsynchIO& a) {
-       // treat the same as eof
-       eof(a);
+    // treat the same as eof
+    eof(a);
 }
 
 // Notifications
@@ -274,50 +274,54 @@
 }
 
 void AsynchIOHandler::idle(AsynchIO&){
-       ScopedLock<Mutex> l(frameQueueLock);
+    ScopedLock<Mutex> l(frameQueueLock);
        
-       if (frameQueue.empty()) {
-            // At this point we know that we're write idling the connection
-            // so tell the input handler to queue any available output:
-            inputHandler->doOutput();
-            //if still no frames, theres nothing to do:
-            if (frameQueue.empty()) return;
-       }
+    if (frameQueue.empty()) {
+        // At this point we know that we're write idling the connection
+        // so tell the input handler to queue any available output:
+        inputHandler->doOutput();
+        //if still no frames, theres nothing to do:
+        if (frameQueue.empty()) return;
+    }
        
-       do {
-               // Try and get a queued buffer if not then construct new one
-               AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
-               if (!buff)
-                       buff = new Buff;
-               framing::Buffer out(buff->bytes, buff->byteCount);
-               int buffUsed = 0;
+    do {
+        // Try and get a queued buffer if not then construct new one
+        AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+        if (!buff)
+            buff = new Buff;
+        framing::Buffer out(buff->bytes, buff->byteCount);
+        int buffUsed = 0;
        
-               framing::AMQFrame frame = frameQueue.front();
-               int frameSize = frame.size();
-               while (frameSize <= int(out.available())) {
-                       frameQueue.pop();
+        framing::AMQFrame frame = frameQueue.front();
+        int frameSize = frame.size();
+        int framesEncoded=0;
+        while (frameSize <= int(out.available())) {
+            frameQueue.pop();
        
-                       // Encode output frame  
-                       frame.encode(out);
-                       buffUsed += frameSize;
-                       QPID_LOG(debug, "SENT [" << identifier << "]: " << 
frame);
+            // Encode output frame     
+            frame.encode(out);
+            ++framesEncoded;
+            buffUsed += frameSize;
+            QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
                        
-                       if (frameQueue.empty())
-                               break;
-                       frame = frameQueue.front();
-                       frameSize = frame.size();
-               }
-               // If frame was egregiously large complain
-               if (frameSize > buff->byteCount)
-                    throw framing::ContentTooLargeException(QPID_MSG("Could 
not write frame, too large for buffer."));
+            if (frameQueue.empty())
+                break;
+            frame = frameQueue.front();
+            frameSize = frame.size();
+        }
+        QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << 
framesEncoded << " frames ");
+
+        // If frame was egregiously large complain
+        if (frameSize > buff->byteCount)
+            throw framing::ContentTooLargeException(QPID_MSG("Could not write 
frame, too large for buffer."));
        
-               buff->dataCount = buffUsed;
-               aio->queueWrite(buff);
-       } while (!frameQueue.empty());
+        buff->dataCount = buffUsed;
+        aio->queueWrite(buff);
+    } while (!frameQueue.empty());
 
-       if (frameQueueClosed) {
-            aio->queueWriteClose();
-       }
+    if (frameQueueClosed) {
+        aio->queueWriteClose();
+    }
 }
 
 }} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=610972&r1=610971&r2=610972&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Thu Jan 10 
14:50:23 2008
@@ -42,7 +42,7 @@
  * pipe/socket (necessary as default action is to terminate process)
  */
 void ignoreSigpipe() {
-       ::signal(SIGPIPE, SIG_IGN);
+    ::signal(SIGPIPE, SIG_IGN);
 }
 
 /*
@@ -88,7 +88,7 @@
         if (s) {
             acceptedCallback(*s);
         } else {
-               break;
+            break;
         }
     } while (true);
 
@@ -99,13 +99,13 @@
  * Asynch reader/writer
  */
 AsynchIO::AsynchIO(const Socket& s,
-    ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
-    ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
+                   ReadCallback rCb, EofCallback eofCb, DisconnectCallback 
disCb,
+                   ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback 
iCb) :
 
     DispatchHandle(s, 
-        boost::bind(&AsynchIO::readable, this, _1),
-        boost::bind(&AsynchIO::writeable, this, _1),
-        boost::bind(&AsynchIO::disconnected, this, _1)),
+                   boost::bind(&AsynchIO::readable, this, _1),
+                   boost::bind(&AsynchIO::writeable, this, _1),
+                   boost::bind(&AsynchIO::disconnected, this, _1)),
     readCallback(rCb),
     eofCallback(eofCb),
     disCallback(disCb),
@@ -120,8 +120,8 @@
 
 struct deleter
 {
-  template <typename T>
-  void operator()(T *ptr){ delete ptr;}
+    template <typename T>
+    void operator()(T *ptr){ delete ptr;}
 };
 
 AsynchIO::~AsynchIO() {
@@ -138,7 +138,7 @@
 }
 
 void AsynchIO::queueReadBuffer(BufferBase* buff) {
-       assert(buff);
+    assert(buff);
     buff->dataStart = 0;
     buff->dataCount = 0;
     bufferQueue.push_back(buff);
@@ -146,11 +146,11 @@
 }
 
 void AsynchIO::unread(BufferBase* buff) {
-       assert(buff);
-       if (buff->dataStart != 0) {
-               memmove(buff->bytes, buff->bytes+buff->dataStart, 
buff->dataCount);
-               buff->dataStart = 0;
-       }
+    assert(buff);
+    if (buff->dataStart != 0) {
+        memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
+        buff->dataStart = 0;
+    }
     bufferQueue.push_front(buff);
     DispatchHandle::rewatchRead();
 }
@@ -182,14 +182,15 @@
  * to spare
  */
 AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
-       // Always keep at least one buffer (it might have data that was 
"unread" in it)
-       if (bufferQueue.size()<=1)
-               return 0;
-       BufferBase* buff = bufferQueue.back();
-       buff->dataStart = 0;
-       buff->dataCount = 0;
-       bufferQueue.pop_back();
-       return buff;
+    // Always keep at least one buffer (it might have data that was "unread" 
in it)
+    if (bufferQueue.size()<=1)
+        return 0;
+    BufferBase* buff = bufferQueue.back();
+    assert(buff);
+    buff->dataStart = 0;
+    buff->dataCount = 0;
+    bufferQueue.pop_back();
+    return buff;
 }
 
 /*
@@ -204,6 +205,7 @@
         if (!bufferQueue.empty()) {
             // Read into buffer
             BufferBase* buff = bufferQueue.front();
+            assert(buff);
             bufferQueue.pop_front();
             errno = 0;
             int readCount = buff->byteCount-buff->dataCount;
@@ -227,6 +229,7 @@
             } else {
                 // Put buffer back (at front so it doesn't interfere with 
unread buffers)
                 bufferQueue.push_front(buff);
+                assert(buff);
                 
                 // Eof or other side has gone away
                 if (rc == 0 || errno == ECONNRESET) {
@@ -352,10 +355,10 @@
  * Close the socket and callback to say we've done it
  */
 void AsynchIO::close(DispatchHandle& h) {
-       h.stopWatch();
-       h.getSocket().close();
-       if (closedCallback) {
-            closedCallback(*this, getSocket());
-       }
+    h.stopWatch();
+    h.getSocket().close();
+    if (closedCallback) {
+        closedCallback(*this, getSocket());
+    }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=610972&r1=610971&r2=610972&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Thu Jan 10 14:50:23 
2008
@@ -476,7 +476,7 @@
             session.close();
         }
         catch (const std::exception& e) {
-            cout << "Publisher exception: " << e.what() << endl;
+            cout << "SubscribeThread exception: " << e.what() << endl;
             exit(1);
         }
     }


Reply via email to