Author: sayer
Date: 2010-04-15 14:22:30 +0200 (Thu, 15 Apr 2010)
New Revision: 1782
Modified:
trunk/Makefile.defs
trunk/core/AmConfig.cpp
trunk/core/AmConfig.h
trunk/core/AmEventQueue.cpp
trunk/core/AmEventQueue.h
trunk/core/AmSession.cpp
trunk/core/AmSession.h
trunk/core/AmSessionContainer.cpp
trunk/core/etc/sems.conf.sample
trunk/core/sems.cpp
trunk/core/sems.h
Log:
Introduces optional (compile-time) threadpool for signaling support.
to use it, set USE_THREADPOOL in Makefile.defs and configure thread pool
size with session_processor_threads= parameter in sems.conf :
+# compile with session thread pool support?
+# use this for very high concurrent call count
+# applications (e.g. for signaling only)
+# if compiled with thread pool, there will be a
+# thread pool of configurable size processing the
+# signaling and application logic of the calls.
+# if compiled without thread pool support, every
+# session will have its own thread.
+#
+#USE_THREADPOOL = yes
Modified: trunk/Makefile.defs
===================================================================
--- trunk/Makefile.defs 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/Makefile.defs 2010-04-15 12:22:30 UTC (rev 1782)
@@ -22,6 +22,18 @@
# -DSUPPORT_IPV6 \
# -DNO_THREADID_LOG \
+
+# compile with session thread pool support?
+# use this for very high concurrent call count
+# applications (e.g. for signaling only)
+# if compiled with thread pool, there will be a
+# thread pool of configurable size processing the
+# signaling and application logic of the calls.
+# if compiled without thread pool support, every
+# session will have its own thread.
+#
+#USE_THREADPOOL = yes
+
# compile with spandsp DTMF detection? see soft-switch.org
# this needs a fairly new version of spandsp - tested with 0.0.4pre11
# will not work with spandsp 0.0.2 .
@@ -62,6 +74,10 @@
OS := $(shell $(CC) $(EXTRA_CFLAGS) -o $(GETOS) $(GETOS).c && $(GETOS))
ARCH := $(shell $(CC) $(EXTRA_CFLAGS) -o $(GETARCH) $(GETARCH).c && $(GETARCH))
+ifdef USE_THREADPOOL
+CPPFLAGS += -DSESSION_THREADPOOL
+endif
+
ifdef USE_SPANDSP
ifneq ($(spandsp_defs), 1)
spandsp_defs=1
Modified: trunk/core/AmConfig.cpp
===================================================================
--- trunk/core/AmConfig.cpp 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/AmConfig.cpp 2010-04-15 12:22:30 UTC (rev 1782)
@@ -51,6 +51,7 @@
string AmConfig::PrefixSep = PREFIX_SEPARATOR;
int AmConfig::RtpLowPort = RTP_LOWPORT;
int AmConfig::RtpHighPort = RTP_HIGHPORT;
+int AmConfig::SessionProcessorThreads = NUM_SESSION_PROCESSORS;
int AmConfig::MediaProcessorThreads = NUM_MEDIA_PROCESSORS;
int AmConfig::LocalSIPPort = 5060;
string AmConfig::LocalSIPIP = "";
@@ -135,6 +136,13 @@
return 1;
}
+int AmConfig::setSessionProcessorThreads(const string& th) {
+ if(sscanf(th.c_str(),"%u",&SessionProcessorThreads) != 1) {
+ return 0;
+ }
+ return 1;
+}
+
int AmConfig::setMediaProcessorThreads(const string& th) {
if(sscanf(th.c_str(),"%u",&MediaProcessorThreads) != 1) {
return 0;
@@ -323,6 +331,25 @@
}
}
+ if(cfg.hasParameter("session_processor_threads")){
+#ifdef SESSION_THREADPOOL
+
if(!setSessionProcessorThreads(cfg.getParameter("session_processor_threads"))){
+ ERROR("invalid session_processor_threads value specified\n");
+ return -1;
+ }
+ if (SessionProcessorThreads<1) {
+ ERROR("invalid session_processor_threads value specified."
+ " need at least one thread\n");
+ return -1;
+ }
+#else
+ WARN("session_processor_threads specified in sems.conf,\n");
+ WARN("but SEMS is compiled without SESSION_THREADPOOL support.\n");
+ WARN("set USE_THREADPOOL in Makefile.defs to enable session thread
pool.\n");
+ WARN("SEMS will start now, but every call will have its own thread.\n");
+#endif
+ }
+
if(cfg.hasParameter("media_processor_threads")){
if(!setMediaProcessorThreads(cfg.getParameter("media_processor_threads"))){
ERROR("invalid media_processor_threads value specified");
@@ -330,6 +357,7 @@
}
}
+
// single codec in 200 OK
if(cfg.hasParameter("single_codec_in_ok")){
SingleCodecInOK = (cfg.getParameter("single_codec_in_ok") == "yes");
Modified: trunk/core/AmConfig.h
===================================================================
--- trunk/core/AmConfig.h 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/AmConfig.h 2010-04-15 12:22:30 UTC (rev 1782)
@@ -75,7 +75,9 @@
static int RtpLowPort;
/** Highest local RTP port */
static int RtpHighPort;
- /** number of session scheduler threads */
+ /** number of session (signaling/application) processor threads */
+ static int SessionProcessorThreads;
+ /** number of media processor threads */
static int MediaProcessorThreads;
/** the interface SIP requests are sent from - needed for registrar_client */
static string LocalSIPIP;
@@ -153,6 +155,8 @@
static int setFork(const string& fork);
/** Setter for parameter stderr, returns 0 on invalid value */
static int setStderr(const string& s);
+ /** Setter for parameter SessionProcessorThreads, returns 0 on invalid value
*/
+ static int setSessionProcessorThreads(const string& th);
/** Setter for parameter MediaProcessorThreads, returns 0 on invalid value */
static int setMediaProcessorThreads(const string& th);
/** Setter for parameter DeadRtpTime, returns 0 on invalid value */
Modified: trunk/core/AmEventQueue.cpp
===================================================================
--- trunk/core/AmEventQueue.cpp 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/AmEventQueue.cpp 2010-04-15 12:22:30 UTC (rev 1782)
@@ -29,8 +29,11 @@
#include "log.h"
#include "AmConfig.h"
+#include <typeinfo>
AmEventQueue::AmEventQueue(AmEventHandler* handler)
- : handler(handler),ev_pending(false)
+ : handler(handler),
+ wakeup_handler(NULL),
+ ev_pending(false)
{
}
@@ -53,6 +56,8 @@
if(event)
ev_queue.push(event);
ev_pending.set(true);
+ if (NULL != wakeup_handler)
+ wakeup_handler->notify(this);
m_queue.unlock();
if (AmConfig::LogEvents)
@@ -70,10 +75,12 @@
m_queue.unlock();
if (AmConfig::LogEvents)
- DBG("before processing event\n");
+ DBG("before processing event (%s)\n",
+ typeid(*event).name());
handler->process(event);
if (AmConfig::LogEvents)
- DBG("event processed\n");
+ DBG("event processed (%s)\n",
+ typeid(*event).name());
delete event;
m_queue.lock();
}
@@ -117,3 +124,10 @@
m_queue.unlock();
}
+void AmEventQueue::setEventNotificationSink(AmEventNotificationSink*
+ _wakeup_handler) {
+ // locking actually not necessary - if replacing pointer is atomic
+ m_queue.lock();
+ wakeup_handler = _wakeup_handler;
+ m_queue.unlock();
+}
Modified: trunk/core/AmEventQueue.h
===================================================================
--- trunk/core/AmEventQueue.h 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/AmEventQueue.h 2010-04-15 12:22:30 UTC (rev 1782)
@@ -40,6 +40,16 @@
virtual void postEvent(AmEvent*)=0;
};
+class AmEventQueue;
+/** a receiver for notifications about
+ the fact that events are pending */
+class AmEventNotificationSink
+{
+ public:
+ virtual ~AmEventNotificationSink() { }
+ virtual void notify(AmEventQueue* sender) = 0;
+};
+
/**
* \brief Asynchronous event queue implementation
*
@@ -52,12 +62,13 @@
{
protected:
AmEventHandler* handler;
+ AmEventNotificationSink* wakeup_handler;
std::queue<AmEvent*> ev_queue;
AmMutex m_queue;
AmCondition<bool> ev_pending;
public:
- AmEventQueue(AmEventHandler*);
+ AmEventQueue(AmEventHandler* handler);
virtual ~AmEventQueue();
void postEvent(AmEvent*);
@@ -65,6 +76,8 @@
void waitForEvent();
void wakeup();
void processSingleEvent();
+
+ void setEventNotificationSink(AmEventNotificationSink* _wakeup_handler);
};
#endif
Modified: trunk/core/AmSession.cpp
===================================================================
--- trunk/core/AmSession.cpp 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/AmSession.cpp 2010-04-15 12:22:30 UTC (rev 1782)
@@ -33,6 +33,7 @@
#include "AmPlugIn.h"
#include "AmApi.h"
#include "AmSessionContainer.h"
+#include "AmSessionProcessor.h"
#include "AmMediaProcessor.h"
#include "AmDtmfDetector.h"
#include "AmPlayoutBuffer.h"
@@ -57,17 +58,22 @@
AmSession::AmSession()
- : AmEventQueue(this), // AmDialogState(),
+ : AmEventQueue(this),
dlg(this),
detached(true),
sess_stopped(false),rtp_str(this),negotiate_onreply(false),
input(0), output(0), local_input(0), local_output(0),
m_dtmfDetector(this), m_dtmfEventQueue(&m_dtmfDetector),
m_dtmfDetectionEnabled(true),
- accept_early_session(false)
+ accept_early_session(false),
+ processing_status(SESSION_PROCESSING_EVENTS)
#ifdef WITH_ZRTP
, zrtp_session(NULL), zrtp_audio(NULL), enable_zrtp(true)
#endif
+
+#ifdef SESSION_THREADPOOL
+ , _pid(this)
+#endif
{
use_local_audio[AM_AUDIO_IN] = false;
use_local_audio[AM_AUDIO_OUT] = false;
@@ -83,6 +89,8 @@
#ifdef WITH_ZRTP
AmZRTP::freeSession(zrtp_session);
#endif
+
+ DBG("AmSession destructor finished\n");
}
void AmSession::setCallgroup(const string& cg) {
@@ -296,8 +304,41 @@
sdp.genResponse(advertisedIP(), rtp_str.getLocalPort(), *sdp_reply,
AmConfig::SingleCodecInOK);
}
-void AmSession::run()
-{
+#ifdef SESSION_THREADPOOL
+void AmSession::start() {
+ AmSessionProcessorThread* processor_thread =
+ AmSessionProcessor::getProcessorThread();
+ if (NULL == processor_thread)
+ throw string("no processing thread available");
+
+ // have the thread register and start us
+ processor_thread->startSession(this);
+}
+
+bool AmSession::is_stopped() {
+ return processing_status == SESSION_ENDED_DISCONNECTED;
+}
+#else
+// in this case every session has its own thread
+// - this is the main processing loop
+void AmSession::run() {
+ DBG("startup session\n");
+ if (!startup())
+ return;
+
+ DBG("running session event loop\n");
+ while (true) {
+ waitForEvent();
+ if (!processingCycle())
+ break;
+ }
+
+ DBG("session event loop ended, finalizing session\n");
+ finalize();
+}
+#endif
+
+bool AmSession::startup() {
#ifdef WITH_ZRTP
if (enable_zrtp) {
zrtp_session = (zrtp_conn_ctx_t*)malloc(sizeof(zrtp_conn_ctx_t));
@@ -315,7 +356,7 @@
&profile,
AmZRTP::zrtp_instance_zid) )
{
ERROR("initializing ZRTP session context\n");
- return;
+ return false;
}
zrtp_audio = zrtp_attach_stream(zrtp_session, rtp_str.get_ssrc());
@@ -323,7 +364,7 @@
if (NULL == zrtp_audio) {
ERROR("attaching zrtp stream.\n");
- return;
+ return false;
}
DBG("initialized ZRTP session context OK\n");
@@ -340,32 +381,7 @@
onStart();
- while (!sess_stopped.get() ||
- (dlg.getStatus() == AmSipDialog::Disconnecting)// ||
- // (dlg.getUACTransPending())
- ){
-
- waitForEvent();
- processEvents();
-
- DBG("%s dlg.getUACTransPending() = %i\n",
- dlg.callid.c_str(),dlg.getUACTransPending());
- }
-
- if ( dlg.getStatus() != AmSipDialog::Disconnected ) {
-
- DBG("dlg '%s' not terminated: sending bye\n",dlg.callid.c_str());
- if(dlg.bye() == 0){
- while ( dlg.getStatus() != AmSipDialog::Disconnected ){
- waitForEvent();
- processEvents();
- }
- }
- else {
- WARN("failed to terminate call properly\n");
- }
- }
- }
+ }
catch(const AmSession::Exception& e){ throw e; }
catch(const string& str){
ERROR("%s\n",str.c_str());
@@ -374,28 +390,128 @@
catch(...){
throw AmSession::Exception(500,"unexpected exception.");
}
+
+ } catch(const AmSession::Exception& e){
+ ERROR("%i %s\n",e.code,e.reason.c_str());
+ onBeforeDestroy();
+ destroy();
+
+ session_num_mut.lock();
+ session_num--;
+ session_num_mut.unlock();
+
+ return false;
}
- catch(const AmSession::Exception& e){
+
+ return true;
+}
+
+bool AmSession::processEventsCatchExceptions() {
+ try {
+ try {
+ processEvents();
+ }
+ catch(const AmSession::Exception& e){ throw e; }
+ catch(const string& str){
+ ERROR("%s\n",str.c_str());
+ throw AmSession::Exception(500,"unexpected exception.");
+ }
+ catch(...){
+ throw AmSession::Exception(500,"unexpected exception.");
+ }
+ } catch(const AmSession::Exception& e){
ERROR("%i %s\n",e.code,e.reason.c_str());
+ return false;
}
+ return true;
+}
+/** one cycle of the event processing loop.
+ this should be called until it returns false. */
+bool AmSession::processingCycle() {
+
+ switch (processing_status) {
+ case SESSION_PROCESSING_EVENTS:
+ {
+ if (!processEventsCatchExceptions())
+ return false; // exception occured, stop processing
+
+ int dlg_status = dlg.getStatus();
+ bool s_stopped = sess_stopped.get();
+
+ DBG("%s/%s: %s, %s, %i UACTransPending\n",
+ dlg.callid.c_str(),getLocalTag().c_str(),
+ AmSipDialog::status2str[dlg_status],
+ s_stopped?"stopped":"running",
+ dlg.getUACTransPending());
+
+ // session running?
+ if (!s_stopped || (dlg_status == AmSipDialog::Disconnecting))
+ return true;
+
+ // session stopped?
+ if (s_stopped &&
+ (dlg_status == AmSipDialog::Disconnected)) {
+ processing_status = SESSION_ENDED_DISCONNECTED;
+ return false;
+ }
+
+ // wait for session's status to be disconnected
+ // todo: set some timer to tear down the session anyway,
+ // or react properly on negative reply to BYE (e.g. timeout)
+ processing_status = SESSION_WAITING_DISCONNECTED;
+
+ if (dlg_status != AmSipDialog::Disconnected) {
+ // app did not send BYE - do that for the app
+ if (dlg.bye() != 0) {
+ processing_status = SESSION_ENDED_DISCONNECTED;
+ // BYE sending failed - don't wait for dlg status to go disconnected
+ return false;
+ }
+ }
+
+ return true;
+
+ } break;
+
+ case SESSION_WAITING_DISCONNECTED: {
+ // processing events until dialog status is Disconnected
+
+ if (!processEventsCatchExceptions()) {
+ processing_status = SESSION_ENDED_DISCONNECTED;
+ return false; // exception occured, stop processing
+ }
+ bool res = dlg.getStatus() != AmSipDialog::Disconnected;
+ if (!res)
+ processing_status = SESSION_ENDED_DISCONNECTED;
+ return res;
+ }; break;
+
+ default: {
+ ERROR("unknown session processing state\n");
+ return false; // stop processing
+ }
+ }
+}
+
+void AmSession::finalize() {
+ DBG("running finalize sequence...\n");
onBeforeDestroy();
destroy();
-
+
session_num_mut.lock();
session_num--;
session_num_mut.unlock();
-
- // wait at least until session is out of RtpScheduler
- //detached.wait_for();
DBG("session is stopped.\n");
}
-
-void AmSession::on_stop()
+#ifndef SESSION_THREADPOOL
+void AmSession::on_stop()
+#else
+void AmSession::stop()
+#endif
{
- //sess_stopped.set(true);
- DBG("AmSession::on_stop()\n");
+ DBG("AmSession::stop()\n");
if (!getDetached())
AmMediaProcessor::instance()->clearSession(this);
Modified: trunk/core/AmSession.h
===================================================================
--- trunk/core/AmSession.h 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/AmSession.h 2010-04-15 12:22:30 UTC (rev 1782)
@@ -66,10 +66,13 @@
*
* The session is identified by Call-ID, From-Tag and To-Tag.
*/
-class AmSession : public AmThread,
- public AmEventQueue,
- public AmEventHandler,
- public AmSipDialogEventHandler
+class AmSession :
+#ifndef SESSION_THREADPOOL
+ public AmThread,
+#endif
+ public AmEventQueue,
+ public AmEventHandler,
+ public AmSipDialogEventHandler
{
AmMutex audio_mut;
// remote (to/from RTP) audio inout
@@ -95,12 +98,41 @@
AmDtmfEventQueue m_dtmfEventQueue;
bool m_dtmfDetectionEnabled;
+ enum ProcessingStatus {
+ SESSION_PROCESSING_EVENTS,
+ SESSION_WAITING_DISCONNECTED,
+ SESSION_ENDED_DISCONNECTED
+ };
+ ProcessingStatus processing_status;
+
+#ifndef SESSION_THREADPOOL
/** @see AmThread::run() */
void run();
-
- /** @see AmThread::on_stop() */
void on_stop();
+#else
+public:
+ void start();
+ bool is_stopped();
+private:
+ void stop();
+ void* _pid;
+#endif
+
+ /** @return whether startup was successful */
+ bool startup();
+
+ /** @return whether session continues running */
+ bool processingCycle();
+
+ /** clean up session */
+ void finalize();
+
+ /** process pending events,
+ @return whether everything went smoothly */
+ bool processEventsCatchExceptions();
+
+
AmCondition<bool> sess_stopped;
AmCondition<bool> detached;
@@ -111,6 +143,7 @@
friend class AmMediaProcessorThread;
friend class AmSessionContainer;
friend class AmSessionFactory;
+ friend class AmSessionProcessorThread;
protected:
AmSdp sdp;
Modified: trunk/core/AmSessionContainer.cpp
===================================================================
--- trunk/core/AmSessionContainer.cpp 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/AmSessionContainer.cpp 2010-04-15 12:22:30 UTC (rev 1782)
@@ -90,11 +90,11 @@
MONITORING_MARK_FINISHED(cur_session->getLocalTag().c_str());
- DBG("session %p has been destroyed'\n",(void*)cur_session->_pid);
+ DBG("session [%p] has been destroyed\n",(void*)cur_session->_pid);
delete cur_session;
}
else {
- DBG("session %p still running\n",(void*)cur_session->_pid);
+ DBG("session [%p] still running\n",(void*)cur_session->_pid);
n_sessions.push(cur_session);
}
@@ -191,9 +191,10 @@
}
AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg*
session_params) {
+
AmSession* session = NULL;
try {
- if((session = createSession(req, session_params)) != 0){
+ if((session = createSession(req, session_params)) != 0) {
session->dlg.updateStatusFromLocalRequest(req); // sets local tag as well
session->setCallgroup(req.from_tag);
@@ -228,9 +229,19 @@
INFO("Starting UAC session %s app %s\n",
session->getLocalTag().c_str(), req.cmd.c_str());
}
+
+ try {
+ session->start();
+ } catch (const string& err) {
+ AmEventDispatcher::instance()->
+ delEventQueue(session->getLocalTag(),
+ session->getCallID(),
+ session->getRemoteTag());
+
+ delete session;
+ throw;
+ }
- session->start();
-
}
}
catch(const AmSession::Exception& e){
@@ -280,7 +291,17 @@
"to", req.to.c_str(),
"ruri", req.r_uri.c_str());
- session->start();
+ try {
+ session->start();
+ } catch (const string& err) {
+ AmEventDispatcher::instance()->
+ delEventQueue(session->getLocalTag(),
+ session->getCallID(),
+ session->getRemoteTag());
+
+ delete session;
+ throw;
+ }
session->postEvent(new AmSipRequestEvent(req));
}
Modified: trunk/core/etc/sems.conf.sample
===================================================================
--- trunk/core/etc/sems.conf.sample 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/etc/sems.conf.sample 2010-04-15 12:22:30 UTC (rev 1782)
@@ -230,6 +230,16 @@
############################################################
# tuning
+# optional parameter: session_processor_threads=<num_value>
+#
+# - controls how many threads should be created that
+# process the application logic and in-dialog signaling.
+# This is only available if compiled with threadpool support!
+# (set USE_THREADPOOL in Makefile.defs)
+# Defaults to 10
+#
+# session_processor_threads=50
+
# optional parameter: media_processor_threads=<num_value>
#
# - controls how many threads should be created that
Modified: trunk/core/sems.cpp
===================================================================
--- trunk/core/sems.cpp 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/sems.cpp 2010-04-15 12:22:30 UTC (rev 1782)
@@ -30,7 +30,7 @@
#include "AmConfig.h"
#include "AmPlugIn.h"
#include "AmSessionContainer.h"
-//#include "AmServer.h"
+#include "AmSessionProcessor.h"
#include "AmMediaProcessor.h"
#include "AmRtpReceiver.h"
#include "AmEventDispatcher.h"
@@ -99,7 +99,6 @@
clean_up_mut.lock();
if(need_clean.get()) {
-
need_clean.set(false);
AmSessionContainer::dispose();
@@ -111,7 +110,6 @@
AmMediaProcessor::dispose();
AmEventDispatcher::dispose();
-
}
clean_up_mut.unlock();
@@ -413,6 +411,11 @@
DBG("Starting session container\n");
AmSessionContainer::instance()->start();
+
+#ifdef SESSION_THREADPOOL
+ DBG("starting session processor threads\n");
+ AmSessionProcessor::addThreads(AmConfig::SessionProcessorThreads);
+#endif
DBG("Starting media processor\n");
AmMediaProcessor::instance()->init();
Modified: trunk/core/sems.h
===================================================================
--- trunk/core/sems.h 2010-04-13 15:55:00 UTC (rev 1781)
+++ trunk/core/sems.h 2010-04-15 12:22:30 UTC (rev 1782)
@@ -52,6 +52,9 @@
#define SESSION_EXPIRES 60 // seconds
#define MINIMUM_TIMER 5 //seconds
+// threads to start for signaling/application
+#define NUM_SESSION_PROCESSORS 10
+// threads to start for RTP processing
#define NUM_MEDIA_PROCESSORS 1
#define MAX_NET_DEVICES 32
_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev