Author: aconway
Date: Thu Jan 24 14:26:12 2008
New Revision: 615063

URL: http://svn.apache.org/viewvc?rev=615063&view=rev
Log:

Improved/additional client API tests.
 - Replaced InProcessBroker with a more accurate loopback BrokerFixture.
 - Added asserts for mutex/condition/thread errors in debug build.
 - Added client tests for several exception conditions.
 - Added peer address to log output, client/server distinguished by (addr) or 
[addr]
 - Fixed various deadlocks & races exposed by the new asserts & tests.


File-by-file:

New BrokerFixture replaces InProcessBroker
D      src/tests/InProcessBroker.h
M      src/tests/BrokerFixture.h
M      src/tests/SocketProxy.h
M      src/tests/Makefile.am

Made it run a bit faster.
M      src/tests/quick_perftest

Redundant
D      src/tests/APRBaseTest.cpp

Updated tests to use BrokerFixture
M      src/tests/ClientChannelTest.cpp
M      src/tests/exception_test.cpp
M      src/tests/ClientSessionTest.cpp

Print thread IDs in decimal, same as GDB.
M      src/qpid/log/Logger.cpp

Assert mutex/condition ops in debug build.
M      src/qpid/sys/posix/check.h
M      src/qpid/sys/posix/Mutex.h
M      src/qpid/sys/posix/Condition.h
M      src/qpid/sys/posix/Thread.h

Added toFd() so SocketProxy can use ::select()
M      src/qpid/sys/Socket.h
M      src/qpid/sys/posix/Socket.cpp

Fixes for races & deadlocks shown up by new tests & asserts.
Mostly shutdown/close issues.
M      src/qpid/client/ConnectionHandler.h
M      src/qpid/client/ConnectionImpl.cpp
M      src/qpid/client/Demux.h
M      src/qpid/client/SessionCore.cpp
M      src/qpid/client/ConnectionHandler.cpp
M      src/qpid/client/Connector.h
M      src/qpid/client/Demux.cpp
M      src/qpid/client/Dispatcher.cpp
M      src/qpid/client/ConnectionImpl.h

Logging peer address.
M      src/qpid/sys/AsynchIOAcceptor.cpp

Removed:
    incubator/qpid/trunk/qpid/cpp/src/tests/APRBaseTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h
    incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Thu Jan 
24 14:26:12 2008
@@ -142,7 +142,7 @@
 
 void ConnectionHandler::fail(const std::string& message)
 {
-    QPID_LOG(error, message);
+    QPID_LOG(warning, message);
     setState(FAILED);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Thu Jan 
24 14:26:12 2008
@@ -59,7 +59,6 @@
     void send(const framing::AMQBody& body);
     void error(uint16_t code, const std::string& message, uint16_t classId = 
0, uint16_t methodId = 0);
     void error(uint16_t code, const std::string& message, framing::AMQBody* 
body);
-    void fail(const std::string& message);
 
 public:
     using InputHandler::handle;
@@ -75,6 +74,7 @@
 
     void waitForOpen();
     void close();
+    void fail(const std::string& message);
 
     CloseListener onClose;
     ErrorListener onError;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Jan 24 
14:26:12 2008
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include "qpid/log/Statement.h"
 #include "qpid/framing/constants.h"
 #include "qpid/framing/reply_exceptions.h"
 
@@ -44,14 +45,18 @@
     connector->setShutdownHandler(this);
 }
 
-ConnectionImpl::~ConnectionImpl() { close(); }
+ConnectionImpl::~ConnectionImpl() {
+    // Important to close the connector first, to ensure the
+    // connector thread does not call on us while the destructor
+    // is running.
+    connector->close(); 
+}
 
 void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
 {
     Mutex::ScopedLock l(lock);
     boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()];
-    if (s.lock())
-        throw ChannelBusyException();
+    if (s.lock()) throw ChannelBusyException();
     s = session;
 }
 
@@ -81,31 +86,15 @@
     handler.pwd = pwd;
     handler.vhost = vhost;
 
+    QPID_LOG(info, "Connecting to " << host << ":" << port);
     connector->connect(host, port);
     connector->init();
     handler.waitForOpen();
 }
 
-bool ConnectionImpl::setClosing()
-{
-    Mutex::ScopedLock l(lock);
-    if (isClosing || isClosed) {
-        return false;
-    }
-    isClosing = true;
-    return true;
-}
-
-void ConnectionImpl::close()
-{
-    if (setClosing()) {
-        handler.close();
-    }
-}
-
 void ConnectionImpl::idleIn()
 {
-    connector->close();
+    close();
 }
 
 void ConnectionImpl::idleOut()
