Author: sayer
Date: 2009-02-10 13:17:15 +0100 (Tue, 10 Feb 2009)
New Revision: 1259

Added:
   trunk/apps/monitoring/
   trunk/apps/monitoring/Makefile
   trunk/apps/monitoring/Monitoring.cpp
   trunk/apps/monitoring/Monitoring.h
   trunk/doc/Readme.monitoring
Modified:
   trunk/Makefile.defs
   trunk/core/AmSessionContainer.cpp
   trunk/core/AmSessionContainer.h
Log:
initial support for better call/server monitoring


Modified: trunk/Makefile.defs
===================================================================
--- trunk/Makefile.defs 2009-02-10 11:40:55 UTC (rev 1258)
+++ trunk/Makefile.defs 2009-02-10 12:17:15 UTC (rev 1259)
@@ -9,7 +9,7 @@
 VERSION = 1
 PATCHLEVEL = 1
 SUBLEVEL = 0
-EXTRAVERSION ?= -pre-$(SVN_REV)
+EXTRAVERSION ?= -dev-$(SVN_REV)
 
 REL_VERSION=$(VERSION).$(PATCHLEVEL).$(SUBLEVEL)
 RELEASE=$(REL_VERSION)$(EXTRAVERSION)
@@ -49,6 +49,11 @@
 # e.g. python modules:
 #exclude_modules ?= py_sems ivr mailbox pin_collect conf_auth mp3 examples
 
+# build in support for monitoring?
+#
+#
+#USE_MONITORING=yes
+
 LDFLAGS += -lm
 
 OS     = $(shell uname -s | sed -e s/SunOS/solaris/ | tr "[A-Z]" "[a-z]")
@@ -69,6 +74,10 @@
 endif
 endif
 
+ifdef USE_MONITORING
+CPPFLAGS += -DUSE_MONITORING      
+endif
+
 # Additions for Solaris support.
 ifeq ($(OS),solaris)
        GETARCH=uname -p

Added: trunk/apps/monitoring/Makefile
===================================================================
--- trunk/apps/monitoring/Makefile      2009-02-10 11:40:55 UTC (rev 1258)
+++ trunk/apps/monitoring/Makefile      2009-02-10 12:17:15 UTC (rev 1259)
@@ -0,0 +1,7 @@
+plug_in_name = monitoring
+
+module_ldflags =
+module_cflags  = -DMOD_NAME=\"$(plug_in_name)\"
+
+COREPATH ?=../../core
+include $(COREPATH)/plug-in/Makefile.app_module


Property changes on: trunk/apps/monitoring/Makefile
___________________________________________________________________
Name: svn:eol-style
   + native

