Author: astitcher
Date: Fri Jun 29 11:56:11 2007
New Revision: 552001

URL: http://svn.apache.org/viewvc?view=rev&rev=552001
Log:
* More work on asychronous network IO
* Fix of current EventQueue code to carry on compiling

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=552001&r1=552000&r2=552001
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jun 29 11:56:11 2007
@@ -68,9 +68,11 @@
   qpid/sys/posix/EventChannelThreads.h
 
 posix_plat_src = \
-  qpid/sys/posix/check.cpp \
+  qpid/sys/Dispatcher.cpp \
   qpid/sys/epoll/EpollPoller.cpp \
+  qpid/sys/posix/check.cpp \
   qpid/sys/posix/Socket.cpp \
+  qpid/sys/posix/AsynchIO.cpp \
   qpid/sys/posix/Time.cpp \
   qpid/sys/posix/Thread.cpp
 
@@ -349,6 +351,7 @@
   qpid/framing/amqp_types.h \
   qpid/framing/amqp_types_full.h \
   qpid/sys/Acceptor.h \
+  qpid/sys/AsynchIO.h \
   qpid/sys/AtomicCount.h \
   qpid/sys/Dispatcher.h \
   qpid/sys/Condition.h \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?view=auto&rev=552001
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Fri Jun 29 11:56:11 
2007
@@ -0,0 +1,100 @@
+#ifndef _sys_AsynchIO
+#define _sys_AsynchIO
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Dispatcher.h"
+
+#include <boost/function.hpp>
+#include <deque>
+
+namespace qpid {
+namespace sys {
+
+/*
+ * Asynchronous acceptor: accepts connections then does a callback with the
+ * accepted fd
+ */
+class AsynchAcceptor {
+public:
+    typedef boost::function1<void, int> Callback;
+
+private:
+    Callback acceptedCallback;
+    DispatchHandle handle;
+
+public:
+    AsynchAcceptor(int fd, Callback callback);
+    void start(Poller::shared_ptr poller);
+
+private:
+    void readable(DispatchHandle& handle);
+};
+
+/*
+ * Asycnchronous reader/writer: 
+ * Reader accepts buffers to read into; reads into the provided buffers
+ * and then does a callback with the buffer and amount read. Optionally it can 
callback
+ * when there is something to read but no buffer to read it into.
+ * 
+ * Writer accepts a buffer and queues it for writing; can also be given
+ * a callback for when writing is "idle" (ie fd is writable, but nothing to 
write)
+ */
+class AsynchIO {
+public:
+    struct Buffer {
+        char* const bytes;
+        const int32_t byteCount;
+        
+        Buffer(char* const b, const int32_t s) :
+            bytes(b),
+            byteCount(s)
+        {}
+    };
+
+    typedef boost::function2<void, const Buffer&, int32_t> ReadCallback;
+    typedef boost::function0<void> EofCallback;
+    typedef boost::function0<void> BuffersEmptyCallback;
+    typedef boost::function1<void, int> IdleCallback;
+
+private:
+    ReadCallback readCallback;
+    EofCallback eofCallback;
+    BuffersEmptyCallback emptyCallback;
+    IdleCallback idleCallback;
+    DispatchHandle handle;
+    std::deque<Buffer> bufferQueue;
+    std::deque<Buffer> writeQueue;
+
+public:
+    AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback 
eCb = 0, IdleCallback iCb = 0);
+    void start(Poller::shared_ptr poller);
+    void QueueReadBuffer(const Buffer& buff);
+    void QueueWrite(const Buffer& buff);
+
+private:
+    void readable(DispatchHandle& handle);
+    void writeable(DispatchHandle& handle);
+};
+
+}}
+
+#endif // _sys_AsynchIO

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?view=diff&rev=552001&r1=552000&r2=552001
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Fri Jun 29 
11:56:11 2007
@@ -36,23 +36,20 @@
 void Dispatcher::run() {
     do {
         Poller::Event event = poller->wait();
-        // Poller::wait guarantees to return an event
         DispatchHandle* h = static_cast<DispatchHandle*>(event.handle);
-        switch (event.dir) {
-        case Poller::IN:
-            h->readableCallback(*h);
-            break;
-        case Poller::OUT:
-            h->writableCallback(*h);
-            break;
-        case Poller::INOUT:
-            h->readableCallback(*h);
-            h->writableCallback(*h);
-            break;
-        case Poller::SHUTDOWN:
-            goto dispatcher_shutdown;
-        default:
-            ;
+
+        // If can read/write then dispatch appropriate callbacks        
+        if (h) {
+            h->dispatchCallbacks(event.dir);
+        } else {
+            // Handle shutdown
+            switch (event.dir) {
+            case Poller::SHUTDOWN:
+                goto dispatcher_shutdown;
+            default:
+                // This should be impossible
+                assert(false);
+            }
         }
     } while (true);
     
@@ -63,11 +60,16 @@
 void DispatchHandle::watch(Poller::shared_ptr poller0) {
     bool r = readableCallback;
     bool w = writableCallback;
-    
+
+    ScopedLock<Mutex> lock(stateLock);
+    assert(state == IDLE);
+
     // If no callbacks set then do nothing (that is what we were asked to do!)
     // TODO: Maybe this should be an assert instead
-    if (!r && !w)
+    if (!r && !w) {
+        state = INACTIVE;
         return;
+    }
 
     Poller::Direction d = r ?
         (w ? Poller::INOUT : Poller::IN) :
@@ -75,16 +77,179 @@
 
     poller = poller0;
     poller->addFd(*this, d);
+    
+    state = r ?
+        (w ? ACTIVE_RW : ACTIVE_R) :
+        ACTIVE_W;
 }
 
 void DispatchHandle::rewatch() {
     assert(poller);
-    poller->rearmFd(*this);
+    bool r = readableCallback;
+    bool w = writableCallback;
+
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case DispatchHandle::IDLE:
+        assert(false);
+        break;
+    case DispatchHandle::DELAYED_R:
+    case DispatchHandle::DELAYED_W:
+    case DispatchHandle::CALLBACK:
+        state = r ?
+            (w ? DELAYED_RW : DELAYED_R) :
+            DELAYED_W;
+        break;
+    case DispatchHandle::INACTIVE:
+    case DispatchHandle::ACTIVE_R:
+    case DispatchHandle::ACTIVE_W: {
+        Poller::Direction d = r ?
+            (w ? Poller::INOUT : Poller::IN) :
+            Poller::OUT;
+        poller->modFd(*this, d);
+        state = r ?
+            (w ? ACTIVE_RW : ACTIVE_R) :
+            ACTIVE_W;
+        break;
+        }
+    case DispatchHandle::DELAYED_RW:
+    case DispatchHandle::ACTIVE_RW:
+        // Don't need to do anything already waiting for readable/writable
+        break;
+    }
+}
+
+void DispatchHandle::rewatchRead() {
+    assert(poller);
+    if (!readableCallback) {
+        return;
+    }
+    
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case DispatchHandle::IDLE:
+        assert(false);
+        break;
+    case DispatchHandle::DELAYED_R:
+    case DispatchHandle::DELAYED_RW:
+        break;
+    case DispatchHandle::DELAYED_W:
+        state = DELAYED_RW;
+        break;
+    case DispatchHandle::CALLBACK:
+        state = DELAYED_R;
+        break;
+    case DispatchHandle::ACTIVE_R:
+    case DispatchHandle::ACTIVE_RW:
+        // Nothing to do: already wating for readable
+        break;
+    case DispatchHandle::INACTIVE:
+        poller->modFd(*this, Poller::IN);
+        state = ACTIVE_R;
+        break;
+    case DispatchHandle::ACTIVE_W:
+        poller->modFd(*this, Poller::INOUT);
+        state = ACTIVE_RW;
+        break;
+    }
+}
+
+void DispatchHandle::rewatchWrite() {
+    assert(poller);
+    if (!writableCallback) {
+        return;
+    }
+    
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case DispatchHandle::IDLE:
+        assert(false);
+        break;
+    case DispatchHandle::DELAYED_W:
+    case DispatchHandle::DELAYED_RW:
+        break;
+    case DispatchHandle::DELAYED_R:
+        state = DELAYED_RW;
+        break;
+    case DispatchHandle::CALLBACK:
+        state = DELAYED_W;
+        break;
+    case DispatchHandle::INACTIVE:
+        poller->modFd(*this, Poller::OUT);
+        state = ACTIVE_W;
+        break;
+    case DispatchHandle::ACTIVE_R:
+        poller->modFd(*this, Poller::INOUT);
+        state = ACTIVE_RW;
+        break;
+    case DispatchHandle::ACTIVE_W:
+    case DispatchHandle::ACTIVE_RW:
+        // Nothing to do: already waiting for writable
+        break;
+   }
 }
 
 void DispatchHandle::unwatch() {
+    assert(poller);
+    ScopedLock<Mutex> lock(stateLock);
     poller->delFd(*this);
     poller.reset();
+    state = IDLE;
+}
+
+void DispatchHandle::dispatchCallbacks(Poller::Direction dir) {
+    // Note that we are now doing the callbacks
+    {
+    ScopedLock<Mutex> lock(stateLock);
+    assert(
+        state == ACTIVE_R ||
+        state == ACTIVE_W ||
+        state == ACTIVE_RW);
+
+    state = CALLBACK;
+    }
+
+    // Do callbacks - whilst we are doing the callbacks we are prevented from 
processing
+    // the same handle until we re-enable it. To avoid rentering the callbacks 
for a single
+    // handle re-enabling in the callbacks is actually deferred until they are 
complete.
+    switch (dir) {
+    case Poller::IN:
+        readableCallback(*this);
+        break;
+    case Poller::OUT:
+        writableCallback(*this);
+        break;
+    case Poller::INOUT:
+        readableCallback(*this);
+        writableCallback(*this);
+        break;
+    default:
+        assert(false);
+    }
+
+    // If any of the callbacks re-enabled reading/writing then actually
+    // do it now
+    ScopedLock<Mutex> lock(stateLock);
+    switch (state) {
+    case DELAYED_R:
+        poller->modFd(*this, Poller::IN);
+        state = ACTIVE_R;
+        break;
+    case DELAYED_W:
+        poller->modFd(*this, Poller::OUT);
+        state = ACTIVE_W;
+        break;
+    case DELAYED_RW:
+        poller->modFd(*this, Poller::INOUT);
+        state = ACTIVE_RW;
+        break;
+    case CALLBACK:
+        state = INACTIVE;
+        break;
+    default:
+        // This should be impossible
+        assert(false);
+    }            
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?view=diff&rev=552001&r1=552000&r2=552001
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Fri Jun 29 11:56:11 
2007
@@ -22,9 +22,9 @@
  *
  */
 
-
 #include "Poller.h"
 #include "Runnable.h"
+#include "Mutex.h"
 
 #include <memory>
 #include <boost/function.hpp>
@@ -45,18 +45,25 @@
     Callback readableCallback;
     Callback writableCallback;
     Poller::shared_ptr poller;
+    Mutex stateLock;
+    enum { IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, CALLBACK, DELAYED_R, 
DELAYED_W, DELAYED_RW} state;
 
 public:
-    
     DispatchHandle(int fd, Callback rCb, Callback wCb) :
       PollerHandle(fd),
       readableCallback(rCb),
-      writableCallback(wCb)
+      writableCallback(wCb),
+      state(IDLE)
     {}
 
     void watch(Poller::shared_ptr poller);
     void rewatch();
+    void rewatchRead();
+    void rewatchWrite();
     void unwatch();
+
+private:
+    void dispatchCallbacks(Poller::Direction dir);
 };
 
 class Dispatcher : public Runnable {

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?view=auto&rev=552001
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Jun 29 
11:56:11 2007
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+
+#include "check.h"
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+
+#include <boost/bind.hpp>
+
+using namespace qpid::sys;
+
+namespace {
+
+/*
+ * Make file descriptor non-blocking
+ */
+void nonblocking(int fd) {
+    QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
+}
+
+}
+
+/*
+ * Asynch Acceptor
+ */
+
+AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
+    acceptedCallback(callback),
+    handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0) {
+
+    nonblocking(fd);
+}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+    handle.watch(poller);
+}
+
+/*
+ * We keep on accepting as long as there is something to accept
+ */
+void AsynchAcceptor::readable(DispatchHandle& h) {
+    int afd;
+    do {
+        errno = 0;
+        // TODO: Currently we ignore the peers address, perhaps we should
+        // log it or use it for connection acceptance.
+        afd = ::accept(h.getFD(), 0, 0);
+        if (afd >= 0) {
+            acceptedCallback(afd);
+        } else if (errno == EAGAIN) {
+            break;
+        } else {
+            QPID_POSIX_CHECK(afd);
+        }
+    } while (true);
+
+    h.rewatch();
+}
+
+/*
+ * Asynch reader/writer
+ */
+AsynchIO::AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, 
BuffersEmptyCallback eCb, IdleCallback iCb) :
+    readCallback(rCb),
+    eofCallback(eofCb),
+    emptyCallback(eCb),
+    idleCallback(iCb),
+    handle(fd, boost::bind(&AsynchIO::readable, this, _1), 
boost::bind(&AsynchIO::writeable, this, _1)) {
+
+    nonblocking(fd);
+}
+
+void AsynchIO::start(Poller::shared_ptr poller) {
+    handle.watch(poller);
+}
+
+void AsynchIO::QueueReadBuffer(const Buffer& buff) {
+    bufferQueue.push_front(buff);
+    handle.rewatchRead();
+}
+
+void AsynchIO::QueueWrite(const Buffer& buff) {
+    writeQueue.push_front(buff);
+    handle.rewatchWrite();
+}
+
+/*
+ * We keep on reading as long as we have something to read and a buffer to put
+ * it in
+ */
+void AsynchIO::readable(DispatchHandle& h) {
+    do {
+        // (Try to) get a buffer
+        if (!bufferQueue.empty()) {
+            // Read into buffer
+            Buffer buff = bufferQueue.back();
+            bufferQueue.pop_back();
+            errno = 0;
+            int rc = ::read(h.getFD(), buff.bytes, buff.byteCount);
+            if (rc == 0) {
+                eofCallback();    
+            } else if (rc > 0) {
+                readCallback(buff, rc);
+            } else {
+                // Put buffer back
+                bufferQueue.push_back(buff);
+                
+                if (errno == EAGAIN) {
+                    // We must have just put a buffer back so we know
+                    // we can do this
+                    h.rewatchRead();
+                    return;
+                } else {
+                    QPID_POSIX_CHECK(rc);
+                }
+            }
+        } else {
+            // Something to read but no buffer
+            if (emptyCallback) {
+                emptyCallback();
+            }
+            // If we still have no buffers we can't do anything more
+            if (bufferQueue.empty()) {
+                return;
+            }
+            
+        }
+    } while (true);
+}
+
+/*
+ * We carry on writing whilst we have data to write and we can write
+ */
+void AsynchIO::writeable(DispatchHandle& h) {
+    do {
+        // See if we've got something to write
+        if (!writeQueue.empty()) {
+            // Write buffer
+            Buffer buff = writeQueue.back();
+            writeQueue.pop_back();
+            errno = 0;
+            int rc = ::write(h.getFD(), buff.bytes, buff.byteCount);
+            if (rc >= 0) {
+                // Recycle the buffer
+                QueueReadBuffer(buff);
+            } else {
+                // Put buffer back
+                writeQueue.push_back(buff);
+                
+                if (errno == EAGAIN) {
+                    // We have just put a buffer back so we know
+                    // we can do this
+                    h.rewatchWrite();
+                    return;
+                } else {
+                    QPID_POSIX_CHECK(rc);
+                }
+            }
+        } else {
+            // Something to read but no buffer
+            if (idleCallback) {
+                idleCallback(h.getFD());
+            }
+            // If we still have no buffers to write we can't do anything more
+            if (writeQueue.empty()) {
+                return;
+            }
+        }
+    } while (true);
+}
+        

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp?view=diff&rev=552001&r1=552000&r2=552001
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp 
Fri Jun 29 11:56:11 2007
@@ -62,11 +62,10 @@
 }
 
 
