Author: aconway
Date: Thu Jan 24 14:26:12 2008
New Revision: 615063
URL: http://svn.apache.org/viewvc?rev=615063&view=rev
Log:
Improved/additional client API tests.
- Replaced InProcessBroker with a more accurate loopback BrokerFixture.
- Added asserts for mutex/condition/thread errors in debug build.
- Added client tests for several exception conditions.
- Added peer address to log output, client/server distinguished by (addr) or
[addr]
- Fixed various deadlocks & races exposed by the new asserts & tests.
File-by-file:
New BrokerFixture replaces InProcessBroker
D src/tests/InProcessBroker.h
M src/tests/BrokerFixture.h
M src/tests/SocketProxy.h
M src/tests/Makefile.am
Made it run a bit faster.
M src/tests/quick_perftest
Redundant
D src/tests/APRBaseTest.cpp
Updated tests to use BrokerFixture
M src/tests/ClientChannelTest.cpp
M src/tests/exception_test.cpp
M src/tests/ClientSessionTest.cpp
Print thread IDs in decimal, same as GDB.
M src/qpid/log/Logger.cpp
Assert mutex/condition ops in debug build.
M src/qpid/sys/posix/check.h
M src/qpid/sys/posix/Mutex.h
M src/qpid/sys/posix/Condition.h
M src/qpid/sys/posix/Thread.h
Added toFd() so SocketProxy can use ::select()
M src/qpid/sys/Socket.h
M src/qpid/sys/posix/Socket.cpp
Fixes for races & deadlocks shown up by new tests & asserts.
Mostly shutdown/close issues.
M src/qpid/client/ConnectionHandler.h
M src/qpid/client/ConnectionImpl.cpp
M src/qpid/client/Demux.h
M src/qpid/client/SessionCore.cpp
M src/qpid/client/ConnectionHandler.cpp
M src/qpid/client/Connector.h
M src/qpid/client/Demux.cpp
M src/qpid/client/Dispatcher.cpp
M src/qpid/client/ConnectionImpl.h
Logging peer address.
M src/qpid/sys/AsynchIOAcceptor.cpp
Removed:
incubator/qpid/trunk/qpid/cpp/src/tests/APRBaseTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h
incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h
incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Thu Jan
24 14:26:12 2008
@@ -142,7 +142,7 @@
void ConnectionHandler::fail(const std::string& message)
{
- QPID_LOG(error, message);
+ QPID_LOG(warning, message);
setState(FAILED);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Thu Jan
24 14:26:12 2008
@@ -59,7 +59,6 @@
void send(const framing::AMQBody& body);
void error(uint16_t code, const std::string& message, uint16_t classId =
0, uint16_t methodId = 0);
void error(uint16_t code, const std::string& message, framing::AMQBody*
body);
- void fail(const std::string& message);
public:
using InputHandler::handle;
@@ -75,6 +74,7 @@
void waitForOpen();
void close();
+ void fail(const std::string& message);
CloseListener onClose;
ErrorListener onError;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Jan 24
14:26:12 2008
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/log/Statement.h"
#include "qpid/framing/constants.h"
#include "qpid/framing/reply_exceptions.h"
@@ -44,14 +45,18 @@
connector->setShutdownHandler(this);
}
-ConnectionImpl::~ConnectionImpl() { close(); }
+ConnectionImpl::~ConnectionImpl() {
+ // Important to close the connector first, to ensure the
+ // connector thread does not call on us while the destructor
+ // is running.
+ connector->close();
+}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
{
Mutex::ScopedLock l(lock);
boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()];
- if (s.lock())
- throw ChannelBusyException();
+ if (s.lock()) throw ChannelBusyException();
s = session;
}
@@ -81,31 +86,15 @@
handler.pwd = pwd;
handler.vhost = vhost;
+ QPID_LOG(info, "Connecting to " << host << ":" << port);
connector->connect(host, port);
connector->init();
handler.waitForOpen();
}
-bool ConnectionImpl::setClosing()
-{
- Mutex::ScopedLock l(lock);
- if (isClosing || isClosed) {
- return false;
- }
- isClosing = true;
- return true;
-}
-
-void ConnectionImpl::close()
-{
- if (setClosing()) {
- handler.close();
- }
-}
-
void ConnectionImpl::idleIn()
{
- connector->close();
+ close();
}
void ConnectionImpl::idleOut()
@@ -114,35 +103,52 @@
connector->send(frame);
}
-template <class F>
-void ConnectionImpl::forChannels(F functor) {
- for (SessionMap::iterator i = sessions.begin();
- i != sessions.end(); ++i) {
- try {
- boost::shared_ptr<SessionCore> s = i->second.lock();
- if (s) functor(*s);
- } catch (...) { assert(0); }
+void ConnectionImpl::close()
+{
+ Mutex::ScopedLock l(lock);
+ if (isClosing || isClosed) return;
+ isClosing = true;
+ {
+ Mutex::ScopedUnlock u(lock);
+ handler.close();
+ }
+ closed(REPLY_SUCCESS, "Closed by client");
+}
+
+// Set closed flags and erase the sessions map, but keep the contents
+// so sessions can be updated outside the lock.
+ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const
Mutex::ScopedLock&) {
+ isClosed = true;
+ connector->close();
+ SessionVector save;
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ boost::shared_ptr<SessionCore> s = i->second.lock();
+ if (s) save.push_back(s);
}
+ sessions.clear();
+ return save;
}
-void ConnectionImpl::shutdown()
+void ConnectionImpl::closed(uint16_t code, const std::string& text)
{
Mutex::ScopedLock l(lock);
if (isClosed) return;
- forChannels(boost::bind(&SessionCore::connectionBroke, _1,
- INTERNAL_ERROR, "Unexpected socket closure."));
- sessions.clear();
- isClosed = true;
+ SessionVector save(closeInternal(l));
+ Mutex::ScopedUnlock u(lock);
+ std::for_each(save.begin(), save.end(),
boost::bind(&SessionCore::connectionClosed, _1, code, text));
}
-void ConnectionImpl::closed(uint16_t code, const std::string& text)
+static const std::string CONN_CLOSED("Connection closed by broker");
+
+void ConnectionImpl::shutdown()
{
Mutex::ScopedLock l(lock);
if (isClosed) return;
- forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text));
- sessions.clear();
- isClosed = true;
- connector->close();
+ SessionVector save(closeInternal(l));
+ handler.fail(CONN_CLOSED);
+ Mutex::ScopedUnlock u(lock);
+ std::for_each(save.begin(), save.end(),
+ boost::bind(&SessionCore::connectionBroke, _1,
INTERNAL_ERROR, CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Thu Jan 24
14:26:12 2008
@@ -43,6 +43,8 @@
{
typedef std::map<uint16_t, boost::weak_ptr<SessionCore> > SessionMap;
+ typedef std::vector<boost::shared_ptr<SessionCore> > SessionVector;
+
SessionMap sessions;
ConnectionHandler handler;
boost::shared_ptr<Connector> connector;
@@ -51,14 +53,15 @@
bool isClosed;
bool isClosing;
+ template <class F> void detachAll(const F&);
+
+ SessionVector closeInternal(const sys::Mutex::ScopedLock&);
void incoming(framing::AMQFrame& frame);
void closed(uint16_t, const std::string&);
void idleOut();
void idleIn();
void shutdown();
bool setClosing();
-
- template <class F> void forChannels(F functor);
public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Jan 24
14:26:12 2008
@@ -27,7 +27,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
-
+#include "qpid/Msg.h"
#include <boost/bind.hpp>
namespace qpid {
@@ -43,6 +43,7 @@
send_buffer_size(buffer_size),
version(ver),
closed(true),
+ joined(true),
timeout(0),
idleIn(0), idleOut(0),
timeoutHandler(0),
@@ -52,11 +53,11 @@
Connector::~Connector() {
close();
- if (receiver.id() && receiver.id() != Thread::current().id())
- receiver.join();
}
void Connector::connect(const std::string& host, int port){
+ Mutex::ScopedLock l(closedLock);
+ assert(closed);
socket.connect(host, port);
closed = false;
poller = Poller::shared_ptr(new Poller);
@@ -71,20 +72,27 @@
}
void Connector::init(){
+ Mutex::ScopedLock l(closedLock);
+ assert(joined);
ProtocolInitiation init(version);
-
writeDataBlock(init);
+ joined = false;
receiver = Thread(this);
}
bool Connector::closeInternal() {
Mutex::ScopedLock l(closedLock);
+ bool ret = !closed;
if (!closed) {
- poller->shutdown();
closed = true;
- return true;
+ poller->shutdown();
+ }
+ if (!joined && receiver.id() != Thread::current().id()) {
+ joined = true;
+ Mutex::ScopedUnlock u(closedLock);
+ receiver.join();
}
- return false;
+ return ret;
}
void Connector::close() {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Thu Jan 24
14:26:12 2008
@@ -77,8 +77,9 @@
const int send_buffer_size;
framing::ProtocolVersion version;
- bool closed;
sys::Mutex closedLock;
+ bool closed;
+ bool joined;
sys::AbsTime lastIn;
sys::AbsTime lastOut;
@@ -112,6 +113,8 @@
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
+
+ std::string identifier;
friend class Channel;
@@ -130,6 +133,7 @@
virtual void send(framing::AMQFrame& frame);
virtual void setReadTimeout(uint16_t timeout);
virtual void setWriteTimeout(uint16_t timeout);
+ const std::string& getIdentifier() const { return identifier; }
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp Thu Jan 24 14:26:12
2008
@@ -45,6 +45,10 @@
demuxer.remove(dest);
}
+Demux::Demux() : defaultQueue(new Queue()) {}
+
+Demux::~Demux() { close(); }
+
Demux::QueuePtr ScopedDivert::getQueue()
{
return queue;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h Thu Jan 24 14:26:12
2008
@@ -47,7 +47,8 @@
typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue;
typedef boost::shared_ptr<Queue> QueuePtr;
- Demux() : defaultQueue(new Queue()) {}
+ Demux();
+ ~Demux();
void handle(framing::FrameSet::shared_ptr);
void close();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Thu Jan 24
14:26:12 2008
@@ -62,13 +62,12 @@
}
void Dispatcher::run()
-{
+{
Mutex::ScopedLock l(lock);
if (running)
throw Exception("Dispatcher is already running.");
boost::state_saver<bool> reset(running); // Reset to false on exit.
running = true;
- queue->open();
try {
while (!queue->isClosed()) {
Mutex::ScopedUnlock u(lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Thu Jan 24
14:26:12 2008
@@ -125,7 +125,7 @@
channel.next = 0;
code=c;
text=t;
- l3.getDemux().close();
+ l3.getDemux().close();
}
void SessionCore::doClose(int code, const std::string& text) {
@@ -270,7 +270,6 @@
Lock l(state);
check(state == SUSPENDING,
COMMAND_INVALID, UNEXPECTED_SESSION_DETACHED);
- connection->erase(channel);
doSuspend(REPLY_SUCCESS, OK);
}
@@ -379,22 +378,28 @@
// Network thread.
void SessionCore::handleIn(AMQFrame& frame) {
+ ConnectionImpl::shared_ptr save;
{
Lock l(state);
+ save=connection;
// Ignore frames received while closing other than closed response.
if (state==CLOSING && !isCloseResponse(frame))
return;
}
try {
// Cast to expose private SessionHandler functions.
- if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ // If we were detached by a session command, tell the connection.
+ if (!connection) save->erase(channel);
+ }
+ else {
session->received(frame);
l3.handle(frame);
}
} catch (const ChannelException& e) {
QPID_LOG(error, "Channel exception:" << e.what());
doClose(e.code, e.what());
- }
+ }
}
void SessionCore::handleOut(AMQFrame& frame)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp Thu Jan 24 14:26:12
2008
@@ -114,7 +114,7 @@
if (flags&LEVEL)
os << LevelTraits::name(s.level) << " ";
if (flags&THREAD)
- os << "[" << hex << qpid::sys::Thread::logId() << "] ";
+ os << "[" << qpid::sys::Thread::logId() << "] ";
if (flags&FILE)
os << s.file << ":";
if (flags&LINE)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp Thu Jan 24 14:26:12
2008
@@ -26,7 +26,7 @@
using namespace std;
Options::Options(const std::string& name) : qpid::Options(name),
- time(true), level(true), thread(false), source(false), function(false),
trace(false)
+ time(true), level(true),
thread(false), source(false), function(false), trace(false)
{
outputs.push_back("stderr");
selectors.push_back("error+");
@@ -43,14 +43,14 @@
("trace,t", optValue(trace), "Enables all logging" )
("log-enable", optValue(selectors, "RULE"),
("Enables logging for selected levels and components. "
- "RULE is in the form 'LEVEL[+][:PATTERN]' "
- "Levels are one of: \n\t "+levels.str()+"\n"
- "For example:\n"
- "\t'--log-enable warning+' "
- "logs all warning, error and critical messages.\n"
- "\t'--log-enable debug:framing' "
+ "RULE is in the form 'LEVEL[+][:PATTERN]' "
+ "Levels are one of: \n\t "+levels.str()+"\n"
+ "For example:\n"
+ "\t'--log-enable warning+' "
+ "logs all warning, error and critical messages.\n"
+ "\t'--log-enable debug:framing' "
"logs debug messages from the framing namespace. "
- "This option can be used multiple times").c_str())
+ "This option can be used multiple times").c_str())
("log-time", optValue(time, "yes|no"),
"Include time in log messages")
("log-level", optValue(level,"yes|no"),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Thu Jan 24
14:26:12 2008
@@ -110,6 +110,7 @@
void init(AsynchIO* a, ConnectionInputHandler* h) {
aio = a;
inputHandler = h;
+ identifier = aio->getSocket().getPeerAddress();
}
// Output side
@@ -229,7 +230,6 @@
}else{
framing::ProtocolInitiation protocolInit;
if(protocolInit.decode(in)){
- identifier = aio->getSocket().getPeerAddress();
QPID_LOG(debug, "INIT [" << identifier << "]");
inputHandler->initiated(protocolInit);
initiated = true;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Thu Jan 24 14:26:12 2008
@@ -97,6 +97,8 @@
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
+ int toFd() const;
+
private:
Socket(SocketPrivate*);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Condition.h Thu Jan 24
14:26:12 2008
@@ -52,15 +52,15 @@
};
Condition::Condition() {
- QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_init(&condition, 0));
}
Condition::~Condition() {
- QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_destroy(&condition));
}
void Condition::wait(Mutex& mutex) {
- QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex));
}
bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){
@@ -75,11 +75,11 @@
}
void Condition::notify(){
- QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_signal(&condition));
}
void Condition::notifyAll(){
- QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_broadcast(&condition));
}
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h Thu Jan 24
14:26:12 2008
@@ -136,11 +136,11 @@
#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
void PODMutex::lock() {
- QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex));
}
void PODMutex::unlock() {
- QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex));
}
bool PODMutex::trylock() {
@@ -148,19 +148,19 @@
}
Mutex::Mutex() {
- QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr));
}
Mutex::~Mutex(){
- QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_destroy(&mutex));
}
void Mutex::lock() {
- QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex));
}
void Mutex::unlock() {
- QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex));
}
bool Mutex::trylock() {
@@ -169,31 +169,31 @@
RWlock::RWlock() {
- QPID_POSIX_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_init(&rwlock,
recursiveRWlockattr));
}
RWlock::~RWlock(){
- QPID_POSIX_THROW_IF(pthread_rwlock_destroy(&rwlock));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_destroy(&rwlock));
}
void RWlock::wlock() {
- QPID_POSIX_THROW_IF(pthread_rwlock_wrlock(&rwlock));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_wrlock(&rwlock));
}
void RWlock::rlock() {
- QPID_POSIX_THROW_IF(pthread_rwlock_rdlock(&rwlock));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_rdlock(&rwlock));
}
void RWlock::unlock() {
- QPID_POSIX_THROW_IF(pthread_rwlock_unlock(&rwlock));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_unlock(&rwlock));
}
void RWlock::trywlock() {
- QPID_POSIX_THROW_IF(pthread_rwlock_trywrlock(&rwlock));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_trywrlock(&rwlock));
}
void RWlock::tryrlock() {
- QPID_POSIX_THROW_IF(pthread_rwlock_tryrdlock(&rwlock));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_tryrdlock(&rwlock));
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Thu Jan 24
14:26:12 2008
@@ -231,6 +231,10 @@
return impl->getName(false, true);
}
+int Socket::toFd() const {
+ return impl->fd;
+}
+
int toFd(const SocketPrivate* s)
{
return s->fd;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Thread.h Thu Jan 24
14:26:12 2008
@@ -60,16 +60,16 @@
Thread::Thread() : thread(0) {}
Thread::Thread(Runnable* runnable) {
- QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_create(&thread, NULL, runRunnable,
runnable));
}
Thread::Thread(Runnable& runnable) {
- QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_create(&thread, NULL, runRunnable,
&runnable));
}
void Thread::join(){
if (thread != 0)
- QPID_POSIX_THROW_IF(pthread_join(thread, 0));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_join(thread, 0));
}
long Thread::id() {
@@ -84,7 +84,7 @@
void Thread::yield()
{
- QPID_POSIX_THROW_IF(pthread_yield());
+ QPID_POSIX_ASSERT_THROW_IF(pthread_yield());
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h Thu Jan 24
14:26:12 2008
@@ -23,16 +23,26 @@
*/
#include "qpid/Exception.h"
-
#include <cerrno>
+#include <assert.h>
-#define QPID_POSIX_ERROR(ERRNO)
qpid::Exception(QPID_MSG(qpid::strError(ERRNO)));
+#define QPID_POSIX_ERROR(ERRNO)
qpid::Exception(QPID_MSG(qpid::strError(ERRNO)))
/** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */
#define QPID_POSIX_CHECK(RESULT) \
if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))
-/** Throw a posix error if errNo is non-zero */
+/** Throw a posix error if ERRNO is non-zero */
#define QPID_POSIX_THROW_IF(ERRNO) \
- if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
+ do { int e=(ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0)
+
+/** Same as _THROW_IF in a release build, but abort a debug build */
+#ifdef NDEBUG
+#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) qpid_posix \
+ QPID_POSIX_THROW_IF(ERRNO)
+#else
+#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) \
+ do { int e=(ERRNO); if (e) { errno=e; perror(0); assert(0); } } while(0)
+#endif
+
#endif /*!_posix_check_h*/
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Thu Jan 24 14:26:12
2008
@@ -22,6 +22,7 @@
*
*/
+#include "SocketProxy.h"
#include "qpid/sys/Thread.h"
#include "qpid/broker/Broker.h"
#include "qpid/client/Connection.h"
@@ -30,52 +31,74 @@
#include "qpid/client/SubscriptionManager.h"
/**
- * A fixture to create an in-process broker and connect to it for tests.
+ * A fixture with an in-process broker.
*/
-struct BrokerFixture {
+struct BrokerFixture {
typedef qpid::broker::Broker Broker;
typedef boost::shared_ptr<Broker> BrokerPtr;
- struct OpenConnection : public qpid::client::Connection {
- OpenConnection(int port) { open("localhost", port); }
- };
-
BrokerPtr broker;
qpid::sys::Thread brokerThread;
- OpenConnection connection;
- qpid::client::Session_0_10 session;
- qpid::client::SubscriptionManager subs;
- qpid::client::LocalQueue lq;
-
- BrokerPtr newBroker() {
+
+ BrokerFixture() {
Broker::Options opts;
opts.port=0;
opts.workerThreads=1;
- BrokerPtr b=Broker::create(opts);
- // TODO aconway 2007-12-05: Without the following line
- // the test can hang in the connection ctor. This is
- // a race condition that should be fixed.
- b->getPort();
- return b;
+ broker = Broker::create(opts);
+ // TODO aconway 2007-12-05: At one point BrokerFixture
+ // tests could hang in Connection ctor if the following
+ // line is removed. This may not be an issue anymore.
+ broker->getPort();
+ brokerThread = qpid::sys::Thread(*broker);
};
- BrokerFixture() : broker(newBroker()),
- brokerThread(*broker),
- connection(broker->getPort()),
- session(connection.newSession()),
- subs(session)
- {}
-
~BrokerFixture() {
- connection.close();
broker->shutdown();
brokerThread.join();
}
- /** Open a connection to the local broker */
+ /** Open a connection to the broker. */
void open(qpid::client::Connection& c) {
c.open("localhost", broker->getPort());
}
};
+
+struct LocalConnection : public qpid::client::Connection {
+ LocalConnection(uint16_t port) { open("localhost", port); }
+};
+
+/** A local client connection via a socket proxy. */
+struct ProxyConnection : public qpid::client::Connection {
+ SocketProxy proxy;
+ ProxyConnection(int brokerPort) : proxy(brokerPort) {
+ open("localhost", proxy.getPort());
+ }
+ ~ProxyConnection() { close(); }
+};
+
+/**
+ * A BrokerFixture with open Connection, Session and
+ * SubscriptionManager and LocalQueue for convenience.
+ */
+template <class ConnectionType>
+struct SessionFixtureT : BrokerFixture {
+ ConnectionType connection;
+ qpid::client::Session_0_10 session;
+ qpid::client::SubscriptionManager subs;
+ qpid::client::LocalQueue lq;
+
+ SessionFixtureT() : connection(broker->getPort()),
+ session(connection.newSession()),
+ subs(session)
+ {}
+
+ ~SessionFixtureT() {
+ connection.close();
+ }
+};
+
+typedef SessionFixtureT<LocalConnection> SessionFixture;
+typedef SessionFixtureT<ProxyConnection> ProxySessionFixture;
+
#endif /*!TESTS_BROKERFIXTURE_H*/
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Thu Jan 24
14:26:12 2008
@@ -44,7 +44,7 @@
* The test base defines the tests methods, derived classes
* instantiate the channel in Basic or Message mode.
*/
-class ChannelTestBase : public CppUnit::TestCase, public BrokerFixture
+class ChannelTestBase : public CppUnit::TestCase, public SessionFixture
{
struct Listener: public qpid::client::MessageListener {
vector<Message> messages;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Thu Jan 24
14:26:12 2008
@@ -20,7 +20,6 @@
*/
#include "qpid_test_plugin.h"
#include "BrokerFixture.h"
-#include "SocketProxy.h"
#include "qpid/client/Dispatcher.h"
#include "qpid/client/Session_0_10.h"
#include "qpid/framing/TransferContent.h"
@@ -62,7 +61,7 @@
}
};
-class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture
+class ClientSessionTest : public CppUnit::TestCase, public ProxySessionFixture
{
CPPUNIT_TEST_SUITE(ClientSessionTest);
CPPUNIT_TEST(testQueueQuery);
@@ -71,7 +70,6 @@
CPPUNIT_TEST(testResumeExpiredError);
CPPUNIT_TEST(testUseSuspendedError);
CPPUNIT_TEST(testSuspendResume);
- CPPUNIT_TEST(testDisconnectResume);
CPPUNIT_TEST_SUITE_END();
public:
@@ -85,11 +83,6 @@
session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes
}
- bool queueExists(const std::string& q) {
- TypedResult<QueueQueryResult> result = session.queueQuery(q);
- return result.get().getQueue() == q;
- }
-
void testQueueQuery()
{
session =connection.newSession();
@@ -166,25 +159,10 @@
declareSubscribe();
session.suspend();
// Make sure we are still subscribed after resume.
- connection.resume(session);
+ connection.resume(session);
session.messageTransfer(content=TransferContent("my-message",
"my-queue"));
FrameSet::shared_ptr msg = session.get();
CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
- }
-
- void testDisconnectResume() {
- // FIXME aconway 2007-12-11: Test hanging.
-// ProxyConnection c(broker->getPort());
-// Session_0_10 s = c.session;
-// s.queueDeclare(queue="before");
-// CPPUNIT_ASSERT(queueExists("before"));
-// s.queueDeclare(queue=string("after"));
-// c.proxy.client.close(); // Disconnect the client.
-// Connection c2;
-// open(c2);
-// c2.resume(s);
-// CPPUNIT_ASSERT(queueExists("after"));
-// c2.close();
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Jan 24 14:26:12 2008
@@ -122,7 +122,6 @@
topictest \
.valgrind.supp \
.valgrindrc \
- InProcessBroker.h \
MessageUtils.h \
MockChannel.h \
MockConnectionInputHandler.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h Thu Jan 24 14:26:12 2008
@@ -24,59 +24,141 @@
#include "qpid/sys/Socket.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/client/Connection.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
/**
- * A simple socket proxy that forwards to another socket. Used between
- * client & broker to simulate network failures.
+ * A simple socket proxy that forwards to another socket.
+ * Used between client & local broker to simulate network failures.
*/
-struct SocketProxy : public qpid::sys::Runnable
+class SocketProxy : private qpid::sys::Runnable
{
- int port; // Port bound to server socket.
- qpid::sys::Socket client, server; // Client & server sockets.
+ public:
+ /** Connect to connectPort on host, start a forwarding thread.
+ * Listen for connection on getPort().
+ */
+ SocketProxy(int connectPort, const std::string host="localhost")
+ : closed(false), port(listener.listen())
+ {
+ int r=::pipe(closePipe);
+ if (r<0) throwErrno(QPID_MSG("::pipe returned " << r));
+ client.connect(host, connectPort);
+ thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
+ }
+
+ ~SocketProxy() { close(); }
+
+ /** Simulate a network disconnect. */
+ void close() {
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (closed) return;
+ closed=true;
+ }
+ write(closePipe[1], this, 1); // Random byte to closePipe
+ thread.join();
+ client.close();
+ ::close(closePipe[0]);
+ ::close(closePipe[1]);
+ }
- SocketProxy(const std::string& host, int port) { init(host,port); }
- SocketProxy(int port) { init("localhost",port); }
+ bool isClosed() const {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return closed;
+ }
- ~SocketProxy() { client.close(); server.close(); thread.join(); }
+ uint16_t getPort() const { return port; }
private:
-
- void init(const std::string& host, int connectPort) {
- client.connect(host, connectPort);
- port = server.listen();
- thread=qpid::sys::Thread(this);
+ static void throwErrno(const std::string& msg) {
+ throw qpid::Exception(msg+":"+qpid::strError(errno));
}
-
- void run() {
- try {
- do {
- ssize_t recv = server.recv(buffer, sizeof(buffer));
- if (recv <= 0) return;
- ssize_t sent=client.send(buffer, recv);
- if (sent < 0) return;
- assert(sent == recv); // Assumes we can send as we receive.
- } while (true);
- } catch(...) {}
+ static void throwIf(bool condition, const std::string& msg) {
+ if (condition) throw qpid::Exception(msg);
}
+
+ struct FdSet : fd_set {
+ FdSet() : maxFd(0) { clear(); }
+ void clear() { FD_ZERO(this); }
+ void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); }
+ bool isSet(int fd) const { return FD_ISSET(fd, this); }
+ bool operator[](int fd) const { return isSet(fd); }
- qpid::sys::Thread thread;
- char buffer[64*1024];
-};
+ int maxFd;
+ };
-/** A local client connection via a socket proxy. */
-struct ProxyConnection : public qpid::client::Connection {
- SocketProxy proxy;
- qpid::client::Session_0_10 session;
+ enum { RD=1, WR=2, ER=4 };
- ProxyConnection(const std::string& host, int port) : proxy(port) {
- open(host, proxy.port);
- session=newSession();
- }
+ struct Selector {
+ FdSet rd, wr, er;
- ProxyConnection(int port) : proxy(port) {
- open("localhost", proxy.port);
- session=newSession();
+ void set(int fd, int sets) {
+ if (sets & RD) rd.set(fd);
+ if (sets & WR) wr.set(fd);
+ if (sets & ER) er.set(fd);
+ }
+
+ int select() {
+ for (;;) {
+ int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd));
+ int r = ::select(maxFd + 1, &rd, &wr, &er, NULL);
+ if (r == -1 && errno == EINTR) continue;
+ if (r < 0) throwErrno(QPID_MSG("select returned " <<r));
+ return r;
+ }
+ }
+ };
+
+ void run() {
+ std::auto_ptr<qpid::sys::Socket> server;
+ try {
+ // Accept incoming connections, watch closePipe.
+ Selector accept;
+ accept.set(listener.toFd(), RD|ER);
+ accept.set(closePipe[0], RD|ER);
+ accept.select();
+ throwIf(accept.rd[closePipe[0]], "Closed by close()");
+ throwIf(!accept.rd[listener.toFd()],"Accept failed");
+ server.reset(listener.accept(0, 0));
+
+ // Pump data between client & server sockets, watch closePipe.
+ char buffer[1024];
+ for (;;) {
+ Selector select;
+ select.set(server->toFd(), RD|ER);
+ select.set(client.toFd(), RD|ER);
+ select.set(closePipe[0], RD|ER);
+ select.select();
+ throwIf(select.rd[closePipe[0]], "Closed by close()");
+ // Read even if fd is in error to throw a useful exception.
+ bool gotData=false;
+ if (select.rd[server->toFd()] || select.er[server->toFd()]) {
+ client.write(buffer, server->read(buffer, sizeof(buffer)));
+ gotData=true;
+ }
+ if (select.rd[client.toFd()] || select.er[client.toFd()]) {
+ server->write(buffer, client.read(buffer, sizeof(buffer)));
+ gotData=true;
+ }
+ throwIf(!gotData, "No data from select()");
+ }
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(debug, "SocketProxy::run exiting: " << e.what());
+ }
+ if (server.get()) server->close();
+ close();
}
+
+ mutable qpid::sys::Mutex lock;
+ bool closed;
+ qpid::sys::Socket client, listener;
+ uint16_t port;
+ int closePipe[2];
+ qpid::sys::Thread thread;
};
#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Thu Jan 24 14:26:12 2008
@@ -21,7 +21,6 @@
#include "unit_test.h"
#include "BrokerFixture.h"
-#include "SocketProxy.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
@@ -52,7 +51,7 @@
try { f(); }
catch(const Ex& e) {
caught=true;
- BOOST_MESSAGE(e.what());
+ BOOST_MESSAGE(string("Caught expected exception: ")+e.what());
}
catch(const std::exception& e) {
BOOST_ERROR(string("Bad exception: ")+e.what());
@@ -71,37 +70,29 @@
}
};
-// FIXME aconway 2007-12-11: Disabled hanging tests.
-// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, BrokerFixture) {
-// ProxyConnection c(broker->getPort());
-// Catcher<ClosedException> get(bind(&Session_0_10::get, c.session));
-// c.proxy.client.close(); // Close the client side.
-// BOOST_CHECK(get.join());
-// }
-
-// BOOST_FIXTURE_TEST_CASE(DisconnectedPop, BrokerFixture) {
-// ProxyConnection c(broker->getPort());
-// c.session.queueDeclare(arg::queue="q");
-// subs.subscribe(lq, "q");
-// Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq)));
-// c.proxy.client.close();
-// BOOST_CHECK(pop.join());
-// }
-
-// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, BrokerFixture) {
-// struct NullListener : public MessageListener {
-// void received(Message&) { BOOST_FAIL("Unexpected message"); }
-// } l;
-// ProxyConnection c(broker->getPort());
-// c.session.queueDeclare(arg::queue="q");
-// subs.subscribe(l, "q");
-// Thread t(subs);
-// c.proxy.client.close();
-// t.join();
-// BOOST_CHECK_THROW(c.session.close(), InternalErrorException);
-// }
+BOOST_FIXTURE_TEST_CASE(DisconnectedPop, ProxySessionFixture) {
+ ProxyConnection c(broker->getPort());
+ session.queueDeclare(arg::queue="q");
+ subs.subscribe(lq, "q");
+ Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq)));
+ connection.proxy.close();
+ BOOST_CHECK(pop.join());
+}
+
+BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) {
+ struct NullListener : public MessageListener {
+ void received(Message&) { BOOST_FAIL("Unexpected message"); }
+ } l;
+ ProxyConnection c(broker->getPort());
+ session.queueDeclare(arg::queue="q");
+ subs.subscribe(l, "q");
+ Thread t(subs);
+ connection.proxy.close();
+ t.join();
+ BOOST_CHECK_THROW(session.close(), InternalErrorException);
+}
-BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, BrokerFixture) {
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) {
BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(),
NotFoundException);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest?rev=615063&r1=615062&r2=615063&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/quick_perftest Thu Jan 24 14:26:12 2008
@@ -1,2 +1,2 @@
#!/bin/sh
-exec `dirname $0`/run_test ./perftest --summary --count 1000
+exec `dirname $0`/run_test ./perftest --summary --count 100