Kevin Smith wrote:
Gordon Sim wrote:
Kevin Smith wrote:
Gordon Sim wrote:
Kevin Smith wrote:
Thanks for the update! If there are any patches for this against
M2, I'd be willing to help test.
Fantastic! We are aiming to fix the M2 branch first and if you could
re-run your test at that time to ensure the problem is fixed that
would be much appreciated.
I've hacked together a fix against M2 which seems to address the
problems I was seeing. I could post my patch to the list depending on
how soon a fix is expected against M2. My C++ is pretty rusty, so I
make no guarantees about code correctness or quality :)
Yes, I'd be interested in seeing the patch since you have it anyway!
And I spoke waaaay too soon :-)
My C++ is rustier than I originally thought given the sorry report I
just got from valgrind. I'll post something as soon as I get it to quit
leaking like a sieve...
--Kevin
Attached is my attempt at fixing this problem. There are, no doubt, better ways
to do this. This is just the way I hacked to fix the
fall-over-with-three-clients problem I was having.
My patch does three things:
1) Modifies the APRPool class to use alloc/free semantics for APR memory pools.
Each time a caller calls APRPool::get() they'll get their own pool reference. I've
fixed up all the call sites I can find to also call APRPool::free() at the
appropriate time.
2) Caches freed APR memory pools in an STL stack. This cuts down on the number of
memory pools created overall.
3) As a result of doing #1 and #2 I've introduced a guard mutex around
APRPool::get() and APRPool::free(). This is to prevent concurrent access to the
memory pool cache. If it's too heavyweight, the mutex along with the caching
mechanism could be removed entirely.
Like I said before, my C++ is very rusty so I make no guarantees about code
correctness or quality. I've fixed the leaks I could find (and understand) from
valgrind so I _think_ this isn't any leakier than the code was before my patch.
I've run a mini-stress test (10 concurrent clients sending messages via a topic
exchange) multiple times and haven't been able to get the broker to fall over.
Memory usage seems pretty stable once the broker is up and all clients have
connected.
--Kevin
Index: lib/broker/QueueRegistry.cpp
===================================================================
--- lib/broker/QueueRegistry.cpp (revision 540150)
+++ lib/broker/QueueRegistry.cpp (working copy)
@@ -28,7 +28,9 @@
QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
-QueueRegistry::~QueueRegistry(){}
+QueueRegistry::~QueueRegistry()
+{
+}
std::pair<Queue::shared_ptr, bool>
QueueRegistry::declare(const string& declareName, bool durable,
Index: lib/common/sys/apr/LFSessionContext.h
===================================================================
--- lib/common/sys/apr/LFSessionContext.h (revision 540150)
+++ lib/common/sys/apr/LFSessionContext.h (working copy)
@@ -66,7 +66,7 @@
public:
- LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
+ LFSessionContext(apr_socket_t* socket,
LFProcessor* const processor,
bool debug = false);
virtual ~LFSessionContext();
Index: lib/common/sys/apr/APRPool.h
===================================================================
--- lib/common/sys/apr/APRPool.h (revision 540150)
+++ lib/common/sys/apr/APRPool.h (working copy)
@@ -23,6 +23,8 @@
*/
#include <boost/noncopyable.hpp>
#include <apr_pools.h>
+#include <apr_thread_mutex.h>
+#include <stack>
namespace qpid {
namespace sys {
@@ -33,12 +35,21 @@
public:
APRPool();
~APRPool();
+
+ apr_pool_t* allocate_pool();
+
+ void free_pool(apr_pool_t* pool);
- /** Get singleton instance */
+ /** Allocate pool */
static apr_pool_t* get();
+
+ /** Free pool */
+ static void free(apr_pool_t* pool);
private:
apr_pool_t* pool;
+ apr_thread_mutex_t* poolGuard;
+ std::stack<apr_pool_t*>* allocated_pools;
};
}}
Index: lib/common/sys/apr/LFProcessor.cpp
===================================================================
--- lib/common/sys/apr/LFProcessor.cpp (revision 540150)
+++ lib/common/sys/apr/LFProcessor.cpp (working copy)
@@ -22,6 +22,7 @@
#include <QpidError.h>
#include "LFProcessor.h"
#include "APRBase.h"
+#include "APRPool.h"
#include "LFSessionContext.h"
using namespace qpid::sys;
@@ -30,7 +31,7 @@
// TODO aconway 2006-10-12: stopped is read outside locks.
//
-LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
+LFProcessor::LFProcessor(int _workers, int _size, int _timeout) :
size(_size),
timeout(_timeout),
signalledCount(0),
@@ -41,7 +42,7 @@
workers(new Thread[_workers]),
stopped(false)
{
-
+ pool = APRPool::get();
CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
}
@@ -50,6 +51,7 @@
if (!stopped) stop();
delete[] workers;
CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+ APRPool::free(pool);
}
void LFProcessor::start(){
Index: lib/common/sys/apr/APRAcceptor.cpp
===================================================================
--- lib/common/sys/apr/APRAcceptor.cpp (revision 540150)
+++ lib/common/sys/apr/APRAcceptor.cpp (working copy)
@@ -20,6 +20,7 @@
*/
#include <sys/Acceptor.h>
#include <sys/SessionHandlerFactory.h>
+#include <apr_pools.h>
#include "LFProcessor.h"
#include "LFSessionContext.h"
#include "APRBase.h"
@@ -32,6 +33,7 @@
{
public:
APRAcceptor(int16_t port, int backlog, int threads, bool trace);
+ ~APRAcceptor();
virtual int16_t getPort() const;
virtual void run(qpid::sys::SessionHandlerFactory* factory);
virtual void shutdown();
@@ -46,6 +48,7 @@
apr_socket_t* socket;
volatile bool running;
Mutex shutdownLock;
+ apr_pool_t* pool;
};
// Define generic Acceptor::create() to return APRAcceptor.
@@ -54,16 +57,22 @@
return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
}
// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
+Acceptor::~Acceptor() {
+}
- APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
+APRAcceptor::~APRAcceptor() {
+ APRPool::free(pool);
+}
+
+APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
port(port_),
trace(trace_),
- processor(APRPool::get(), threads, 1000, 5000000)
+ processor(threads, 1000, 5000000)
{
+ pool = APRPool::get();
apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
@@ -81,7 +90,7 @@
std::cout << "Listening on port " << getPort() << "..." << std::endl;
while(running){
apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ apr_status_t status = apr_socket_accept(&client, socket, pool);
if(status == APR_SUCCESS){
//make this socket non-blocking:
CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
@@ -89,7 +98,7 @@
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
- LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace);
+ LFSessionContext* session = new LFSessionContext(client, &processor, trace);
session->init(factory->create(session));
}else{
Mutex::ScopedLock locker(shutdownLock);
Index: lib/common/sys/apr/LFProcessor.h
===================================================================
--- lib/common/sys/apr/LFProcessor.h (revision 540150)
+++ lib/common/sys/apr/LFProcessor.h (working copy)
@@ -57,6 +57,7 @@
qpid::sys::Mutex countLock;
std::vector<LFSessionContext*> sessions;
volatile bool stopped;
+ apr_pool_t* pool;
const apr_pollfd_t* getNextEvent();
void waitToLead();
@@ -65,7 +66,7 @@
virtual void run();
public:
- LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+ LFProcessor(int workers, int size, int timeout);
/**
* Add the fd to the poll set. Relies on the client_data being
* an instance of LFSessionContext.
Index: lib/common/sys/apr/Thread.cpp
===================================================================
--- lib/common/sys/apr/Thread.cpp (revision 540150)
+++ lib/common/sys/apr/Thread.cpp (working copy)
@@ -20,6 +20,7 @@
*/
#include <sys/Thread.h>
+#include "APRPool.h"
using namespace qpid::sys;
using qpid::sys::Runnable;
@@ -30,4 +31,7 @@
return NULL;
}
+Thread::~Thread() {
+}
+
Index: lib/common/sys/apr/Socket.cpp
===================================================================
--- lib/common/sys/apr/Socket.cpp (revision 540150)
+++ lib/common/sys/apr/Socket.cpp (working copy)
@@ -30,10 +30,12 @@
Socket Socket::createTcp() {
Socket s;
+ apr_pool_t* pool = APRPool::get();
CHECK_APR_SUCCESS(
apr_socket_create(
&s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
- APRPool::get()));
+ pool));
+ APRPool::free(pool);
return s;
}
@@ -47,11 +49,13 @@
void Socket::connect(const std::string& host, int port) {
apr_sockaddr_t* address;
+ apr_pool_t* pool = APRPool::get();
CHECK_APR_SUCCESS(
apr_sockaddr_info_get(
&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
- APRPool::get()));
+ pool));
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+ APRPool::free(pool);
}
void Socket::close() {
Index: lib/common/sys/apr/LFSessionContext.cpp
===================================================================
--- lib/common/sys/apr/LFSessionContext.cpp (revision 540150)
+++ lib/common/sys/apr/LFSessionContext.cpp (working copy)
@@ -20,6 +20,7 @@
*/
#include "LFSessionContext.h"
#include "APRBase.h"
+#include "APRPool.h"
#include <QpidError.h>
#include <assert.h>
@@ -27,7 +28,7 @@
using namespace qpid::sys;
using namespace qpid::framing;
-LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
+LFSessionContext::LFSessionContext(apr_socket_t* _socket,
LFProcessor* const _processor,
bool _debug) :
debug(_debug),
@@ -40,7 +41,7 @@
closing(false)
{
- fd.p = _pool;
+ fd.p = APRPool::get();
fd.desc_type = APR_POLL_SOCKET;
fd.reqevents = APR_POLLIN;
fd.client_data = this;
@@ -156,6 +157,7 @@
void LFSessionContext::handleClose(){
handler->closed();
+ APRPool::free(fd.p);
std::cout << "Session closed [" << &socket << "]" << std::endl;
delete handler;
delete this;
Index: lib/common/sys/apr/APRPool.cpp
===================================================================
--- lib/common/sys/apr/APRPool.cpp (revision 540150)
+++ lib/common/sys/apr/APRPool.cpp (working copy)
@@ -22,20 +22,57 @@
#include "APRPool.h"
#include "APRBase.h"
#include <boost/pool/detail/singleton.hpp>
+#include <iostream>
+#include <sstream>
+
using namespace qpid::sys;
APRPool::APRPool(){
APRBase::increment();
+ allocated_pools = new std::stack<apr_pool_t*>();
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&poolGuard, APR_THREAD_MUTEX_NESTED, pool));
}
APRPool::~APRPool(){
+ while(allocated_pools->size() > 0) {
+ apr_pool_t* pool = allocated_pools->top();
+ allocated_pools->pop();
+ apr_pool_destroy(pool);
+ }
apr_pool_destroy(pool);
+ apr_thread_mutex_destroy(poolGuard);
+ delete allocated_pools;
APRBase::decrement();
}
+void APRPool::free_pool(apr_pool_t* pool) {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+ allocated_pools->push(pool);
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+}
+
+apr_pool_t* APRPool::allocate_pool() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+ apr_pool_t* retval;
+ if (allocated_pools->size() == 0) {
+ CHECK_APR_SUCCESS(apr_pool_create(&retval, pool));
+ }
+ else {
+ retval = allocated_pools->top();
+ allocated_pools->pop();
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+ return retval;
+}
+
apr_pool_t* APRPool::get() {
- return boost::details::pool::singleton_default<APRPool>::instance().pool;
+ return
+ boost::details::pool::singleton_default<APRPool>::instance().allocate_pool();
}
+void APRPool::free(apr_pool_t* pool) {
+ boost::details::pool::singleton_default<APRPool>::instance().free_pool(pool);
+}
+
Index: lib/common/sys/Mutex.h
===================================================================
--- lib/common/sys/Mutex.h (revision 540150)
+++ lib/common/sys/Mutex.h (working copy)
@@ -21,8 +21,9 @@
#ifdef USE_APR
# include <apr_thread_mutex.h>
-# include <apr/APRBase.h>
-# include <apr/APRPool.h>
+# include <apr_pools.h>
+# include "apr/APRBase.h"
+# include "apr/APRPool.h"
#else
# include <pthread.h>
# include <posix/check.h>
@@ -62,6 +63,7 @@
protected:
#ifdef USE_APR
apr_thread_mutex_t* mutex;
+ apr_pool_t* pool;
#else
pthread_mutex_t mutex;
#endif
@@ -71,11 +73,13 @@
// APR ================================================================
Mutex::Mutex() {
- CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
+ pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
}
Mutex::~Mutex(){
CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ APRPool::free(pool);
}
void Mutex::lock() {
Index: lib/common/sys/Thread.h
===================================================================
--- lib/common/sys/Thread.h (revision 540150)
+++ lib/common/sys/Thread.h (working copy)
@@ -44,11 +44,10 @@
inline static void yield();
inline Thread();
- inline explicit Thread(qpid::sys::Runnable*);
- inline explicit Thread(qpid::sys::Runnable&);
-
+ inline Thread(qpid::sys::Runnable*);
+ inline Thread(qpid::sys::Runnable&);
+ ~Thread();
inline void join();
-
inline long id();
private:
@@ -70,13 +69,17 @@
#ifdef USE_APR
Thread::Thread(Runnable* runnable) {
+ apr_pool_t* tmp_pool = APRPool::get();
CHECK_APR_SUCCESS(
- apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get()));
+ apr_thread_create(&thread, 0, runRunnable, runnable, tmp_pool));
+ APRPool::free(tmp_pool);
}
Thread::Thread(Runnable& runnable) {
+ apr_pool_t* tmp_pool = APRPool::get();
CHECK_APR_SUCCESS(
- apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get()));
+ apr_thread_create(&thread, 0, runRunnable, &runnable, tmp_pool));
+ APRPool::free(tmp_pool);
}
void Thread::join(){
@@ -92,9 +95,11 @@
Thread::Thread(apr_thread_t* t) : thread(t) {}
Thread Thread::current(){
+ apr_pool_t* tmp_pool = APRPool::get();
apr_thread_t* thr;
apr_os_thread_t osthr = apr_os_thread_current();
- CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, tmp_pool));
+ APRPool::free(tmp_pool);
return Thread(thr);
}
@@ -123,6 +128,9 @@
return long(thread);
}
+Thread::~Thread() {
+}
+
Thread::Thread(pthread_t thr) : thread(thr) {}
Thread Thread::current() {
Index: lib/common/sys/posix/Thread.cpp
===================================================================
--- lib/common/sys/posix/Thread.cpp (revision 540150)
+++ lib/common/sys/posix/Thread.cpp (working copy)
@@ -26,3 +26,6 @@
static_cast<Runnable*>(p)->run();
return 0;
}
+
+Thread::~Thread() {
+}
Index: lib/common/sys/Module.h
===================================================================
--- lib/common/sys/Module.h (revision 540150)
+++ lib/common/sys/Module.h (working copy)
@@ -104,7 +104,9 @@
template <class T> void Module<T>::load(const std::string& name)
{
- CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get()));
+ apr_pool_t* pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), pool));
+ APRPool::free(pool);
}
template <class T> void Module<T>::unload()
Index: lib/common/sys/Monitor.h
===================================================================
--- lib/common/sys/Monitor.h (revision 540150)
+++ lib/common/sys/Monitor.h (working copy)
@@ -60,7 +60,9 @@
#ifdef USE_APR
Monitor::Monitor() {
- CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+ apr_pool_t* pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
+ APRPool::free(pool);
}
Monitor::~Monitor() {