@@ -114,35 +103,52 @@
     connector->send(frame);
 }
 
-template <class F>
-void ConnectionImpl::forChannels(F functor) {
-    for (SessionMap::iterator i = sessions.begin();
-         i != sessions.end(); ++i) {
-        try {
-            boost::shared_ptr<SessionCore> s = i->second.lock();
-            if (s) functor(*s);
-        } catch (...) { assert(0); }
+void ConnectionImpl::close()
+{
+    Mutex::ScopedLock l(lock);
+    if (isClosing || isClosed) return;
+    isClosing = true;
+    {
+        Mutex::ScopedUnlock u(lock);
+        handler.close();
+    }
+    closed(REPLY_SUCCESS, "Closed by client");
+}
+
+// Set closed flags and erase the sessions map, but keep the contents
+// so sessions can be updated outside the lock.
+ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const 
Mutex::ScopedLock&) {
+    isClosed = true;
+    connector->close();
+    SessionVector save;
+    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+        boost::shared_ptr<SessionCore> s = i->second.lock();
+        if (s) save.push_back(s);
     }
+    sessions.clear();
+    return save;
 }
 
-void ConnectionImpl::shutdown() 
+void ConnectionImpl::closed(uint16_t code, const std::string& text) 
 {
     Mutex::ScopedLock l(lock);
     if (isClosed) return;
-    forChannels(boost::bind(&SessionCore::connectionBroke, _1,
-                            INTERNAL_ERROR, "Unexpected socket closure."));
-    sessions.clear();
-    isClosed = true;
+    SessionVector save(closeInternal(l));
+    Mutex::ScopedUnlock u(lock);
+    std::for_each(save.begin(), save.end(), 
boost::bind(&SessionCore::connectionClosed, _1, code, text));
 }
 
-void ConnectionImpl::closed(uint16_t code, const std::string& text) 
+static const std::string CONN_CLOSED("Connection closed by broker");
+
+void ConnectionImpl::shutdown() 
 {
     Mutex::ScopedLock l(lock);
     if (isClosed) return;
-    forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text));
-    sessions.clear();
-    isClosed = true;
-    connector->close();
+    SessionVector save(closeInternal(l));
+    handler.fail(CONN_CLOSED);
+    Mutex::ScopedUnlock u(lock);
+    std::for_each(save.begin(), save.end(),
+                  boost::bind(&SessionCore::connectionBroke, _1, 
INTERNAL_ERROR, CONN_CLOSED));
 }
 
 void ConnectionImpl::erase(uint16_t ch) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Thu Jan 24 