Added: trunk/apps/monitoring/Monitoring.cpp
===================================================================
--- trunk/apps/monitoring/Monitoring.cpp        2009-02-10 11:40:55 UTC (rev 
1258)
+++ trunk/apps/monitoring/Monitoring.cpp        2009-02-10 12:17:15 UTC (rev 
1259)
@@ -0,0 +1,244 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2009 IPTEGO GmbH
+ *
+ * This file is part of SEMS, a free SIP media server.
+ *
+ * SEMS is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * For a license to use the SEMS software under conditions
+ * other than those described here, or to purchase support for this
+ * software, please contact iptel.org by e-mail at the following addresses:
+ *    [email protected]
+ *
+ * SEMS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include "Monitoring.h"
+
+#include "log.h"
+
+//EXPORT_PLUGIN_CLASS_FACTORY(Monitor, MOD_NAME);
+extern "C" void* plugin_class_create()
+{
+    Monitor* m_inst = Monitor::instance();
+    assert(dynamic_cast<AmDynInvokeFactory*>(m_inst));
+
+    return m_inst;
+}
+
+Monitor* Monitor::_instance=0;
+
+Monitor* Monitor::instance()
+{
+  if(_instance == NULL)
+    _instance = new Monitor(MOD_NAME);
+  return _instance;
+}
+
+Monitor::Monitor(const string& name) 
+  : AmDynInvokeFactory(MOD_NAME) {
+}
+
+Monitor::~Monitor() {
+}
+
+int Monitor::onLoad() {
+  // todo: if GC configured, start thread
+  return 0;
+}
+
+void Monitor::invoke(const string& method, 
+                    const AmArg& args, AmArg& ret) {
+  if(method == "log"){
+    log(args,ret);
+  } else if(method == "logAdd"){
+    logAdd(args,ret);
+  } else if(method == "markFinished"){
+    markFinished(args,ret);
+  } else if(method == "clear"){
+    clear(args,ret);
+  } else if(method == "clearFinished"){
+    clearFinished(args,ret);
+  } else if(method == "erase"){
+    clear(args,ret);
+  } else if(method == "get"){
+    get(args,ret);
+  } else if(method == "list"){
+    list(args,ret);
+  } else if(method == "listFinished"){
+    listFinished(args,ret);
+  } else if(method == "listUnfinished"){
+    listUnfinished(args,ret);
+  } else if(method == "_list"){ 
+    ret.push(AmArg("log"));
+    ret.push(AmArg("logAdd"));
+    ret.push(AmArg("markFinished"));
+    ret.push(AmArg("erase"));
+    ret.push(AmArg("clear"));
+    ret.push(AmArg("clearFinished"));
+    ret.push(AmArg("get"));
+    ret.push(AmArg("list"));
+    ret.push(AmArg("listFinished"));
+    ret.push(AmArg("listUnfinished"));
+  } else
+    throw AmDynInvoke::NotImplemented(method);
+}
+
+void Monitor::log(const AmArg& args, AmArg& ret) {
+  assertArgCStr(args[0]);
+  assertArgCStr(args[1]);
+  
+  LogBucket& bucket = getLogBucket(args[0].asCStr());
+  bucket.log_lock.lock();
+  try {
+    for (size_t i=1;i<args.size();i+=2)
+    bucket.log[args[0].asCStr()].info[args[i].asCStr()]=AmArg(args[i+1]);
+  } catch (...) {
+    bucket.log_lock.unlock();
+    ret.push(-1);
+    ret.push("ERROR while converting value");
+    throw;
+  }
+  bucket.log_lock.unlock();
+  ret.push(0);
+  ret.push("OK");
+}
+
+void Monitor::logAdd(const AmArg& args, AmArg& ret) {
+  assertArgCStr(args[0]);
+  assertArgCStr(args[1]);
+
+  LogBucket& bucket = getLogBucket(args[0].asCStr());
+  bucket.log_lock.lock();
+  try {
+    bucket.log[args[0].asCStr()].info[args[1].asCStr()].push(AmArg(args[2]));
+  } catch (...) {
+    ret.push(-1);
+    ret.push("ERROR while converting value");
+    bucket.log_lock.unlock();
+    throw;
+  }
+  ret.push(0);
+  ret.push("OK");
+  bucket.log_lock.unlock();
+}
+
+void Monitor::markFinished(const AmArg& args, AmArg& ret) {
+  assertArgCStr(args[0]);
+
+  LogBucket& bucket = getLogBucket(args[0].asCStr());
+  bucket.log_lock.lock();
+  if (!bucket.log[args[0].asCStr()].finished)
+    bucket.log[args[0].asCStr()].finished  = time(0);
+  bucket.log_lock.unlock();
+  ret.push(0);
+  ret.push("OK");
+}
+
+void Monitor::erase(const AmArg& args, AmArg& ret) {
+  assertArgCStr(args[0]);
+  LogBucket& bucket = getLogBucket(args[0].asCStr());
+  bucket.log_lock.lock();
+  bucket.log.erase(args[0].asCStr());
+  bucket.log_lock.unlock();
+  ret.push(0);
+  ret.push("OK");
+}
+
+void Monitor::clear(const AmArg& args, AmArg& ret) {
+  for (int i=0;i<NUM_LOG_BUCKETS;i++) {
+    logs[i].log_lock.lock();
+    logs[i].log.clear();
+    logs[i].log_lock.unlock();
+  }
+  ret.push(0);
+  ret.push("OK");
+}
+
+void Monitor::clearFinished(const AmArg& args, AmArg& ret) {
+ for (int i=0;i<NUM_LOG_BUCKETS;i++) {
+    logs[i].log_lock.lock();
+    std::map<string, LogInfo>::iterator it=
+      logs[i].log.begin();
+    while (it != logs[i].log.end()) {
+      if (it->second.finished > 0) {
+       std::map<string, LogInfo>::iterator d_it = it;
+       it++;
+       logs[i].log.erase(d_it);
+      } else {
+       it++;
+      }
+    }
+    logs[i].log_lock.unlock();
+  }
+  ret.push(0);
+  ret.push("OK");
+}
+
+void Monitor::get(const AmArg& args, AmArg& ret) {
+  assertArgCStr(args[0]);
+  LogBucket& bucket = getLogBucket(args[0].asCStr());
+  bucket.log_lock.lock();
+  std::map<string, LogInfo>::iterator it=bucket.log.find(args[0].asCStr());
+  if (it!=bucket.log.end())
+    ret.push(it->second.info);
+  bucket.log_lock.unlock();
+}
+
+void Monitor::list(const AmArg& args, AmArg& ret) {
+ for (int i=0;i<NUM_LOG_BUCKETS;i++) {
+    logs[i].log_lock.lock();
+    for (std::map<string, LogInfo>::iterator it=
+          logs[i].log.begin(); it != logs[i].log.end(); it++) {
+      ret.push(AmArg(it->first.c_str()));
+    }
+    logs[i].log_lock.unlock();
+  }
+}
+
+void Monitor::listFinished(const AmArg& args, AmArg& ret) {
+ for (int i=0;i<NUM_LOG_BUCKETS;i++) {
+    logs[i].log_lock.lock();
+    for (std::map<string, LogInfo>::iterator it=
+          logs[i].log.begin(); it != logs[i].log.end(); it++) {
+      if (it->second.finished > 0)
+       ret.push(AmArg(it->first.c_str()));
+    }
+    logs[i].log_lock.unlock();
+  }
+}
+
+
+void Monitor::listUnfinished(const AmArg& args, AmArg& ret) {
+ for (int i=0;i<NUM_LOG_BUCKETS;i++) {
+    logs[i].log_lock.lock();
+    for (std::map<string, LogInfo>::iterator it=
+          logs[i].log.begin(); it != logs[i].log.end(); it++) {
+      if (!it->second.finished)
+       ret.push(AmArg(it->first.c_str()));
+    }
+    logs[i].log_lock.unlock();
+  }
+}
+
+LogBucket& Monitor::getLogBucket(const string& call_id) {
+  if (call_id.empty())
+    return logs[0];
+  char c = '\0'; // some distribution...bad luck if all callid start with 
00000...
+  for (size_t i=0;i<5 && i<call_id.length();i++) 
+    c = c ^ call_id[i];
+  
+  return logs[c % NUM_LOG_BUCKETS];
+}


