Author: kpvdr
Date: Fri Nov 2 06:12:14 2007
New Revision: 591321
URL: http://svn.apache.org/viewvc?rev=591321&view=rev
Log:
Added Timer class which uses boost intrusive_ptr. This needs to be made into a
C++ template at some point.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp?rev=591321&r1=591320&r2=591321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp Fri Nov 2 06:12:14
2007
@@ -95,3 +95,86 @@
{
return a.get() && b.get() && a->time > b->time;
}
+
+bool LaterA::operator()(const TimerTaskA::intrusive_ptr& a, const
TimerTaskA::intrusive_ptr& b) const
+{
+ return a.get() && b.get() && a->time > b->time;
+}
+
+TimerA::TimerA(): active(false)
+{
+ start();
+}
+
+TimerA::~TimerA()
+{
+ stop();
+}
+void TimerA::run()
+{
+ Monitor::ScopedLock l(monitor);
+ while(active){
+ if (itasks.empty()) {
+ monitor.wait();
+ } else {
+ TimerTaskA::intrusive_ptr t = itasks.top();
+ if (t->cancelled) {
+ itasks.pop();
+ } else if(t->time < AbsTime::now()) {
+ itasks.pop();
+ t->fire();
+ } else {
+ monitor.wait(t->time);
+ }
+ }
+ }
+// ::run();
+}
+
+TimerTaskA::TimerTaskA(qpid::sys::Duration timeout): TimerTask(timeout),
ref_cnt(0) {}
+TimerTaskA::TimerTaskA(qpid::sys::AbsTime time): TimerTask(time), ref_cnt(0) {}
+TimerTaskA::~TimerTaskA() {}
+
+
+void TimerA::add(TimerTaskA::intrusive_ptr& task)
+{
+ Monitor::ScopedLock l(monitor);
+ itasks.push(task);
+ monitor.notify();
+}
+
+void TimerA::start()
+{
+ Monitor::ScopedLock l(monitor);
+ if (!active) {
+ active = true;
+ runner = Thread(this);
+ }
+}
+
+void TimerA::stop()
+{
+ signalStop();
+ runner.join();
+}
+void TimerA::signalStop()
+{
+ Monitor::ScopedLock l(monitor);
+ if (active) {
+ active = false;
+ monitor.notifyAll();
+ }
+}
+
+void qpid::broker::intrusive_ptr_add_ref(TimerTaskA* fe)
+{
+ fe->ref();
+}
+
+void qpid::broker::intrusive_ptr_release(TimerTaskA* fe)
+{
+ fe->unref();
+ if (fe->refcnt() == 0) delete fe;
+}
+
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h?rev=591321&r1=591320&r2=591321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h Fri Nov 2 06:12:14
2007
@@ -24,6 +24,7 @@
#include <memory>
#include <queue>
#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
@@ -44,30 +45,76 @@
virtual void fire() = 0;
};
- struct Later
- {
- bool operator()(const TimerTask::shared_ptr& a, const
TimerTask::shared_ptr& b) const;
- };
+struct TimerTaskA : public TimerTask
+{
+ typedef boost::intrusive_ptr<TimerTaskA> intrusive_ptr;
+
+ TimerTaskA(qpid::sys::Duration timeout);
+ TimerTaskA(qpid::sys::AbsTime time);
+ virtual ~TimerTaskA();
+
+ size_t ref_cnt;
+ inline size_t refcnt(void) { return ref_cnt;}
+ inline void ref(void) { ref_cnt++; }
+ inline void unref(void) { ref_cnt--; }
+};
+
+struct Later
+{
+ bool operator()(const TimerTask::shared_ptr& a, const
TimerTask::shared_ptr& b) const;
+};
+
+struct LaterA
+{
+ bool operator()(const TimerTaskA::intrusive_ptr& a, const
TimerTaskA::intrusive_ptr& b) const;
+};
+
class Timer : private qpid::sys::Runnable
{
+protected:
qpid::sys::Monitor monitor;
std::priority_queue<TimerTask::shared_ptr,
std::vector<TimerTask::shared_ptr>, Later> tasks;
qpid::sys::Thread runner;
bool active;
- void run();
+ virtual void run();
void signalStop();
public:
Timer();
- ~Timer();
+ virtual ~Timer();
void add(TimerTask::shared_ptr task);
void start();
void stop();
};
+
+class TimerA : private qpid::sys::Runnable
+{
+protected:
+ qpid::sys::Monitor monitor;
+ std::priority_queue<TimerTaskA::intrusive_ptr&,
std::vector<TimerTaskA::intrusive_ptr>,
+ LaterA> itasks;
+ qpid::sys::Thread runner;
+ bool active;
+
+ virtual void run();
+ void signalStop();
+
+public:
+ TimerA();
+ virtual ~TimerA();
+
+ void add(TimerTaskA::intrusive_ptr& task);
+ void start();
+ void stop();
+};
+
+void intrusive_ptr_add_ref(TimerTaskA* r);
+void intrusive_ptr_release(TimerTaskA* r);
+
}
}