14:26:12 2008
@@ -43,6 +43,8 @@
 
 {
     typedef std::map<uint16_t, boost::weak_ptr<SessionCore> > SessionMap;
+    typedef std::vector<boost::shared_ptr<SessionCore> > SessionVector;
+
     SessionMap sessions; 
     ConnectionHandler handler;
     boost::shared_ptr<Connector> connector;
@@ -51,14 +53,15 @@
     bool isClosed;
     bool isClosing;
 
+    template <class F> void detachAll(const F&);
+
+    SessionVector closeInternal(const sys::Mutex::ScopedLock&);
     void incoming(framing::AMQFrame& frame);    
     void closed(uint16_t, const std::string&);
     void idleOut();
     void idleIn();
     void shutdown();
     bool setClosing();
-
-    template <class F> void forChannels(F functor);
 
   public:
     typedef boost::shared_ptr<ConnectionImpl> shared_ptr;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Jan 24 
14:26:12 2008
@@ -27,7 +27,7 @@
 #include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Poller.h"
-
+#include "qpid/Msg.h"
 #include <boost/bind.hpp>
 
 namespace qpid {
@@ -43,6 +43,7 @@
     send_buffer_size(buffer_size),
     version(ver), 
     closed(true),
+    joined(true),
     timeout(0),
     idleIn(0), idleOut(0), 
     timeoutHandler(0),
@@ -52,11 +53,11 @@
 
 Connector::~Connector() {
     close();
-    if (receiver.id() && receiver.id() != Thread::current().id())
-        receiver.join();
 }
 
 void Connector::connect(const std::string& host, int port){
+    Mutex::ScopedLock l(closedLock);
+    assert(closed);
     socket.connect(host, port);
     closed = false;
     poller = Poller::shared_ptr(new Poller);
@@ -71,20 +72,27 @@
 }
 
 void Connector::init(){
+    Mutex::ScopedLock l(closedLock);
+    assert(joined);
     ProtocolInitiation init(version);
-
     writeDataBlock(init);
+    joined = false;
     receiver = Thread(this);
 }
 
 bool Connector::closeInternal() {
     Mutex::ScopedLock l(closedLock);
+    bool ret = !closed;
     if (!closed) {
-        poller->shutdown();
         closed = true;
-        return true;
+        poller->shutdown();
+    }
+    if (!joined && receiver.id() != Thread::current().id()) {
+        joined = true;
+        Mutex::ScopedUnlock u(closedLock);
+        receiver.join();
     }
-    return false;
+    return ret;
 }
         
 void Connector::close() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Thu Jan 24 
14:26:12 2008
@@ -77,8 +77,9 @@
     const int send_buffer_size;
     framing::ProtocolVersion version;
 
-    bool closed;
     sys::Mutex closedLock;
+    bool closed;
+    bool joined;
 
     sys::AbsTime lastIn;
     sys::AbsTime lastOut;
@@ -112,6 +113,8 @@
     void writebuff(qpid::sys::AsynchIO&);
     void writeDataBlock(const framing::AMQDataBlock& data);
     void eof(qpid::sys::AsynchIO&);
+
+    std::string identifier;
     
   friend class Channel;
 
@@ -130,6 +133,7 @@
     virtual void send(framing::AMQFrame& frame);
     virtual void setReadTimeout(uint16_t timeout);
     virtual void setWriteTimeout(uint16_t timeout);
+    const std::string& getIdentifier() const { return identifier; }
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp Thu Jan 24 14:26:12 
2008
@@ -45,6 +45,10 @@
     demuxer.remove(dest); 
 }
 
+Demux::Demux() : defaultQueue(new Queue()) {}
+
+Demux::~Demux() { close(); }
+
 Demux::QueuePtr ScopedDivert::getQueue()
 {
     return queue;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h Thu Jan 24 14:26:12 
2008
@@ -47,7 +47,8 @@
     typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue;
     typedef boost::shared_ptr<Queue> QueuePtr;
 
-    Demux() : defaultQueue(new Queue()) {}
+    Demux();
+    ~Demux();
     
     void handle(framing::FrameSet::shared_ptr);
     void close();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Thu Jan 24 
14:26:12 2008
@@ -62,13 +62,12 @@
 }
 
 void Dispatcher::run()
-{    
+{
     Mutex::ScopedLock l(lock);
     if (running) 
         throw Exception("Dispatcher is already running.");
     boost::state_saver<bool>  reset(running); // Reset to false on exit.
     running = true;
-    queue->open();
     try {
         while (!queue->isClosed()) {
             Mutex::ScopedUnlock u(lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Thu Jan 24 
14:26:12 2008
@@ -125,7 +125,7 @@
     channel.next = 0;
     code=c;
     text=t;
-    l3.getDemux().close();
+    l3.getDemux().close();      
 }
 
 void SessionCore::doClose(int code, const std::string& text) {
@@ -270,7 +270,6 @@
     Lock l(state);
     check(state == SUSPENDING,
           COMMAND_INVALID, UNEXPECTED_SESSION_DETACHED);
-    connection->erase(channel);
     doSuspend(REPLY_SUCCESS, OK);
 }
 
@@ -379,22 +378,28 @@
 
 // Network thread.
 void SessionCore::handleIn(AMQFrame& frame) {
+    ConnectionImpl::shared_ptr save;
     {
         Lock l(state);
+        save=connection;
         // Ignore frames received while closing other than closed response.
         if (state==CLOSING && !isCloseResponse(frame))
             return;             
     }
     try {
         // Cast to expose private SessionHandler functions.
-        if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+        if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+            // If we were detached by a session command, tell the connection.
+            if (!connection) save->erase(channel);
+        }
+        else {
             session->received(frame);
             l3.handle(frame);
         }
     } catch (const ChannelException& e) {
         QPID_LOG(error, "Channel exception:" << e.what());
         doClose(e.code, e.what());
-    } 
+    }
 }
 
 void SessionCore::handleOut(AMQFrame& frame)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp Thu Jan 24 14:26:12 
2008
@@ -114,7 +114,7 @@
     if (flags&LEVEL)
         os << LevelTraits::name(s.level) << " ";
     if (flags&THREAD)
-        os << "[" << hex << qpid::sys::Thread::logId() << "] ";
+        os << "[" << qpid::sys::Thread::logId() << "] ";
     if (flags&FILE)
         os << s.file << ":";
     if (flags&LINE)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp Thu Jan 24 14:26:12 
2008
@@ -26,7 +26,7 @@
 using namespace std;
 
 Options::Options(const std::string& name) : qpid::Options(name),
-    time(true), level(true), thread(false), source(false), function(false), 
trace(false)
+                                            time(true), level(true), 
thread(false), source(false), function(false), trace(false)
 {
     outputs.push_back("stderr");
     selectors.push_back("error+");
@@ -43,14 +43,14 @@
         ("trace,t", optValue(trace), "Enables all logging" )
         ("log-enable", optValue(selectors, "RULE"),
          ("Enables logging for selected levels and components. " 
-         "RULE is in the form 'LEVEL[+][:PATTERN]' "
-         "Levels are one of: \n\t "+levels.str()+"\n"
-         "For example:\n"
-         "\t'--log-enable warning+' "
-         "logs all warning, error and critical messages.\n"
-         "\t'--log-enable debug:framing' "
+          "RULE is in the form 'LEVEL[+][:PATTERN]' "
+          "Levels are one of: \n\t "+levels.str()+"\n"
+          "For example:\n"
+          "\t'--log-enable warning+' "
+          "logs all warning, error and critical messages.\n"
+          "\t'--log-enable debug:framing' "
           "logs debug messages from the framing namespace. "
-         "This option can be used multiple times").c_str())
+          "This option can be used multiple times").c_str())
         ("log-time", optValue(time, "yes|no"),
          "Include time in log messages")
         ("log-level", optValue(level,"yes|no"),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Thu Jan 24 
14:26:12 2008
@@ -110,6 +110,7 @@
     void init(AsynchIO* a, ConnectionInputHandler* h) {
         aio = a;
         inputHandler = h;
+        identifier = aio->getSocket().getPeerAddress();
     }
 
     // Output side
@@ -229,7 +230,6 @@
     }else{
         framing::ProtocolInitiation protocolInit;
         if(protocolInit.decode(in)){
-            identifier = aio->getSocket().getPeerAddress();
             QPID_LOG(debug, "INIT [" << identifier << "]");
             inputHandler->initiated(protocolInit);
             initiated = true;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Thu Jan 24 14:26:12 2008
@@ -97,6 +97,8 @@
     int read(void *buf, size_t count) const;
     int write(const void *buf, size_t count) const;
 
+    int toFd() const;
+    
 private:
        Socket(SocketPrivate*);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h Thu Jan 24 
14:26:12 2008
@@ -52,15 +52,15 @@
 };
 
 Condition::Condition() {
-    QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_cond_init(&condition, 0));
 }
 
 Condition::~Condition() {
-    QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_cond_destroy(&condition));
 }
 
 void Condition::wait(Mutex& mutex) {
-    QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex));
 }
 
 bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){
@@ -75,11 +75,11 @@
 }
 
 void Condition::notify(){
-    QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_cond_signal(&condition));
 }
 
 void Condition::notifyAll(){
-    QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_cond_broadcast(&condition));
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h Thu Jan 24 
14:26:12 2008
@@ -136,11 +136,11 @@
 #define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
 
 void PODMutex::lock() {
-    QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex));
 }
 
 void PODMutex::unlock() {
-    QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex));
 }
 
 bool PODMutex::trylock() {
@@ -148,19 +148,19 @@
 }
 
 Mutex::Mutex() {
-    QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr));
 }
 
 Mutex::~Mutex(){
-    QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_destroy(&mutex));
 }
 
 void Mutex::lock() {
-    QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex));
 }
 
 void Mutex::unlock() {
-    QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex));
 }
 
 bool Mutex::trylock() {
@@ -169,31 +169,31 @@
 
 
 RWlock::RWlock() {
-    QPID_POSIX_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_init(&rwlock, 
recursiveRWlockattr));
 }
 
 RWlock::~RWlock(){
-    QPID_POSIX_THROW_IF(pthread_rwlock_destroy(&rwlock));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_destroy(&rwlock));
 }
 
 void RWlock::wlock() {
-    QPID_POSIX_THROW_IF(pthread_rwlock_wrlock(&rwlock));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_wrlock(&rwlock));
 }
 
 void RWlock::rlock() {
-    QPID_POSIX_THROW_IF(pthread_rwlock_rdlock(&rwlock));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_rdlock(&rwlock));
 }
 
 void RWlock::unlock() {
-    QPID_POSIX_THROW_IF(pthread_rwlock_unlock(&rwlock));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_unlock(&rwlock));
 }
 
 void RWlock::trywlock() {
-    QPID_POSIX_THROW_IF(pthread_rwlock_trywrlock(&rwlock));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_trywrlock(&rwlock));
 }
 
 void RWlock::tryrlock() {
-    QPID_POSIX_THROW_IF(pthread_rwlock_tryrdlock(&rwlock));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_tryrdlock(&rwlock));
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Thu Jan 24 
14:26:12 2008
@@ -231,6 +231,10 @@
     return impl->getName(false, true);
 }
 