Property changes on: trunk/apps/monitoring/Monitoring.cpp
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: trunk/apps/monitoring/Monitoring.h
===================================================================
--- trunk/apps/monitoring/Monitoring.h  2009-02-10 11:40:55 UTC (rev 1258)
+++ trunk/apps/monitoring/Monitoring.h  2009-02-10 12:17:15 UTC (rev 1259)
@@ -0,0 +1,97 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2009 IPTEGO GmbH
+ *
+ * This file is part of SEMS, a free SIP media server.
+ *
+ * SEMS is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * For a license to use the SEMS software under conditions
+ * other than those described here, or to purchase support for this
+ * software, please contact iptel.org by e-mail at the following addresses:
+ *    [email protected]
+ *
+ * SEMS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#ifndef _MONITORING_H_
+#define _MONITORING_H_
+
+#include <map>
+
+#include "AmThread.h"
+#include "AmApi.h"
+#include "AmArg.h"
+
+#include <time.h>
+
+#define NUM_LOG_BUCKETS 16
+
+struct LogInfo {
+  time_t finished; // for garbage collection
+LogInfo() 
+ : finished(0) { }
+  AmArg info;
+};
+
+struct LogBucket {
+  AmMutex log_lock;
+  std::map<string, LogInfo> log;
+};
+
+class Monitor 
+: public AmDynInvokeFactory,
+  public AmDynInvoke
+   
+{
+  static Monitor* _instance;
+
+  LogBucket logs[NUM_LOG_BUCKETS];
+
+  LogBucket& getLogBucket(const string& call_id);
+
+  void log(const AmArg& args, AmArg& ret);
+  void logAdd(const AmArg& args, AmArg& ret);
+  void markFinished(const AmArg& args, AmArg& ret);
+  void clear(const AmArg& args, AmArg& ret);
+  void clearFinished(const AmArg& args, AmArg& ret);
+  void erase(const AmArg& args, AmArg& ret);
+  void get(const AmArg& args, AmArg& ret);
+  void list(const AmArg& args, AmArg& ret);
+  void listFinished(const AmArg& args, AmArg& ret);
+  void listUnfinished(const AmArg& args, AmArg& ret);
+
+ public:
+
+  Monitor(const string& name);
+  ~Monitor();
+  // DI factory
+  AmDynInvoke* getInstance() { return instance(); }
+  // DI API
+  static Monitor* instance();
+  void invoke(const string& method, 
+             const AmArg& args, AmArg& ret);
+  int onLoad();
+};
+
+/*
+class MonitorGarbageCollector : public AmThread {
+ public:
+  void run();
+  void on_stop();
+
+};
+*/
+
+#endif


