Kevin Smith wrote:
Gordon Sim wrote:
Kevin Smith wrote:
Gordon Sim wrote:
Kevin Smith wrote:
Thanks for the update! If there are any patches for this against M2, I'd be willing to help test.

Fantastic! We are aiming to fix the M2 branch first and if you could re-run your test at that time to ensure the problem is fixed that would be much appreciated.

I've hacked together a fix against M2 which seems to address the problems I was seeing. I could post my patch to the list depending on how soon a fix is expected against M2. My C++ is pretty rusty, so I make no guarantees about code correctness or quality :)

Yes, I'd be interested in seeing the patch since you have it anyway!

And I spoke waaaay too soon :-)

My C++ is rustier than I originally thought given the sorry report I just got from valgrind. I'll post something as soon as I get it to quit leaking like a sieve...

--Kevin

Attached is my attempt at fixing this problem. There are, no doubt, better ways to do this. This is just the way I hacked to fix the fall-over-with-three-clients problem I was having.

My patch does three things:

1) Modifies the APRPool class to use alloc/free semantics for APR memory pools. Each time a caller calls APRPool::get() they'll get their own pool reference. I've fixed up all the call sites I can find to also call APRPool::free() at the appropriate time.

2) Caches freed APR memory pools in an STL stack. This cuts down on the number of memory pools created overall.

3) As a result of doing #1 and #2 I've introduced a guard mutex around APRPool::get() and APRPool::free(). This is to prevent concurrent access to the memory pool cache. If it's too heavyweight, the mutex along with the caching mechanism could be removed entirely.

Like I said before, my C++ is very rusty so I make no guarantees about code correctness or quality. I've fixed the leaks I could find (and understand) from valgrind so I _think_ this isn't any leakier than the code was before my patch.

I've run a mini-stress test (10 concurrent clients sending messages via a topic exchange) multiple times and haven't been able to get the broker to fall over. Memory usage seems pretty stable once the broker is up and all clients have connected.


--Kevin
Index: lib/broker/QueueRegistry.cpp
===================================================================
--- lib/broker/QueueRegistry.cpp	(revision 540150)
+++ lib/broker/QueueRegistry.cpp	(working copy)
@@ -28,7 +28,9 @@
 
 QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
 
-QueueRegistry::~QueueRegistry(){}
+QueueRegistry::~QueueRegistry()
+{
+}
 
 std::pair<Queue::shared_ptr, bool>
 QueueRegistry::declare(const string& declareName, bool durable, 
Index: lib/common/sys/apr/LFSessionContext.h
===================================================================
--- lib/common/sys/apr/LFSessionContext.h	(revision 540150)
+++ lib/common/sys/apr/LFSessionContext.h	(working copy)
@@ -66,7 +66,7 @@
         
 
   public:
-    LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, 
+    LFSessionContext(apr_socket_t* socket, 
                      LFProcessor* const processor, 
                      bool debug = false);
     virtual ~LFSessionContext();
Index: lib/common/sys/apr/APRPool.h
===================================================================
--- lib/common/sys/apr/APRPool.h	(revision 540150)
+++ lib/common/sys/apr/APRPool.h	(working copy)
@@ -23,6 +23,8 @@
  */
 #include <boost/noncopyable.hpp>
 #include <apr_pools.h>
+#include <apr_thread_mutex.h>
+#include <stack>
 
 namespace qpid {
 namespace sys {
@@ -33,12 +35,21 @@
   public:
     APRPool();
     ~APRPool();
+    
+    apr_pool_t* allocate_pool();
+    
+    void free_pool(apr_pool_t* pool);
 
-    /** Get singleton instance */
+    /** Allocate pool */
     static apr_pool_t* get();
+    
+    /** Free pool */
+    static void free(apr_pool_t* pool);
 
   private:
     apr_pool_t* pool;
+    apr_thread_mutex_t* poolGuard;
+    std::stack<apr_pool_t*>* allocated_pools;
 };
 
 }}
Index: lib/common/sys/apr/LFProcessor.cpp
===================================================================
--- lib/common/sys/apr/LFProcessor.cpp	(revision 540150)
+++ lib/common/sys/apr/LFProcessor.cpp	(working copy)
@@ -22,6 +22,7 @@
 #include <QpidError.h>
 #include "LFProcessor.h"
 #include "APRBase.h"
