Author: sayer
Date: 2009-05-19 16:07:30 +0200 (Tue, 19 May 2009)
New Revision: 1391

Added:
   trunk/apps/monitoring/etc/
   trunk/apps/monitoring/etc/monitoring.conf
Modified:
   trunk/apps/monitoring/Monitoring.cpp
   trunk/apps/monitoring/Monitoring.h
   trunk/doc/Readme.monitoring
Log:
Option to run a garbage collector thread to remove info of 
expired sessions.

This work was kindly sponsored by Teltech Systems Inc.



Modified: trunk/apps/monitoring/Monitoring.cpp
===================================================================
--- trunk/apps/monitoring/Monitoring.cpp        2009-05-19 13:49:17 UTC (rev 1390)
+++ trunk/apps/monitoring/Monitoring.cpp        2009-05-19 14:07:30 UTC (rev 1391)
@@ -27,10 +27,14 @@
 
 #include "Monitoring.h"
 
+#include "AmConfigReader.h"
+#include "AmEventDispatcher.h"
+
 #include "log.h"
 
 #include <sys/types.h>
 #include <regex.h>
+#include <unistd.h>
 
 //EXPORT_PLUGIN_CLASS_FACTORY(Monitor, MOD_NAME);
 extern "C" void* plugin_class_create()
@@ -42,6 +46,7 @@
 }
 
 Monitor* Monitor::_instance=0;
+unsigned int Monitor::gcInterval = 10;
 
 Monitor* Monitor::instance()
 {
@@ -51,7 +56,7 @@
 }
 
 Monitor::Monitor(const string& name) 