Property changes on: trunk/apps/monitoring/Monitoring.h
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Modified: trunk/core/AmSessionContainer.cpp
===================================================================
--- trunk/core/AmSessionContainer.cpp   2009-02-10 11:40:55 UTC (rev 1258)
+++ trunk/core/AmSessionContainer.cpp   2009-02-10 12:17:15 UTC (rev 1259)
@@ -39,10 +39,10 @@
 
 #include "sems.h"
 
-// AmSessionContainer methods
-
 AmSessionContainer* AmSessionContainer::_instance=NULL;
 
+_MONITORING_DECLARE_INTERFACE(AmSessionContainer);
+
 AmSessionContainer::AmSessionContainer()
   : _run_cond(false), _container_closed(false)
       
@@ -88,6 +88,8 @@
       
       if(cur_session->is_stopped() && cur_session->detached.get()){
        
+       MONITORING_MARK_FINISHED(cur_session->getCallID().c_str());
+
        DBG("session %p has been destroyed'\n",(void*)cur_session->_pid);
        delete cur_session;
       }
@@ -115,6 +117,7 @@
 
 void AmSessionContainer::run()
 {
+  _MONITORING_INIT;
 
   while(!_container_closed.get()){
 
@@ -201,6 +204,13 @@
        return NULL;
       }
 
+      MONITORING_LOG5(session->getCallID().c_str(), 
+                     "app", req.cmd.c_str(),
+                     "dir", "out",
+                     "from", req.from.c_str(),
+                     "to", req.to.c_str(),
+                     "ruri", req.r_uri.c_str());
+      
       if (int err = session->sendInvite(req.hdrs)) {
        ERROR("INVITE could not be sent: error code = %d.\n", 
              err);
@@ -209,6 +219,7 @@
                        session->getCallID(),
                        session->getRemoteTag());       
        delete session;
+       MONITORING_MARK_FINISHED(session->getCallID().c_str());
        return NULL;
       }
 
@@ -261,6 +272,13 @@
          throw string("internal server error");
        }
 