+#include "APRPool.h"
 #include "LFSessionContext.h"
 
 using namespace qpid::sys;
@@ -30,7 +31,7 @@
 // TODO aconway 2006-10-12: stopped is read outside locks.
 //
 
-LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
+LFProcessor::LFProcessor(int _workers, int _size, int _timeout) :
     size(_size),
     timeout(_timeout), 
     signalledCount(0),
@@ -41,7 +42,7 @@
     workers(new Thread[_workers]),
     stopped(false)
 {
-
+    pool = APRPool::get();
     CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
 }
 
@@ -50,6 +51,7 @@
     if (!stopped) stop();
     delete[] workers;
     CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+    APRPool::free(pool);
 }
 
 void LFProcessor::start(){
Index: lib/common/sys/apr/APRAcceptor.cpp
===================================================================
--- lib/common/sys/apr/APRAcceptor.cpp	(revision 540150)
+++ lib/common/sys/apr/APRAcceptor.cpp	(working copy)
@@ -20,6 +20,7 @@
  */
 #include <sys/Acceptor.h>
 #include <sys/SessionHandlerFactory.h>
+#include <apr_pools.h>
 #include "LFProcessor.h"
 #include "LFSessionContext.h"
 #include "APRBase.h"
@@ -32,6 +33,7 @@
 {
   public:
     APRAcceptor(int16_t port, int backlog, int threads, bool trace);
+    ~APRAcceptor();
     virtual int16_t getPort() const;
     virtual void run(qpid::sys::SessionHandlerFactory* factory);
     virtual void shutdown();
@@ -46,6 +48,7 @@
     apr_socket_t* socket;
     volatile bool running;
     Mutex shutdownLock;
+    apr_pool_t* pool;
 };
 
 // Define generic Acceptor::create() to return APRAcceptor.
@@ -54,16 +57,22 @@
     return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
 }
 // Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
+Acceptor::~Acceptor() {
+}
 
-    APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
+APRAcceptor::~APRAcceptor() {
+    APRPool::free(pool);
+}
+
+APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
     port(port_),
     trace(trace_),
-    processor(APRPool::get(), threads, 1000, 5000000)
+    processor(threads, 1000, 5000000)
 {
+    pool = APRPool::get();
     apr_sockaddr_t* address;
-    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
-    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
+    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
     CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
     CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
     CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
@@ -81,7 +90,7 @@
     std::cout << "Listening on port " << getPort() << "..." << std::endl;
     while(running){
         apr_socket_t* client;
-        apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+        apr_status_t status = apr_socket_accept(&client, socket, pool);
         if(status == APR_SUCCESS){
             //make this socket non-blocking:
             CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
@@ -89,7 +98,7 @@
             CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
             CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
             CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
-            LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace);
+            LFSessionContext* session = new LFSessionContext(client, &processor, trace);
             session->init(factory->create(session));
         }else{
             Mutex::ScopedLock locker(shutdownLock);                
Index: lib/common/sys/apr/LFProcessor.h
===================================================================
--- lib/common/sys/apr/LFProcessor.h	(revision 540150)
+++ lib/common/sys/apr/LFProcessor.h	(working copy)
@@ -57,6 +57,7 @@
         qpid::sys::Mutex countLock;
         std::vector<LFSessionContext*> sessions;
         volatile bool stopped;
+        apr_pool_t* pool;
 
         const apr_pollfd_t* getNextEvent();
         void waitToLead();
@@ -65,7 +66,7 @@
         virtual void run();        
 
     public:
-        LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+        LFProcessor(int workers, int size, int timeout);
         /**
          * Add the fd to the poll set. Relies on the client_data being
          * an instance of LFSessionContext.
Index: lib/common/sys/apr/Thread.cpp
===================================================================
--- lib/common/sys/apr/Thread.cpp	(revision 540150)
+++ lib/common/sys/apr/Thread.cpp	(working copy)
@@ -20,6 +20,7 @@
  */
 
 #include <sys/Thread.h>
+#include "APRPool.h"
 
 using namespace qpid::sys;
 using qpid::sys::Runnable;
@@ -30,4 +31,7 @@
     return NULL;
 } 
 
+Thread::~Thread() {
+}
 
+
Index: lib/common/sys/apr/Socket.cpp
===================================================================
--- lib/common/sys/apr/Socket.cpp	(revision 540150)
+++ lib/common/sys/apr/Socket.cpp	(working copy)
@@ -30,10 +30,12 @@
 
 Socket Socket::createTcp() {
     Socket s;
+    apr_pool_t* pool = APRPool::get();
     CHECK_APR_SUCCESS(
         apr_socket_create(
             &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
-            APRPool::get()));
+            pool));
+    APRPool::free(pool);
     return s;
 }
 
