Author: gsim
Date: Tue Oct 14 04:06:41 2008
New Revision: 704461

URL: http://svn.apache.org/viewvc?rev=704461&view=rev
Log:
Update to periodic purge of expired messages: check the dequeue rate to avoid 
interfering unnecessarily where the dequeuing is sufficient to remove expired 
messages.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=704461&r1=704460&r2=704461&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Oct 14 04:06:41 2008
@@ -332,6 +332,7 @@
   qpid/broker/QueueBindings.cpp \
   qpid/broker/QueuePolicy.cpp \
   qpid/broker/QueueRegistry.cpp \
+  qpid/broker/RateTracker.cpp \
   qpid/broker/RecoveryManagerImpl.cpp \
   qpid/broker/RecoveredEnqueue.cpp \
   qpid/broker/RecoveredDequeue.cpp \
@@ -479,6 +480,7 @@
   qpid/broker/QueuedMessage.h \
   qpid/broker/QueuePolicy.h \
   qpid/broker/QueueRegistry.h \
+  qpid/broker/RateTracker.h \
   qpid/broker/RecoverableConfig.h \
   qpid/broker/RecoverableExchange.h \
   qpid/broker/RecoverableMessage.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=704461&r1=704460&r2=704461&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 14 04:06:41 
2008
@@ -405,19 +405,24 @@
 
 void Queue::purgeExpired()
 {
-    Messages expired;
-    {
-        Mutex::ScopedLock locker(messageLock);
-        for (Messages::iterator i = messages.begin(); i != messages.end();) {
-            if (i->payload->hasExpired()) {
-                expired.push_back(*i);
-                i = messages.erase(i);
-            } else {
-                ++i;
+    //As expired messages are discarded during dequeue also, only
+    //bother explicitly expiring if the rate of dequeues since last
+    //attempt is less than one per second.
+    if (dequeueTracker.sampleRatePerSecond() < 1) {
+        Messages expired;
+        {
+            Mutex::ScopedLock locker(messageLock);
+            for (Messages::iterator i = messages.begin(); i != 
messages.end();) {
+                if (i->payload->hasExpired()) {
+                    expired.push_back(*i);
+                    i = messages.erase(i);
+                } else {
+                    ++i;
+                }
             }
         }
+        for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, 
(TransactionContext*) 0, _1));
     }
-    for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, 
(TransactionContext*) 0, _1));
 }
 
 /**
@@ -465,6 +470,7 @@
         lvq.erase(key);
     }
     messages.pop_front();
+    ++dequeueTracker;
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=704461&r1=704460&r2=704461&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Oct 14 04:06:41 
2008
@@ -27,6 +27,7 @@
 #include "PersistableQueue.h"
 #include "QueuePolicy.h"
 #include "QueueBindings.h"
+#include "RateTracker.h"
 
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Monitor.h"
@@ -94,6 +95,7 @@
             boost::shared_ptr<Exchange> alternateExchange;
             framing::SequenceNumber sequence;
             qmf::org::apache::qpid::broker::Queue* mgmtObject;
+            RateTracker dequeueTracker;
 
             void push(boost::intrusive_ptr<Message>& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.cpp?rev=704461&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.cpp Tue Oct 14 
04:06:41 2008
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RateTracker.h"
+
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_SEC;
+
+namespace qpid {
+namespace broker {
+
+RateTracker::RateTracker() : currentCount(0), lastCount(0), 
lastTime(AbsTime::now()) {}
+
+RateTracker& RateTracker::operator++()
+{
+    ++currentCount;
+    return *this;
+}
+
+/**
+ * Samples the rate of increments since the previous call (or since
+ * construction, for the first call) and restarts the sampling window
+ * at the current count and time.
+ *
+ * @return increments per second over the elapsed window, or 0 when the
+ * count has not changed since the last sample.
+ */
+double RateTracker::sampleRatePerSecond()
+{
+    // Read the volatile counter exactly once, so the increment reported
+    // here and the baseline kept for the next sample are derived from the
+    // same value even if operator++ runs in between.
+    const int32_t count = currentCount;
+    const int32_t increment = count - lastCount;
+    const AbsTime now = AbsTime::now();
+    const Duration interval(lastTime, now);
+    lastCount = count;
+    lastTime = now;
+    if (!increment) return 0;
+
+    // Do the division in floating point. NOTE(review): assumes Duration
+    // converts to an integral tick count with TIME_SEC ticks per second
+    // (see qpid/sys/Time.h) -- confirm. Under that assumption the former
+    // 'interval / TIME_SEC' truncated to whole seconds and evaluated to 0
+    // for any window shorter than a second, making the outer division an
+    // integer divide-by-zero.
+    const double elapsed = static_cast<double>(interval);
+    // A window that is not positive is treated as a single tick so that
+    // the divisor below can never be zero.
+    const double ticks = elapsed > 0 ? elapsed : 1;
+    return increment * (static_cast<double>(TIME_SEC) / ticks);
+}
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.h?rev=704461&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.h Tue Oct 14 
04:06:41 2008
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_RATETRACKER_H
+#define QPID_BROKER_RATETRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Simple rate tracker: represents some value that can be incremented,
+ * then can periodically sample the rate of increments.
+ */
+class RateTracker
+{
+  public:
+    RateTracker();
+    /**
+     * Increments the count being tracked. Can be called concurrently
+     * with other calls to this operator as well as with calls to
+     * sampleRatePerSecond().
+     */
+    RateTracker& operator++();
+    /**
+     * Returns the rate of increments per second since last
+     * called. Calls to this method should be serialised, but can be
+     * called concurrently with the increment operator
+     */
+    double sampleRatePerSecond();
+  private:
+    volatile int32_t currentCount;
+    int32_t lastCount;
+    qpid::sys::AbsTime lastTime;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_RATETRACKER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RateTracker.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to