Author: chug
Date: Mon Dec 1 13:32:48 2014
New Revision: 1642681
URL: http://svn.apache.org/r1642681
Log:
QPID-6213: qpidd misses heartbeats
* Pollable queue breaks when client does not process whole batch.
* QueueCleaner must not reschedule same task multiple times.
* QueueCleaner breaks out of batch processing on wall clock time interval.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1642681&r1=1642680&r2=1642681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Mon Dec 1 13:32:48 2014
@@ -23,6 +23,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/sys/Timer.h"
+#include "qpid/sys/Time.h"
 
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
@@ -73,22 +74,25 @@ void QueueCleaner::setTimer(qpid::sys::T
 
 void QueueCleaner::fired()
 {
+    QPID_LOG(debug, "QueueCleaner::fired: requesting purge");
     queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1));
-    QPID_LOG(debug, "Requested purge of queues");
+    task->restart(); // Update task restart time to now()+interval
+    timer->add(task);
 }
 
 QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch)
 {
-    for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) {
-        (*i)->purgeExpired(period);
-    }
-    QPID_LOG(debug, "Purged " << batch.size() << " queues");
-    if (purging.empty()) {
-        task->restart();
-        timer->add(task);
-        QPID_LOG(debug, "Restarted purge timer");
+    const sys::AbsTime tmoTime = sys::AbsTime(sys::AbsTime::now(), 1 * sys::TIME_SEC);
+    int nPurged = 0;
+    QueuePtrs::const_iterator batchItr = batch.begin();
+    for ( ; batchItr != batch.end() && sys::AbsTime::now() < tmoTime; ++batchItr) {
+        task->restart(); // Update task restart time to now()+interval
+        (*batchItr)->purgeExpired(period);
+        nPurged++;
     }
-    return batch.end();
+    QPID_LOG(debug, "QueueCleaner::purge: purged " << nPurged << " of " << batch.size() << " queues");
+    task->restart(); // Update task restart time to now()+interval
+    return batchItr;
 }
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=1642681&r1=1642680&r2=1642681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Mon Dec 1 13:32:48 2014
@@ -143,7 +143,7 @@ template <class T> void PollableQueue<T>
 
 template <class T> void PollableQueue<T>::process() {
     // Called with lock held
-    while (!stopped && !queue.empty()) {
+    if (!stopped && !queue.empty()) {
         assert(batch.empty());
         batch.swap(queue);
         typename Batch::const_iterator putBack;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org