@@ -47,11 +49,13 @@
 
 void Socket::connect(const std::string& host, int port) {
     apr_sockaddr_t* address;
+    apr_pool_t* pool = APRPool::get();
     CHECK_APR_SUCCESS(
         apr_sockaddr_info_get(
             &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
-            APRPool::get()));
+            pool));
     CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+    APRPool::free(pool);
 }
 
 void Socket::close() {
Index: lib/common/sys/apr/LFSessionContext.cpp
===================================================================
--- lib/common/sys/apr/LFSessionContext.cpp	(revision 540150)
+++ lib/common/sys/apr/LFSessionContext.cpp	(working copy)
@@ -20,6 +20,7 @@
  */
 #include "LFSessionContext.h"
 #include "APRBase.h"
+#include "APRPool.h"
 #include <QpidError.h>
 #include <assert.h>
 
@@ -27,7 +28,7 @@
 using namespace qpid::sys;
 using namespace qpid::framing;
 
-LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, 
+LFSessionContext::LFSessionContext(apr_socket_t* _socket, 
                                    LFProcessor* const _processor,
                                    bool _debug) :
     debug(_debug),
@@ -40,7 +41,7 @@
     closing(false)
 {
     
-    fd.p = _pool;
+    fd.p = APRPool::get();
     fd.desc_type = APR_POLL_SOCKET;
     fd.reqevents = APR_POLLIN;
     fd.client_data = this;
@@ -156,6 +157,7 @@
 
 void LFSessionContext::handleClose(){
     handler->closed();
+    APRPool::free(fd.p);
     std::cout << "Session closed [" << &socket << "]" << std::endl;
     delete handler;
     delete this;
Index: lib/common/sys/apr/APRPool.cpp
===================================================================
--- lib/common/sys/apr/APRPool.cpp	(revision 540150)
+++ lib/common/sys/apr/APRPool.cpp	(working copy)
@@ -22,20 +22,57 @@
 #include "APRPool.h"
 #include "APRBase.h"
 #include <boost/pool/detail/singleton.hpp>
+#include <iostream>
+#include <sstream>
 
+
 using namespace qpid::sys;
 
 APRPool::APRPool(){
     APRBase::increment();
+    allocated_pools = new std::stack<apr_pool_t*>();    
     CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+    CHECK_APR_SUCCESS(apr_thread_mutex_create(&poolGuard, APR_THREAD_MUTEX_NESTED, pool));
 }
 
 APRPool::~APRPool(){
+    while(allocated_pools->size() > 0) {
+        apr_pool_t* pool = allocated_pools->top();
+        allocated_pools->pop();
+        apr_pool_destroy(pool);
+    }
     apr_pool_destroy(pool);
+    apr_thread_mutex_destroy(poolGuard);
+    delete allocated_pools;
     APRBase::decrement();
 }
 
+void APRPool::free_pool(apr_pool_t* pool) {
+    CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+    allocated_pools->push(pool);
+    CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+}
+
+apr_pool_t* APRPool::allocate_pool() {
+    CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+    apr_pool_t* retval;
+    if (allocated_pools->size() == 0) {
+        CHECK_APR_SUCCESS(apr_pool_create(&retval, pool));
+    }
+    else {
+        retval = allocated_pools->top();
+        allocated_pools->pop();
+    }
+    CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+    return retval;    
+}
+
 apr_pool_t* APRPool::get() {
-    return boost::details::pool::singleton_default<APRPool>::instance().pool;
+    return 
+        boost::details::pool::singleton_default<APRPool>::instance().allocate_pool(); 
 }
 
+void APRPool::free(apr_pool_t* pool) {    
+    boost::details::pool::singleton_default<APRPool>::instance().free_pool(pool);
+}
+
Index: lib/common/sys/Mutex.h
===================================================================
--- lib/common/sys/Mutex.h	(revision 540150)
+++ lib/common/sys/Mutex.h	(working copy)
@@ -21,8 +21,9 @@
 
 #ifdef USE_APR
 #  include <apr_thread_mutex.h>
-#  include <apr/APRBase.h>
-#  include <apr/APRPool.h>
+#  include <apr_pools.h>
+#  include "apr/APRBase.h"
+#  include "apr/APRPool.h"
 #else
 #  include <pthread.h>
 #  include <posix/check.h>
@@ -62,6 +63,7 @@
   protected:
 #ifdef USE_APR
     apr_thread_mutex_t* mutex;
+    apr_pool_t* pool;
 #else
     pthread_mutex_t mutex;
 #endif
@@ -71,11 +73,13 @@
 // APR ================================================================
 
 Mutex::Mutex() {
-    CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
+    pool = APRPool::get();
+    CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
 }
 
 Mutex::~Mutex(){
     CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+    APRPool::free(pool);    
 }
 
 void Mutex::lock() {
Index: lib/common/sys/Thread.h
===================================================================
--- lib/common/sys/Thread.h	(revision 540150)
+++ lib/common/sys/Thread.h	(working copy)
@@ -44,11 +44,10 @@
     inline static void yield();
 
     inline Thread();
-    inline explicit Thread(qpid::sys::Runnable*);
-    inline explicit Thread(qpid::sys::Runnable&);
-    
+    inline Thread(qpid::sys::Runnable*);
+    inline Thread(qpid::sys::Runnable&);
+    ~Thread();
     inline void join();
-
     inline long id();
         
   private:
@@ -70,13 +69,17 @@
 #ifdef USE_APR
 
 Thread::Thread(Runnable* runnable) {
+    apr_pool_t* tmp_pool = APRPool::get();
     CHECK_APR_SUCCESS(
-        apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get()));
+        apr_thread_create(&thread, 0, runRunnable, runnable, tmp_pool));
+    APRPool::free(tmp_pool);
 }
 
 Thread::Thread(Runnable& runnable) {
+    apr_pool_t* tmp_pool = APRPool::get();
     CHECK_APR_SUCCESS(
-        apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get()));
+        apr_thread_create(&thread, 0, runRunnable, &runnable, tmp_pool));
+    APRPool::free(tmp_pool);
 }
 
 void Thread::join(){
@@ -92,9 +95,11 @@
 Thread::Thread(apr_thread_t* t) : thread(t) {}
 
 Thread Thread::current(){
+    apr_pool_t* tmp_pool = APRPool::get();
     apr_thread_t* thr;
     apr_os_thread_t osthr = apr_os_thread_current();
-    CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get()));
+    CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, tmp_pool));
+    APRPool::free(tmp_pool);
     return Thread(thr);
 }
 