+int Socket::toFd() const {
+    return impl->fd;
+}
+
 int toFd(const SocketPrivate* s)
 {
     return s->fd;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h Thu Jan 24 
14:26:12 2008
@@ -60,16 +60,16 @@
 Thread::Thread() : thread(0) {}
 
 Thread::Thread(Runnable* runnable) {
-    QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_create(&thread, NULL, runRunnable, 
runnable));
 }
 
 Thread::Thread(Runnable& runnable) {
-    QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable));
+    QPID_POSIX_ASSERT_THROW_IF(pthread_create(&thread, NULL, runRunnable, 
&runnable));
 }
 
 void Thread::join(){
     if (thread != 0)
-        QPID_POSIX_THROW_IF(pthread_join(thread, 0));
+        QPID_POSIX_ASSERT_THROW_IF(pthread_join(thread, 0));
 }
 
 long Thread::id() {
@@ -84,7 +84,7 @@
 
 void Thread::yield() 
 {
-    QPID_POSIX_THROW_IF(pthread_yield());
+    QPID_POSIX_ASSERT_THROW_IF(pthread_yield());
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h Thu Jan 24 
14:26:12 2008
@@ -23,16 +23,26 @@
  */
 
 #include "qpid/Exception.h"
-
 #include <cerrno>
+#include <assert.h>
 
-#define QPID_POSIX_ERROR(ERRNO) 
qpid::Exception(QPID_MSG(qpid::strError(ERRNO)));
+#define QPID_POSIX_ERROR(ERRNO) 
qpid::Exception(QPID_MSG(qpid::strError(ERRNO)))
 
 /** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */
 #define QPID_POSIX_CHECK(RESULT)                        \
     if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))
 
