Hi,

Attached is a patch that adds a Calls-per-sec limiting capability to the SEMS core, similar to the maximum number of calls limiter. To make it work, I had to make some modifications to the CPS counter mechanism, now it has a fixed size (compile-time set), sliding time window that we use to calculate the CPS instead of calculating since the last query (it is arguably a more stable approach). Also, there is a "soft limit" that is intended to use by the applications: if the app uses async processing and detects that the incoming workload is too much for the current capacity (which can depend on for example the remote DB engine's load), it can slew it back to some percent of the current CPS; then when the circumstances are restored it can switch the CPS limit back to the configuration (or XMLRPC/stats interface) mandated value.

I'd like to have some review and comment before I commit so I can be sure that it doesn't break something!

Thank you

br

Szo
diff --git a/apps/xmlrpc2di/XMLRPC2DI.cpp b/apps/xmlrpc2di/XMLRPC2DI.cpp
index 7175f02..525f333 100644
--- a/apps/xmlrpc2di/XMLRPC2DI.cpp
+++ b/apps/xmlrpc2di/XMLRPC2DI.cpp
@@ -30,7 +30,7 @@
 #include "AmConfigReader.h"
 #include "AmUtils.h"
 #include "AmArg.h"
-#include "AmSession.h"
+#include "AmSessionContainer.h"
 #include "AmEventDispatcher.h"
 #include "TOXmlRpcClient.h"
 
@@ -351,7 +351,9 @@ XMLRPC2DIServer::XMLRPC2DIServer(unsigned int port,
     getcallsavg_method(s),
     getcallsmax_method(s),
     getcpsavg_method(s),
-    getcpsmax_method(s)
+    getcpsmax_method(s),
+    getcpslimit_method(s),
+    setcpslimit_method(s)
 
 
 {	
@@ -364,6 +366,8 @@ XMLRPC2DIServer::XMLRPC2DIServer(unsigned int port,
   INFO("XMLRPC Server: enabled builtin method 'get_callsmax'\n");
   INFO("XMLRPC Server: enabled builtin method 'get_cpsavg'\n");
   INFO("XMLRPC Server: enabled builtin method 'get_cpsmax'\n");
+  INFO("XMLRPC Server: enabled builtin method 'get_cpslimit'\n");
+  INFO("XMLRPC Server: enabled builtin method 'set_cpslimit'\n");
 
   // export all methods via 'di' function? 
   if (di_export) {
@@ -517,6 +521,30 @@ void XMLRPC2DIServerSetShutdownmodeMethod::execute(XmlRpcValue& params, XmlRpcVa
   result = "200 OK";
 }
 
+void XMLRPC2DIServerGetCPSLimitMethod::execute(XmlRpcValue& params, XmlRpcValue& result) {
+  int l = AmSessionContainer::instance()->getCPSLimit();
+  DBG("XMLRPC2DI: get_cpslimit returns %d\n", l);
+  result = l;
+}
+
+void XMLRPC2DIServerSetCPSLimitMethod::execute(XmlRpcValue& params, XmlRpcValue& result) {
+  AmSessionContainer::instance()->setCPSLimit((int)params[0]);
+  DBG("XMLRPC2DI: set cpslimit to %u.\n", AmSessionContainer::instance()->getCPSLimit());
+  result = "200 OK";
+}
+
+void XMLRPC2DIServerGetCpsavgMethod::execute(XmlRpcValue& params, XmlRpcValue& result) {
+  int l = AmSessionContainer::instance()->getAvgCPS();
+  DBG("XMLRPC2DI: get_cpsavg returns %d\n", l);
+  result = l;
+}
+
+void XMLRPC2DIServerGetCpsmaxMethod::execute(XmlRpcValue& params, XmlRpcValue& result) {
+  int l = AmSessionContainer::instance()->getMaxCPS();
+  DBG("XMLRPC2DI: get_cpsmax returns %d\n", l);
+  result = l;
+}
+
 #define XMLMETH_EXEC(_meth, _sess_func, _descr)				\
   void _meth::execute(XmlRpcValue& params, XmlRpcValue& result) {	\
   unsigned int res = AmSession::_sess_func();				\
@@ -526,8 +554,6 @@ void XMLRPC2DIServerSetShutdownmodeMethod::execute(XmlRpcValue& params, XmlRpcVa
 
 XMLMETH_EXEC(XMLRPC2DIServerGetCallsavgMethod, getAvgSessionNum, "get_callsavg");
 XMLMETH_EXEC(XMLRPC2DIServerGetCallsmaxMethod, getMaxSessionNum, "get_callsmax");
-XMLMETH_EXEC(XMLRPC2DIServerGetCpsavgMethod,   getAvgCPS, "get_cpsavg");
-XMLMETH_EXEC(XMLRPC2DIServerGetCpsmaxMethod,   getMaxCPS, "get_cpsmax");
 #undef XMLMETH_EXEC
 
 void XMLRPC2DIServerDIMethod::execute(XmlRpcValue& params, XmlRpcValue& result) {
diff --git a/apps/xmlrpc2di/XMLRPC2DI.h b/apps/xmlrpc2di/XMLRPC2DI.h
index fdc03b8..af808dc 100644
--- a/apps/xmlrpc2di/XMLRPC2DI.h
+++ b/apps/xmlrpc2di/XMLRPC2DI.h
@@ -67,6 +67,9 @@ DEF_XMLRPCSERVERMETHOD(XMLRPC2DIServerGetCallsmaxMethod, "get_callsmax");
 DEF_XMLRPCSERVERMETHOD(XMLRPC2DIServerGetCpsavgMethod,   "get_cpsavg");
 DEF_XMLRPCSERVERMETHOD(XMLRPC2DIServerGetCpsmaxMethod,   "get_cpsmax");
 
+DEF_XMLRPCSERVERMETHOD(XMLRPC2DIServerSetCPSLimitMethod, "set_cpslimit");
+DEF_XMLRPCSERVERMETHOD(XMLRPC2DIServerGetCPSLimitMethod, "get_cpslimit");
+
 
 class XMLRPC2DIServerDIMethod 
 :  public XmlRpcServerMethod { 
@@ -115,6 +118,9 @@ class XMLRPC2DIServer
   XMLRPC2DIServerGetCpsavgMethod    getcpsavg_method;
   XMLRPC2DIServerGetCpsmaxMethod    getcpsmax_method;
 
+  XMLRPC2DIServerSetCPSLimitMethod setcpslimit_method;
+  XMLRPC2DIServerGetCPSLimitMethod getcpslimit_method;
+
   XMLRPC2DIServerDIMethod*         di_method;
   void registerMethods(const std::string& iface);
 
diff --git a/core/AmConfig.cpp b/core/AmConfig.cpp
index 9e6eef5..0273ce0 100644
--- a/core/AmConfig.cpp
+++ b/core/AmConfig.cpp
@@ -40,7 +40,7 @@
 #include "log.h"
 #include "AmConfigReader.h"
 #include "AmUtils.h"
-#include "AmSession.h"
+#include "AmSessionContainer.h"
 #include "Am100rel.h"
 #include "sip/transport.h"
 #include "sip/ip_util.h"
@@ -110,6 +110,9 @@ unsigned int AmConfig::OptionsSessionLimit            = 0;
 unsigned int AmConfig::OptionsSessionLimitErrCode     = 503;
 string       AmConfig::OptionsSessionLimitErrReason   = "Server overload";
 
+unsigned int AmConfig::CPSLimitErrCode     = 503;
+string       AmConfig::CPSLimitErrReason   = "Server overload";
+
 bool         AmConfig::AcceptForkedDialogs     = true;
 
 bool         AmConfig::ShutdownMode            = false;
@@ -600,6 +603,20 @@ int AmConfig::readConfiguration()
     }
   }
 
+  if(cfg.hasParameter("cps_limit")){ 
+    unsigned int CPSLimit;
+    vector<string> limit = explode(cfg.getParameter("cps_limit"), ";");
+    if (limit.size() != 3) {
+      ERROR("invalid cps_limit specified.\n");
+    } else {
+      if (str2i(limit[0], CPSLimit) || str2i(limit[1], CPSLimitErrCode)) {
+	ERROR("invalid cps_limit specified.\n");
+      }
+      CPSLimitErrReason = limit[2];
+    }
+    AmSessionContainer::instance()->setCPSLimit(CPSLimit);
+  }
+
   if(cfg.hasParameter("accept_forked_dialogs"))
     AcceptForkedDialogs = !(cfg.getParameter("accept_forked_dialogs") == "no");
 
diff --git a/core/AmConfig.h b/core/AmConfig.h
index ae72918..62027d1 100644
--- a/core/AmConfig.h
+++ b/core/AmConfig.h
@@ -224,6 +224,9 @@ struct AmConfig
   static unsigned int OptionsSessionLimitErrCode;
   static string OptionsSessionLimitErrReason;
 
+  static unsigned int CPSLimitErrCode;
+  static string CPSLimitErrReason;
+
   static bool AcceptForkedDialogs;
 
   static bool ShutdownMode;
diff --git a/core/AmSession.cpp b/core/AmSession.cpp
index 12cd716..52a8752 100644
--- a/core/AmSession.cpp
+++ b/core/AmSession.cpp
@@ -54,9 +54,6 @@ volatile unsigned int AmSession::session_num = 0;
 AmMutex AmSession::session_num_mut;
 volatile unsigned int AmSession::max_session_num = 0;
 volatile unsigned long long AmSession::avg_session_num = 0;
-volatile unsigned long AmSession::max_cps = 0;
-volatile unsigned long AmSession::max_cps_counter = 0;
-volatile unsigned long AmSession::avg_cps = 0;
 
 struct timeval get_now() {
   struct timeval res;
@@ -65,8 +62,6 @@ struct timeval get_now() {
 }
 struct timeval avg_last_timestamp = get_now();
 struct timeval avg_first_timestamp = avg_last_timestamp;
-struct timeval cps_first_timestamp = avg_last_timestamp;
-struct timeval cps_max_timestamp = avg_last_timestamp;
 
 // AmSession methods
 
@@ -536,23 +531,6 @@ void AmSession::session_started() {
   //maximum session number
   if(session_num > max_session_num) max_session_num = session_num;
 
-  //cps average
-  ++avg_cps;
-
-  //cps maximum
-  ++max_cps_counter;
-  timersub(&now, &cps_max_timestamp, &delta);
-  unsigned long long d_usec = delta.tv_sec * 1000000ULL + delta.tv_usec;
-  if (delta.tv_sec > 0) {
-    //more than 1 sec has passed
-   unsigned long long secavg = ((max_cps_counter * 1000000ULL) + d_usec - 1) / d_usec;
-    if (max_cps < secavg) {
-      max_cps = secavg;
-    }
-    cps_max_timestamp = now;
-    max_cps_counter = 0;
-  }
-
   session_num_mut.unlock();
 }
 
@@ -609,57 +587,6 @@ unsigned int AmSession::getAvgSessionNum() {
   return res;
 }
 
-unsigned int AmSession::getMaxCPS()
-{
-  unsigned int res = 0;
-  struct timeval now, delta;
-  session_num_mut.lock();
-  gettimeofday(&now, NULL);
-  timersub(&now, &cps_max_timestamp, &delta);
-  unsigned long long d_usec = delta.tv_sec * 1000000ULL + delta.tv_usec;
-  if(delta.tv_sec > 0) {
-    //more than 1 sec has passed
-    //Round up
-    unsigned long long secavg = ((max_cps_counter * 1000000ULL) + d_usec - 1) / d_usec;
-    if (max_cps < secavg) {
-      max_cps = secavg;
-    }
-    cps_max_timestamp = now;
-    max_cps_counter = 0;
-  }
-
-  res = max_cps;
-  max_cps = 0;
-  session_num_mut.unlock();
-  return res;
-}
-
-unsigned int AmSession::getAvgCPS()
-{
-  unsigned int res = 0;
-  struct timeval now, delta;
-  unsigned long n_avg_cps;
-
-  session_num_mut.lock();
-  gettimeofday(&now, NULL);
-  timersub(&now, &cps_first_timestamp, &delta);
-  cps_first_timestamp = now;
-  n_avg_cps = avg_cps;
-  avg_cps = 0;
-  session_num_mut.unlock();
-
-  unsigned long long d_usec = delta.tv_sec * 1000000ULL + delta.tv_usec;
-  if(!d_usec) {
-    res = 0;
-    WARN("zero delta!\n");
-  } else {
-    //Round up
-    res  = (unsigned int)(((n_avg_cps * 1000000ULL) +  d_usec - 1) / d_usec);
-  }
-  
-  return res;
-}
-
 void AmSession::setInbandDetector(Dtmf::InbandDetectorType t)
 { 
   m_dtmfDetector.setInbandDetector(t, RTPStream()->getSampleRate()); 
diff --git a/core/AmSession.h b/core/AmSession.h
index 6eb7ad0..6a4c930 100644
--- a/core/AmSession.h
+++ b/core/AmSession.h
@@ -120,9 +120,6 @@ private:
   static volatile unsigned int session_num;
   static volatile unsigned int max_session_num;
   static volatile unsigned long long avg_session_num;
-  static volatile unsigned long max_cps;
-  static volatile unsigned long max_cps_counter;
-  static volatile unsigned long avg_cps;
   static AmMutex session_num_mut;
 
   friend class AmMediaProcessor;
@@ -404,14 +401,6 @@ public:
    * Gets the average of running sessions since last query
    */
   static unsigned int getAvgSessionNum();
-  /**
-   * Gets the maximum of calls per second since last query
-   */
-  static unsigned int getMaxCPS();
-  /**
-   * Gets the timeaverage of calls per second since last query
-   */
-  static unsigned int getAvgCPS();
 
   /* ----         DTMF                          ---- */
   /**
diff --git a/core/AmSessionContainer.cpp b/core/AmSessionContainer.cpp
index 4ea4294..db0f932 100644
--- a/core/AmSessionContainer.cpp
+++ b/core/AmSessionContainer.cpp
@@ -44,8 +44,8 @@ AmSessionContainer* AmSessionContainer::_instance=NULL;
 _MONITORING_DECLARE_INTERFACE(AmSessionContainer);
 
 AmSessionContainer::AmSessionContainer()
-  : _run_cond(false), _container_closed(false), enable_unclean_shutdown(false)
-      
+  : _run_cond(false), _container_closed(false), enable_unclean_shutdown(false),
+  CPSLimit(0)
 {
 }
 
@@ -377,6 +377,102 @@ bool AmSessionContainer::postEvent(const string& local_tag,
 
 }
 
+void AmSessionContainer::setCPSLimit(unsigned int limit)
+{
+  AmLock lock(cps_mut);
+  CPSLimit = CPSHardLimit = limit;
+}
+
+void AmSessionContainer::setCPSSoftLimit(unsigned int percent)
+{
+  if(!percent) {
+    CPSLimit = CPSHardLimit;
+    return;
+  }
+
+  struct timeval tv, res;
+  gettimeofday(&tv,0);
+
+  AmLock lock(cps_mut);
+
+  while (cps_queue.size()) {
+    timersub(&tv, &cps_queue.front(), &res);
+    if (res.tv_sec >= CPS_SAMPLERATE) {
+      cps_queue.pop();
+    }   
+    else {
+      break;
+    }
+  }
+  CPSLimit = (percent / 100) * ((float)cps_queue.size() / CPS_SAMPLERATE);
+}
+
+unsigned int AmSessionContainer::getCPSLimit()
+{
+  AmLock lock(cps_mut);
+  return CPSLimit;
+}
+
+unsigned int AmSessionContainer::getAvgCPS()
+{
+  struct timeval tv, res;
+  gettimeofday(&tv,0);
+
+  AmLock lock(cps_mut);
+
+  while (cps_queue.size()) {
+    timersub(&tv, &cps_queue.front(), &res);
+    if (res.tv_sec >= CPS_SAMPLERATE) {
+      cps_queue.pop();
+    }   
+    else {
+      break;
+    }
+  }
+
+  return (float)cps_queue.size() / CPS_SAMPLERATE;
+}
+
+unsigned int AmSessionContainer::getMaxCPS()
+{
+  AmLock lock(cps_mut);
+  unsigned int res = max_cps;
+  max_cps = 0;
+  return res;
+}
+
+bool AmSessionContainer::check_and_add_cps()
+{
+  struct timeval tv, res;
+  gettimeofday(&tv,0);
+
+  AmLock lock(cps_mut);
+
+  while (cps_queue.size()) {
+    timersub(&tv, &cps_queue.front(), &res);
+    if (res.tv_sec >= CPS_SAMPLERATE) {
+      cps_queue.pop();
+    }   
+    else {
+      break;
+    }
+  }
+
+  unsigned int cps = (float)cps_queue.size() / CPS_SAMPLERATE;
+  if (cps > max_cps) {
+    max_cps = cps;
+  }
+
+  if( CPSLimit && cps > CPSLimit ){
+    DBG("cps_limit %d reached. Not creating session.\n", CPSLimit);
+    return true;
+  }
+  else {
+    cps_queue.push(tv);
+    return false;
+  }
+}
+
 AmSession* AmSessionContainer::createSession(const AmSipRequest& req,
 					     string& app_name,
 					     AmArg* session_params)
@@ -401,6 +497,12 @@ AmSession* AmSessionContainer::createSession(const AmSipRequest& req,
       return NULL;
   }
 
+  if (check_and_add_cps()) {
+      AmSipDialog::reply_error(req,AmConfig::CPSLimitErrCode, 
+			       AmConfig::CPSLimitErrReason);
+      return NULL;
+  }
+
   AmSessionFactory* session_factory = NULL;
   if(!app_name.empty())
       session_factory = AmPlugIn::instance()->getFactory4App(app_name);
diff --git a/core/AmSessionContainer.h b/core/AmSessionContainer.h
index 8da27e7..e891d8b 100644
--- a/core/AmSessionContainer.h
+++ b/core/AmSessionContainer.h
@@ -82,6 +82,22 @@ class AmSessionContainer : public AmThread
 
   bool clean_sessions();
 
+  typedef std::queue<struct timeval> TimevalQueue;
+
+  /** Container for cps timevals*/
+  TimevalQueue cps_queue;
+  /** Maximum cps since the lasd getMaxCPS()*/
+  unsigned int max_cps;
+  /** Mutex to protect the cps container */
+  AmMutex      cps_mut;
+
+  enum { CPS_SAMPLERATE = 5 };
+
+  unsigned int CPSLimit;
+  unsigned int CPSHardLimit;
+
+  bool check_and_add_cps();
+
  public:
   static AmSessionContainer* instance();
 
@@ -165,6 +181,27 @@ class AmSessionContainer : public AmThread
   /** enable unclean shutdown (will not broadcastShutdown event) */
   void enableUncleanShutdown();
 
+  /** Set the maximum number of calls per second to be accepted */
+  void setCPSLimit(unsigned int limit);
+
+  /** Set the maximum number of calls per second to be accepted as a percent 
+   * of the current CPS. Intented to be used by the components. 0 means turning off
+   * the soft limit.
+   */
+  void setCPSSoftLimit(unsigned int percent);
+
+  /** Return the maximum number of calls per second to be accepted */
+  unsigned int getCPSLimit();
+
+  /**
+   * Gets the timeaverage of calls per second in the last CPS_SAMPLERATE sec window
+   */
+  unsigned int getAvgCPS();
+  /**
+   * Gets the maximum of calls per second since last query
+   */
+  unsigned int getMaxCPS();
+
   _MONITORING_DEFINE_INTERFACE;
 
 };
diff --git a/core/etc/sems.conf.cmake b/core/etc/sems.conf.cmake
index c4f900f..fbd165d 100644
--- a/core/etc/sems.conf.cmake
+++ b/core/etc/sems.conf.cmake
@@ -312,6 +312,16 @@ loglevel=2
 # Example:
 #  options_session_limit="900;503;Warning, server soon overloaded"
 
+# optional parameter: cps_limit=<limit>;<err code>;<err reason>
+# 
+# - this sets a maximum calls per sec limit. If that limit is 
+#   reached, no further calls are accepted, but the error reply 
+#   with err code/err reason is sent out.
+# 
+# Default: 0 (None)
+#
+# Example:
+#  cps_limit="100;503;Server overload"
 
 # optional parameter: dead_rtp_time=<unsigned int>
 #
diff --git a/core/etc/sems.conf.sample b/core/etc/sems.conf.sample
index 756e611..f9d0c41 100644
--- a/core/etc/sems.conf.sample
+++ b/core/etc/sems.conf.sample
@@ -353,6 +353,16 @@ loglevel=2
 #
 # Default: shutdown_mode_reply="503 Server shutting down"
 
+# optional parameter: cps_limit=<limit>;<err code>;<err reason>
+# 
+# - this sets a maximum calls per sec limit. If that limit is 
+#   reached, no further calls are accepted, but the error reply 
+#   with err code/err reason is sent out.
+# 
+# Default: 0 (None)
+#
+# Example:
+#  cps_limit="100;503;Server overload"
 
 ############################################################
 # tuning
@@ -603,4 +613,4 @@ accept_fr_without_totag=yes
 #
 # Default: 4
 #
-# sip_server_threads=8
\ No newline at end of file
+# sip_server_threads=8
diff --git a/core/plug-in/stats/StatsUDPServer.cpp b/core/plug-in/stats/StatsUDPServer.cpp
index e551f47..a9252d8 100644
--- a/core/plug-in/stats/StatsUDPServer.cpp
+++ b/core/plug-in/stats/StatsUDPServer.cpp
@@ -291,11 +291,13 @@ int StatsUDPServer::execute(char* msg_buf, string& reply,
       "which                              -  print available commands\n"
       "set_loglevel <loglevel>            -  set log level\n"
       "get_loglevel                       -  get log level\n"
+      "set_cpslimit <limit>               -  set maximum allowed CPS\n"
+      "get_cpslimit                       -  get maximum allowed CPS\n"
       "set_shutdownmode <1 or 0>          -  turns on and off shutdown mode\n"
       "get_shutdownmode                   -  returns the shutdown mode's current state\n"
       "get_callsavg                       -  get number of active calls (average since the last query)\n"
       "get_callsmax                       -  get maximum of active calls since the last query\n"
-      "get_cpsavg                         -  get calls per second (average since the last query)\n"
+      "get_cpsavg                         -  get calls per second (5 sec average)\n"
       "get_cpsmax                         -  get maximum of CPS since the last query\n"
 
       "DI <factory> <function> (<args>)*  -  invoke DI command\n"
@@ -313,6 +315,16 @@ int StatsUDPServer::execute(char* msg_buf, string& reply,
 	reply= "loglevel set to "+int2str(log_level)+".\n";
     }
 
+    else if (cmd_str.substr(4, 8) == "cpslimit") {
+      int tmp;
+      if(sscanf(&cmd_str.c_str()[13],"%u",&tmp) != 1)
+        reply= "invalid CPS limit\n";
+      else {
+        sc->setCPSLimit(tmp);
+        reply= "CPS limit set to "+int2str(sc->getCPSLimit())+".\n";
+      }
+    }
+
     else if (cmd_str.substr(4, 12) == "shutdownmode") {
       int tmp;
       if(sscanf(&cmd_str.c_str()[17],"%u",&tmp) != 1)
@@ -344,10 +356,12 @@ int StatsUDPServer::execute(char* msg_buf, string& reply,
       reply = "Average active calls: " + int2str(AmSession::getAvgSessionNum()) + "\n";
     else if(cmd_str.substr(4, 8) == "callsmax")
       reply = "Maximum active calls: " + int2str(AmSession::getMaxSessionNum()) + "\n";
-    else if(cmd_str.substr(4, 8) == "cpsavg")
-      reply = "Average calls per second: " + int2str(AmSession::getAvgCPS()) + "\n";
-    else if(cmd_str.substr(4, 8) == "cpsmax")
-      reply = "Maximum calls per second: " + int2str(AmSession::getMaxCPS()) + "\n";
+    else if(cmd_str.substr(4, 6) == "cpsavg")
+      reply = "Average calls per second: " + int2str(sc->getAvgCPS()) + "\n";
+    else if(cmd_str.substr(4, 6) == "cpsmax")
+      reply = "Maximum calls per second: " + int2str(sc->getMaxCPS()) + "\n";
+    else if(cmd_str.substr(4, 8) == "cpslimit")
+      reply = "CPS limit: " + int2str(sc->getCPSLimit()) + "\n";
 
     else if (cmd_str.substr(4, 12) == "shutdownmode") {
       if(AmConfig::ShutdownMode)
_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev

Reply via email to