@@ -123,6 +128,9 @@
     return long(thread);
 }
 
+Thread::~Thread() {
+}
+
 Thread::Thread(pthread_t thr) : thread(thr) {}
 
 Thread Thread::current() {
Index: lib/common/sys/posix/Thread.cpp
===================================================================
--- lib/common/sys/posix/Thread.cpp	(revision 540150)
+++ lib/common/sys/posix/Thread.cpp	(working copy)
@@ -26,3 +26,6 @@
     static_cast<Runnable*>(p)->run();
     return 0;
 }
+
+Thread::~Thread() {
+}
Index: lib/common/sys/Module.h
===================================================================
--- lib/common/sys/Module.h	(revision 540150)
+++ lib/common/sys/Module.h	(working copy)
@@ -104,7 +104,9 @@
 
 template <class T> void Module<T>::load(const std::string& name)
 {
-    CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get()));
+    apr_pool_t* pool = APRPool::get();
+    CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), pool));
+    APRPool::free(pool);
 }
 
 template <class T> void Module<T>::unload()
Index: lib/common/sys/Monitor.h
===================================================================
--- lib/common/sys/Monitor.h	(revision 540150)
+++ lib/common/sys/Monitor.h	(working copy)
@@ -60,7 +60,9 @@
 #ifdef USE_APR
 
 Monitor::Monitor() {
-    CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+    apr_pool_t* pool = APRPool::get();
+    CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
+    APRPool::free(pool);
 }
 
 Monitor::~Monitor() {

Reply via email to