-  : AmDynInvokeFactory(MOD_NAME) {
+  : AmDynInvokeFactory(MOD_NAME), gc_thread(NULL) {
 }
 
 Monitor::~Monitor() {
@@ -59,6 +64,23 @@
 
 int Monitor::onLoad() {
   // todo: if GC configured, start thread
+  AmConfigReader cfg;
+  if(cfg.loadFile(AmConfig::ModConfigPath + string(MOD_NAME ".conf"))) {
+    DBG("monitoring not starting garbage collector\n");
+    return 0;
+  }
+
+  if (cfg.getParameter("run_garbage_collector","no") == "yes") {
+    gcInterval = cfg.getParameterInt("garbage_collector_interval", 10);
+    DBG("Running garbage collection for monitoring every %u seconds\n", 
+       gcInterval);
+    gc_thread.reset(new MonitorGarbageCollector());
+    gc_thread->start();
+    AmEventDispatcher::instance()->addEventQueue("monitoring_gc", gc_thread.get());
+//     // add garbage collector to garbage collector...
+//     AmThreadWatcher::instance()->add(gc_thread);
+  }
+
   return 0;
 }
 
@@ -189,12 +211,21 @@
 }
 
 void Monitor::clearFinished(const AmArg& args, AmArg& ret) {
- for (int i=0;i<NUM_LOG_BUCKETS;i++) {
+  clearFinished();
+
+  ret.push(0);
+  ret.push("OK");
+}
+
+void Monitor::clearFinished() {
+  time_t now = time(0);
+  for (int i=0;i<NUM_LOG_BUCKETS;i++) {
     logs[i].log_lock.lock();
     std::map<string, LogInfo>::iterator it=
       logs[i].log.begin();
     while (it != logs[i].log.end()) {
-      if (it->second.finished > 0) {
+      if (it->second.finished && 
+         it->second.finished <= now) {
        std::map<string, LogInfo>::iterator d_it = it;
        it++;
        logs[i].log.erase(d_it);
@@ -204,8 +235,6 @@
     }
     logs[i].log_lock.unlock();
   }
-  ret.push(0);
-  ret.push("OK");
 }
 
 void Monitor::get(const AmArg& args, AmArg& ret) {
@@ -218,10 +247,28 @@
   bucket.log_lock.unlock();
 }
 
+void Monitor::getAttribute(const AmArg& args, AmArg& ret) {
+  assertArgCStr(args[0]);
+  string attr_name = args[0].asCStr();
+  for (int i=0;i<NUM_LOG_BUCKETS;i++) {
+    logs[i].log_lock.lock();
+    for (std::map<string, LogInfo>::iterator it=
+          logs[i].log.begin();it != logs[i].log.end();it++) {
+      ret.push(AmArg());
+      AmArg& val = ret.get(ret.size()-1);
+      val.push(AmArg(it->first.c_str()));
+      val.push(it->second.info[attr_name]);
+    }
+    logs[i].log_lock.unlock();
+  }
+}
+
+
 #define DEF_GET_ATTRIB_FUNC(func_name, cond)                           \
   void Monitor::func_name(const AmArg& args, AmArg& ret) {             \
     assertArgCStr(args[0]);                                            \
     string attr_name = args[0].asCStr();                               \
+    time_t now = time(0);                                              \
     for (int i=0;i<NUM_LOG_BUCKETS;i++) {                              \
       logs[i].log_lock.lock();                                         \
       for (std::map<string, LogInfo>::iterator it=                     \
@@ -237,10 +284,10 @@
     }                                                                  \
   }
 
-DEF_GET_ATTRIB_FUNC(getAttribute, true)
-DEF_GET_ATTRIB_FUNC(getAttributeActive,  (!it->second.finished))
-DEF_GET_ATTRIB_FUNC(getAttributeFinished, (it->second.finished))
-
+DEF_GET_ATTRIB_FUNC(getAttributeActive,  (!(it->second.finished && 
+                                           it->second.finished <= now)))
+DEF_GET_ATTRIB_FUNC(getAttributeFinished,(it->second.finished && 
+                                         it->second.finished <= now))
 #undef DEF_GET_ATTRIB_FUNC
 
 void Monitor::listAll(const AmArg& args, AmArg& ret) {
@@ -315,11 +362,14 @@
 }
 
 void Monitor::listFinished(const AmArg& args, AmArg& ret) {
- for (int i=0;i<NUM_LOG_BUCKETS;i++) {
+  time_t now = time(0);
+  ret.assertArray();
+  for (int i=0;i<NUM_LOG_BUCKETS;i++) {
     logs[i].log_lock.lock();
     for (std::map<string, LogInfo>::iterator it=
           logs[i].log.begin(); it != logs[i].log.end(); it++) {
-      if (it->second.finished > 0)
+      if (it->second.finished && 
+         it->second.finished <= now)
        ret.push(AmArg(it->first.c_str()));
     }
     logs[i].log_lock.unlock();
@@ -328,11 +378,14 @@
 
 
 void Monitor::listActive(const AmArg& args, AmArg& ret) {
- for (int i=0;i<NUM_LOG_BUCKETS;i++) {
+  time_t now = time(0);
+  ret.assertArray();
+  for (int i=0;i<NUM_LOG_BUCKETS;i++) {
     logs[i].log_lock.lock();
     for (std::map<string, LogInfo>::iterator it=
           logs[i].log.begin(); it != logs[i].log.end(); it++) {
-      if (!it->second.finished)
+      if (!(it->second.finished &&
+           it->second.finished <= now))
        ret.push(AmArg(it->first.c_str()));
     }
     logs[i].log_lock.unlock();
@@ -348,3 +401,29 @@
   
   return logs[c % NUM_LOG_BUCKETS];
 }
+
+void MonitorGarbageCollector::run() {
+  DBG("running MonitorGarbageCollector thread\n");
+  running.set(true);
+  while (running.get()) {
+    sleep(Monitor::gcInterval);
+    Monitor::instance()->clearFinished();
+  }
+  DBG("MonitorGarbageCollector thread ends\n");
+  AmEventDispatcher::instance()->delEventQueue("monitoring_gc");
+}
+
+void MonitorGarbageCollector::postEvent(AmEvent* e) {
+  AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(e);  
+  if (sys_ev && 
+      sys_ev->sys_event == AmSystemEvent::ServerShutdown) {
+    DBG("stopping MonitorGarbageCollector thread\n");
+    running.set(false);
+    return;
+  }
+
+  WARN("received unknown event\n");
+}
+
+void MonitorGarbageCollector::on_stop() {
+}

Modified: trunk/apps/monitoring/Monitoring.h
===================================================================
--- trunk/apps/monitoring/Monitoring.h  2009-05-19 13:49:17 UTC (rev 1390)
+++ trunk/apps/monitoring/Monitoring.h  2009-05-19 14:07:30 UTC (rev 1391)
@@ -29,6 +29,7 @@
 #define _MONITORING_H_
 
 #include <map>
+#include <memory>
 
 #include "AmThread.h"
 #include "AmApi.h"
@@ -49,18 +50,21 @@
   AmMutex log_lock;
   std::map<string, LogInfo> log;
 };
+class MonitorGarbageCollector;
 
 class Monitor 
 : public AmDynInvokeFactory,
-  public AmDynInvoke
-   
+  public AmDynInvoke   
 {
   static Monitor* _instance;
+  std::auto_ptr<MonitorGarbageCollector> gc_thread;
 
   LogBucket logs[NUM_LOG_BUCKETS];
 
   LogBucket& getLogBucket(const string& call_id);
 
+  void clearFinished();
+
   void log(const AmArg& args, AmArg& ret);
   void logAdd(const AmArg& args, AmArg& ret);
   void markFinished(const AmArg& args, AmArg& ret);
@@ -88,15 +92,20 @@
   void invoke(const string& method, 
              const AmArg& args, AmArg& ret);
   int onLoad();
+  static unsigned int gcInterval;
+
+  friend class MonitorGarbageCollector;
 };
 
-/*
-class MonitorGarbageCollector : public AmThread {
+class MonitorGarbageCollector 
+: public AmThread,
+  public AmEventQueueInterface
+ {
+  AmSharedVar<bool> running;
+
  public:
   void run();
   void on_stop();
-
+  void postEvent(AmEvent* e);
 };
-*/
-
 #endif

Added: trunk/apps/monitoring/etc/monitoring.conf
===================================================================
--- trunk/apps/monitoring/etc/monitoring.conf   2009-05-19 13:49:17 UTC (rev 1390)
+++ trunk/apps/monitoring/etc/monitoring.conf   2009-05-19 14:07:30 UTC (rev 1391)
@@ -0,0 +1,14 @@
+
+#run_garbage_collector=[yes | no]
+#
+# run garbage collection on expired session info? 
+# Default: no
+#
+#run_garbage_collector = yes
+
+#garbage_collector_interval=10
+#
+# run garbage collection every n seconds
+# Default: 10
+#
+#garbage_collector_interval = 20
\ No newline at end of file

Modified: trunk/doc/Readme.monitoring
===================================================================
--- trunk/doc/Readme.monitoring 2009-05-19 13:49:17 UTC (rev 1390)
+++ trunk/doc/Readme.monitoring 2009-05-19 14:07:30 UTC (rev 1391)
@@ -25,6 +25,8 @@
  USE_MONITORING = yes
 and the monitoring module needs to be loaded.
 
+In monitoring.conf, the run_garbage_collector option enables a garbage collector
+thread, which periodically removes all info about finished sessions.
 
 DI API
 ------

_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev

Reply via email to