+       MONITORING_LOG5(req.callid.c_str(), 
+                       "app", req.cmd.c_str(),
+                       "dir", "in",
+                       "from", req.from.c_str(),
+                       "to", req.to.c_str(),
+                       "ruri", req.r_uri.c_str());
+
        session->start();
 
        session->postEvent(new AmSipRequestEvent(req));

Modified: trunk/core/AmSessionContainer.h
===================================================================
--- trunk/core/AmSessionContainer.h     2009-02-10 11:40:55 UTC (rev 1258)
+++ trunk/core/AmSessionContainer.h     2009-02-10 12:17:15 UTC (rev 1259)
@@ -31,6 +31,8 @@
 #include "AmThread.h"
 #include "AmSession.h"
 
+#include "ampi/MonitoringAPI.h"
+
 #include <string>
 #include <queue>
 #include <map>
@@ -140,6 +142,9 @@
    * @return false if session doesn't exist 
    */
   bool postEvent(const string& local_tag, AmEvent* event);
+
+  _MONITORING_DEFINE_INTERFACE;
+
 };
 
 #endif

Added: trunk/doc/Readme.monitoring
===================================================================
--- trunk/doc/Readme.monitoring 2009-02-10 11:40:55 UTC (rev 1258)
+++ trunk/doc/Readme.monitoring 2009-02-10 12:17:15 UTC (rev 1259)
@@ -0,0 +1,61 @@
+monitoring module
+
+The 'monitoring' module gets information regarding calls from the core 
+and applications, and makes them available via DI methods, e.g. for 
+monitoring a SEMS server via XMLRPC (using xmlrpc2di), or any other 
+method that can acces DI.
+
+monitoring information is explicitely pushed to monitoring module via 
+DI calls (See ampi/MonitoringAPI.h for useful macros). Info is always 
+accessed via primary key, usually call-id. Info for every call is organized 
+as attribute-value pairs (one or more values), value can be any type 
+representable by AmArg (SEMS' variable type). 
+
+A call can be marked as finished. If not done before, this is done by 
+the session container when deleting a session (i.e., as the session 
+garbage collector in session container only runs every few seconds, 
+this can lag some seconds). Finished sessions can be listed and erased
+separately, to free used memory.
+
+Internally, the monitoring module keeps info in locked buckets of calls; 
+thus lock contention can be minimized by adapting NUM_LOG_BUCKETS 
+(Monitoring.h), which defaults to 16 (should be ok for most cases).
+
+monitoring must be compile time enabled in Makefile.defs by setting 
+ USE_MONITORING = yes
+and the monitoring module needs to be loaded.
+
+
+DI API
+------
+functions to log, e.g. from inside SEMS:
+ log(ID, key, value [, key, value [, key, value [...]]])  - set one or 
multiple AVPs
+ logAdd(ID, key, value)   - add a value to an AVPs
+
+functions to get log, e.g. from the outside:
+ list           - list IDs of calls
+ listFinished   - list IDs of finished calls
+ listUnfinished - list IDs of unfinished (active) calls
+ get            - get info for a specific call, parameter is the ID
+ erase          - erase info of a specific call, parameter is the ID (+free 
used memory)
+ clear          - erase info of all calls (+free used memory)
+ clearFinished  - erase info of all finished calls (+free used memory)
+
+(of course, log()/logAdd() functions can also be accessed via e.g. XMLRPC.)
+
+Performance
+-----------
+monitoring is not very much optimized for speed. Thus, especially by 
+using DI/AmArg, a lot of string comparisions and copying is performed. 
+If you measure any performance figures in real life usage comparing use 
+of monitoring vs. monitoring not enabled, please contribute 
+(mailto:[email protected], http://tracker.iptel.org) to be included in 
+this documentation.
+
+
+TODO
+----
+ o internal garbage collector, e.g. x secs after call is finished
+ o codec info
+ o more app specific info
+ o b2bua specific info

_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev

Reply via email to