-void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) {
+void EventChannelConnection::send(AMQFrame& frame) {
     {
         Monitor::ScopedLock lock(monitor);
-        assert(frame.get());
-        writeFrames.push_back(frame.release());
+        writeFrames.push_back(frame);
     }
     closeOnException(&EventChannelConnection::startWrite);
 }
@@ -119,7 +118,6 @@
 // Called by endWrite and send, but only one thread writes at a time.
 // 
 void EventChannelConnection::startWrite() {
-    FrameQueue::auto_type frame;
     {
         Monitor::ScopedLock lock(monitor);
         // Stop if closed or a write event is already in progress.
@@ -130,14 +128,15 @@
             return;
         }
         isWriting = true;
-        frame = writeFrames.pop_front();
+        AMQFrame& frame = writeFrames.front();
+        writeFrames.pop_front();
+           // No need to lock here - only one thread can be writing at a time.
+           out.clear();
+           if (isTrace)
+               cout << "Send on socket " << writeFd << ": " << frame << endl;
+           frame.encode(out);
+           out.flip();
     }
-    // No need to lock here - only one thread can be writing at a time.
-    out.clear();
-    if (isTrace)
-        cout << "Send on socket " << writeFd << ": " << *frame << endl;
-    frame->encode(out);
-    out.flip();
     // TODO: AMS 1/6/07 This only works because we already have the correct fd
     // in the descriptor - change not to use assigment
     writeEvent = WriteEvent(
@@ -225,11 +224,10 @@
         in.flip();
         AMQFrame frame;
         while (frame.decode(in)) {
-            // TODO aconway 2006-11-30: received should take Frame&
             if (isTrace)
                 cout << "Received on socket " << readFd
                      << ": " << frame << endl;
-            handler->received(&frame); 
+            handler->received(frame); 
         }
         in.compact();
         startRead();

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h?view=diff&rev=552001&r1=552000&r2=552001
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h 
Fri Jun 29 11:56:11 2007
@@ -50,17 +50,11 @@
         bool isTrace = false
     );
 
-    // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr
-    virtual void send(qpid::framing::AMQFrame* frame) {
-        send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
-    }
-            
-    virtual void send(std::auto_ptr<qpid::framing::AMQFrame> frame);
-
+    virtual void send(qpid::framing::AMQFrame& frame);
     virtual void close();
 
   private:
-    typedef boost::ptr_deque<qpid::framing::AMQFrame> FrameQueue;
+    typedef std::deque<qpid::framing::AMQFrame> FrameQueue;
     typedef void (EventChannelConnection::*MemberFnPtr)();
     struct ScopedBusy;
 


Reply via email to