-/** Throw a posix error if errNo is non-zero */
+/** Throw a posix error if ERRNO is non-zero */
 #define QPID_POSIX_THROW_IF(ERRNO)              \
-    if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
+    do { int e=(ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0)
+
+/** Same as _THROW_IF in a release build, but abort a debug build */
+#ifdef NDEBUG
+#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) qpid_posix    \
+    QPID_POSIX_THROW_IF(ERRNO)
+#else
+#define QPID_POSIX_ASSERT_THROW_IF(ERRNO)                               \
+    do { int e=(ERRNO); if (e) { errno=e; perror(0); assert(0); } } while(0)
+#endif
+
 #endif  /*!_posix_check_h*/

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Thu Jan 24 14:26:12 
2008
@@ -22,6 +22,7 @@
  *
  */
 
+#include "SocketProxy.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/client/Connection.h"
@@ -30,52 +31,74 @@
 #include "qpid/client/SubscriptionManager.h"
 
 /**
- * A fixture to create an in-process broker and connect to it for tests.
+ * A fixture with an in-process broker.
  */
-struct BrokerFixture {
+struct  BrokerFixture {
     typedef qpid::broker::Broker Broker;
     typedef boost::shared_ptr<Broker> BrokerPtr;
 
-    struct OpenConnection : public qpid::client::Connection {
-        OpenConnection(int port) { open("localhost", port); }
-    };
-    
     BrokerPtr broker;
     qpid::sys::Thread brokerThread;
-    OpenConnection connection;
-    qpid::client::Session_0_10 session;
-    qpid::client::SubscriptionManager subs;
-    qpid::client::LocalQueue lq;
-        
-    BrokerPtr newBroker() {
+
+    BrokerFixture() {
         Broker::Options opts;
         opts.port=0;
         opts.workerThreads=1;
-        BrokerPtr b=Broker::create(opts);
-        // TODO aconway 2007-12-05: Without the following line
-        // the test can hang in the connection ctor. This is
-        // a race condition that should be fixed.
-        b->getPort(); 
-        return b;
+        broker = Broker::create(opts);
+        // TODO aconway 2007-12-05: At one point BrokerFixture
+        // tests could hang in Connection ctor if the following
+        // line is removed. This may not be an issue anymore.
+        broker->getPort();
+        brokerThread = qpid::sys::Thread(*broker);
     };
 
-    BrokerFixture() : broker(newBroker()),
-                      brokerThread(*broker),
-                      connection(broker->getPort()),
-                      session(connection.newSession()),
-                      subs(session)
-    {}
-
     ~BrokerFixture() {
-        connection.close();
         broker->shutdown();
         brokerThread.join();
     }
 
-    /** Open a connection to the local broker */
+    /** Open a connection to the broker. */
     void open(qpid::client::Connection& c) {
         c.open("localhost", broker->getPort());
     }
 };
+
+struct LocalConnection : public qpid::client::Connection {
+    LocalConnection(uint16_t port) { open("localhost", port); }
+};
+
+/** A local client connection via a socket proxy. */
+struct ProxyConnection : public qpid::client::Connection {
+    SocketProxy proxy;
+    ProxyConnection(int brokerPort) : proxy(brokerPort) {
+        open("localhost", proxy.getPort());
+    }
+    ~ProxyConnection() { close(); }
+};
+
+/**
+ * A BrokerFixture with open Connection, Session and
+ * SubscriptionManager and LocalQueue for convenience.
+ */
+template <class ConnectionType>
+struct  SessionFixtureT : BrokerFixture {
+    ConnectionType connection;
+    qpid::client::Session_0_10 session;
+    qpid::client::SubscriptionManager subs;
+    qpid::client::LocalQueue lq;
+
+    SessionFixtureT() : connection(broker->getPort()),
+                       session(connection.newSession()),
+                       subs(session)
+    {}
+
+    ~SessionFixtureT() {
+        connection.close();
+    }
+};
+
+typedef SessionFixtureT<LocalConnection> SessionFixture;
+typedef SessionFixtureT<ProxyConnection> ProxySessionFixture;
+
 
 #endif  /*!TESTS_BROKERFIXTURE_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Thu Jan 24 14:26:12 2008
@@ -44,7 +44,7 @@
  * The test base defines the tests methods, derived classes
  * instantiate the channel in Basic or Message mode.
  */
-class ChannelTestBase : public CppUnit::TestCase, public BrokerFixture
+class ChannelTestBase : public CppUnit::TestCase, public SessionFixture
 {
     struct Listener: public qpid::client::MessageListener {
         vector<Message> messages;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Thu Jan 24 14:26:12 2008
@@ -20,7 +20,6 @@
  */
 #include "qpid_test_plugin.h"
 #include "BrokerFixture.h"
-#include "SocketProxy.h"
 #include "qpid/client/Dispatcher.h"
 #include "qpid/client/Session_0_10.h"
 #include "qpid/framing/TransferContent.h"
@@ -62,7 +61,7 @@
     }
 };
 
-class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture
+class ClientSessionTest : public CppUnit::TestCase, public ProxySessionFixture
 {
     CPPUNIT_TEST_SUITE(ClientSessionTest);
     CPPUNIT_TEST(testQueueQuery);
@@ -71,7 +70,6 @@
     CPPUNIT_TEST(testResumeExpiredError);
     CPPUNIT_TEST(testUseSuspendedError);
     CPPUNIT_TEST(testSuspendResume);
-    CPPUNIT_TEST(testDisconnectResume);
     CPPUNIT_TEST_SUITE_END();
 
   public:
@@ -85,11 +83,6 @@
         session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes
     }
 
-    bool queueExists(const std::string& q) {
-        TypedResult<QueueQueryResult> result = session.queueQuery(q);
-        return result.get().getQueue() == q;
-    }
-    
     void testQueueQuery() 
     {
         session =connection.newSession();
@@ -166,25 +159,10 @@
         declareSubscribe();
         session.suspend();
         // Make sure we are still subscribed after resume.
-       connection.resume(session);
+        connection.resume(session);
         session.messageTransfer(content=TransferContent("my-message", "my-queue"));
         FrameSet::shared_ptr msg = session.get();
         CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
-    }
-
-    void testDisconnectResume() {
-        // FIXME aconway 2007-12-11: Test hanging.
-//         ProxyConnection c(broker->getPort());
-//         Session_0_10 s = c.session;
-//         s.queueDeclare(queue="before");
-//         CPPUNIT_ASSERT(queueExists("before"));
-//         s.queueDeclare(queue=string("after"));
-//         c.proxy.client.close();       // Disconnect the client.
-//         Connection c2;
-//         open(c2);
-//         c2.resume(s);
-//         CPPUNIT_ASSERT(queueExists("after"));
-//         c2.close();
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Jan 24 14:26:12 2008
@@ -122,7 +122,6 @@
   topictest                                                            \
   .valgrind.supp                                                       \
   .valgrindrc                                                          \
-  InProcessBroker.h                                                    \
   MessageUtils.h                                                       \
   MockChannel.h                                                                \
   MockConnectionInputHandler.h                                         \

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h Thu Jan 24 14:26:12 2008
@@ -24,59 +24,141 @@
 #include "qpid/sys/Socket.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/client/Connection.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
 
 /**
- * A simple socket proxy that forwards to another socket. Used between
- * client & broker to simulate network failures.
+ * A simple socket proxy that forwards to another socket. 
+ * Used between client & local broker to simulate network failures.
  */
-struct SocketProxy : public qpid::sys::Runnable
+class SocketProxy : private qpid::sys::Runnable
 {
-    int port;             // Port bound to server socket.
-    qpid::sys::Socket client, server; // Client & server sockets.
+  public:
+    /** Connect to connectPort on host, start a forwarding thread.
+     * Listen for connection on getPort().
+     */
+    SocketProxy(int connectPort, const std::string host="localhost")
+        : closed(false), port(listener.listen())
+    {
+        int r=::pipe(closePipe);
+        if (r<0) throwErrno(QPID_MSG("::pipe returned " << r));
+        client.connect(host, connectPort);
+        thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
+    }
+    
+    ~SocketProxy() { close(); }
+
+    /** Simulate a network disconnect. */
+    void close() {
+        {
+            qpid::sys::Mutex::ScopedLock l(lock);
+            if (closed) return;
+            closed=true;
+        }
+        write(closePipe[1], this, 1); // Random byte to closePipe
+        thread.join();
+        client.close();
+        ::close(closePipe[0]);
+        ::close(closePipe[1]);
+    }
 
-    SocketProxy(const std::string& host, int port) { init(host,port); }
-    SocketProxy(int port) { init("localhost",port); }
+    bool isClosed() const {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        return closed;
+    }
 
-    ~SocketProxy() { client.close(); server.close(); thread.join(); }
+    uint16_t getPort() const { return port; }
     
   private:
-
-    void init(const std::string& host, int connectPort) {
-        client.connect(host, connectPort);
-        port = server.listen();
-        thread=qpid::sys::Thread(this);
+    static void throwErrno(const std::string& msg) {
+        throw qpid::Exception(msg+":"+qpid::strError(errno));
     }
-
-    void run() {
-        try {
-            do {
-                ssize_t recv = server.recv(buffer, sizeof(buffer));
-                if (recv <= 0) return;
-                ssize_t sent=client.send(buffer, recv);
-                if (sent < 0) return;
-                assert(sent == recv); // Assumes we can send as we receive.
-            } while (true);
-        } catch(...) {}
+    static void throwIf(bool condition, const std::string& msg) {
+        if (condition) throw qpid::Exception(msg);
     }
+    
+    struct FdSet : fd_set {
+        FdSet() : maxFd(0) { clear(); }
+        void clear() { FD_ZERO(this); }
+        void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); }
+        bool isSet(int fd) const { return FD_ISSET(fd, this); }
+        bool operator[](int fd) const { return isSet(fd); }
 
-    qpid::sys::Thread thread;
-    char buffer[64*1024];
-};
+        int maxFd;
+    };
 
-/** A local client connection via a socket proxy. */
-struct ProxyConnection : public qpid::client::Connection {
-    SocketProxy proxy;
-    qpid::client::Session_0_10 session;
+    enum { RD=1, WR=2, ER=4 };
     
-    ProxyConnection(const std::string& host, int port) : proxy(port) {
-        open(host, proxy.port);
-        session=newSession();
-    }
+    struct Selector {
+        FdSet rd, wr, er;
 
-    ProxyConnection(int port) : proxy(port) {
-        open("localhost", proxy.port);
-        session=newSession();
+        void set(int fd, int sets) {
+            if (sets & RD) rd.set(fd);
+            if (sets & WR) wr.set(fd);
+            if (sets & ER) er.set(fd);
+        }
+        
+        int select() {
+            for (;;) {
+                int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd));
+                int r = ::select(maxFd + 1, &rd, &wr, &er, NULL);
+                if (r == -1 && errno == EINTR) continue;
+                if (r < 0) throwErrno(QPID_MSG("select returned " <<r));
+                return r;
+            }
+        }
+    };
+
+    void run() {
+        std::auto_ptr<qpid::sys::Socket> server;
+        try {
+            // Accept incoming connections, watch closePipe.
+            Selector accept;
+            accept.set(listener.toFd(), RD|ER);
+            accept.set(closePipe[0], RD|ER);
+            accept.select();
+            throwIf(accept.rd[closePipe[0]], "Closed by close()");
+            throwIf(!accept.rd[listener.toFd()],"Accept failed");
+            server.reset(listener.accept(0, 0));
+
+            // Pump data between client & server sockets, watch closePipe.
+            char buffer[1024];
+            for (;;) {
+                Selector select;
+                select.set(server->toFd(), RD|ER);
+                select.set(client.toFd(), RD|ER);
+                select.set(closePipe[0], RD|ER);
+                select.select();
+                throwIf(select.rd[closePipe[0]], "Closed by close()");
+                // Read even if fd is in error to throw a useful exception.
+                bool gotData=false;
+                if (select.rd[server->toFd()] || select.er[server->toFd()]) {
+                    client.write(buffer, server->read(buffer, sizeof(buffer)));
+                    gotData=true;
+                }
+                if (select.rd[client.toFd()] || select.er[client.toFd()]) {
+                    server->write(buffer, client.read(buffer, sizeof(buffer)));
+                    gotData=true;
+                }
+                throwIf(!gotData, "No data from select()");
+            }
+        }
+        catch (const std::exception& e) {
+            QPID_LOG(debug, "SocketProxy::run exiting: " << e.what());
+        }
+        if (server.get()) server->close();
+        close(); 
     }
+
+    mutable qpid::sys::Mutex lock;
+    bool closed;
+    qpid::sys::Socket client, listener;
+    uint16_t port;
+    int closePipe[2];
+    qpid::sys::Thread thread;
 };
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Thu Jan 24 14:26:12 2008
@@ -21,7 +21,6 @@
 
 #include "unit_test.h"
 #include "BrokerFixture.h"
-#include "SocketProxy.h"
 #include "qpid/client/SubscriptionManager.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
@@ -52,7 +51,7 @@
         try { f(); }
         catch(const Ex& e) {
             caught=true;
-            BOOST_MESSAGE(e.what());
+            BOOST_MESSAGE(string("Caught expected exception: ")+e.what());
         }
         catch(const std::exception& e) {
             BOOST_ERROR(string("Bad exception: ")+e.what());
@@ -71,37 +70,29 @@
     }
 };
 
-// FIXME aconway 2007-12-11: Disabled hanging tests.
-// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, BrokerFixture) {
-//     ProxyConnection c(broker->getPort());
-//     Catcher<ClosedException> get(bind(&Session_0_10::get, c.session));
-//     c.proxy.client.close();           // Close the client side.
-//     BOOST_CHECK(get.join());
-// }
-
-// BOOST_FIXTURE_TEST_CASE(DisconnectedPop, BrokerFixture) {
-//     ProxyConnection c(broker->getPort());
-//     c.session.queueDeclare(arg::queue="q");
-//     subs.subscribe(lq, "q");
-//     Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq)));
-//     c.proxy.client.close();
-//     BOOST_CHECK(pop.join());
-// }
-
-// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, BrokerFixture) {
-//     struct NullListener : public MessageListener {
-//         void received(Message&) { BOOST_FAIL("Unexpected message"); }
-//     } l;
-//     ProxyConnection c(broker->getPort());
-//     c.session.queueDeclare(arg::queue="q");
-//     subs.subscribe(l, "q");
-//     Thread t(subs);
-//     c.proxy.client.close();
-//     t.join();
-//     BOOST_CHECK_THROW(c.session.close(), InternalErrorException);    
-// }
+BOOST_FIXTURE_TEST_CASE(DisconnectedPop, ProxySessionFixture) {
+    ProxyConnection c(broker->getPort());
+    session.queueDeclare(arg::queue="q");
+    subs.subscribe(lq, "q");
+    Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq)));
+    connection.proxy.close();
+    BOOST_CHECK(pop.join());
+}
+
+BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) {
+    struct NullListener : public MessageListener {
+        void received(Message&) { BOOST_FAIL("Unexpected message"); }
+    } l;
+    ProxyConnection c(broker->getPort());
+    session.queueDeclare(arg::queue="q");
+    subs.subscribe(l, "q");
+    Thread t(subs);
+    connection.proxy.close();
+    t.join();
+    BOOST_CHECK_THROW(session.close(), InternalErrorException);    
+}
 
-BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, BrokerFixture) {
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) {
     BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest Thu Jan 24 14:26:12 2008
@@ -1,2 +1,2 @@
 #!/bin/sh
-exec `dirname $0`/run_test ./perftest --summary --count 1000
+exec `dirname $0`/run_test ./perftest --summary --count 100


Reply via email to