Module: sems Branch: 1.4 Commit: 85be85f0ea85509782f6f41423d16bb95037e9c3 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sems/?a=commit;h=85be85f0ea85509782f6f41423d16bb95037e9c3
Author: Raphael Coeffic <[email protected]> Committer: Raphael Coeffic <[email protected]> Date: Thu May 19 15:33:16 2011 +0200 clean exit fo xmlrpc2di. --- apps/xmlrpc2di/MultithreadXmlRpcServer.cpp | 41 ++++++++++++++++++++++++--- apps/xmlrpc2di/MultithreadXmlRpcServer.h | 16 +++++++++- apps/xmlrpc2di/XMLRPC2DI.cpp | 39 ++++++++++++++++++++++++-- apps/xmlrpc2di/XMLRPC2DI.h | 12 +++++++- 4 files changed, 97 insertions(+), 11 deletions(-) diff --git a/apps/xmlrpc2di/MultithreadXmlRpcServer.cpp b/apps/xmlrpc2di/MultithreadXmlRpcServer.cpp index 0a55a9a..88c5de7 100644 --- a/apps/xmlrpc2di/MultithreadXmlRpcServer.cpp +++ b/apps/xmlrpc2di/MultithreadXmlRpcServer.cpp @@ -10,37 +10,68 @@ #include "XmlRpcUtil.h" #include "XmlRpcException.h" #include "MultithreadXmlRpcServer.h" - +#include "AmUtils.h" +#include "AmEventDispatcher.h" #include "log.h" using namespace XmlRpc; WorkerThread::WorkerThread(MultithreadXmlRpcServer* chief) - : runcond(false), chief(chief) { + : running(true), runcond(false), chief(chief) { } // call this method before calling run void WorkerThread::addXmlRpcSource(XmlRpcSource* source,unsigned eventMask) { dispatcher.addSource(source,eventMask); - runcond.set(true); + wakeup(); } - + +void WorkerThread::wakeup() { + runcond.set(true); +} + void WorkerThread::run() { + running.set(true); + + string eventqueue_name = "MT_XMLRPC_SERVER_" + long2str((unsigned long)pthread_self()); + + // register us as SIP event receiver for MOD_NAME + AmEventDispatcher::instance()->addEventQueue(eventqueue_name, this); + chief->reportBack(this); - while (!is_stopped()) { + while (running.get()) { runcond.wait_for(); dispatcher.work(-1.0); dispatcher.clear(); // close socket and others ... runcond.set(false); + /* tell chief we can work again */ chief->reportBack(this); } + + AmEventDispatcher::instance()->delEventQueue(eventqueue_name); DBG("WorkerThread stopped.\n"); } +void WorkerThread::postEvent(AmEvent* ev) { + + if (ev->event_id == E_SYSTEM) { + AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(ev); + if (sys_ev) { + if (sys_ev->sys_event == AmSystemEvent::ServerShutdown) { + DBG("XMLRPC worker thread received system Event: ServerShutdown, stopping\n"); + running.set(false); + runcond.set(true); + } + return; + } + } + WARN("unknown event received\n"); +} + void WorkerThread::on_stop() { } diff --git a/apps/xmlrpc2di/MultithreadXmlRpcServer.h b/apps/xmlrpc2di/MultithreadXmlRpcServer.h index e38588c..a6be731 100644 --- a/apps/xmlrpc2di/MultithreadXmlRpcServer.h +++ b/apps/xmlrpc2di/MultithreadXmlRpcServer.h @@ -11,6 +11,7 @@ #include <AmThread.h> #include "XmlRpcServer.h" #include "XmlRpcDispatch.h" +#include "AmEventQueue.h" #include <queue> #include <vector> @@ -18,17 +19,27 @@ namespace XmlRpc { class MultithreadXmlRpcServer; - class WorkerThread : public AmThread { + class WorkerThread + : public AmThread, + public AmEventQueueInterface + { + MultithreadXmlRpcServer* chief; AmCondition<bool> runcond; + AmCondition<bool> running; + public: WorkerThread(MultithreadXmlRpcServer* chief); void addXmlRpcSource(XmlRpcSource* source,unsigned eventMask); // call this method to make it run + void wakeup(); + void run(); void on_stop(); + + void postEvent(AmEvent* ev); private: XmlRpcDispatch dispatcher; }; @@ -37,7 +48,8 @@ namespace XmlRpc { #define MAX_THREAD_SIZE 8 //! Multi-threaded sever class to handle XML RPC requests - class MultithreadXmlRpcServer : public XmlRpcServer { + class MultithreadXmlRpcServer + : public XmlRpcServer { public: //! Create a server object. MultithreadXmlRpcServer(); diff --git a/apps/xmlrpc2di/XMLRPC2DI.cpp b/apps/xmlrpc2di/XMLRPC2DI.cpp index 9500b81..5364eba 100644 --- a/apps/xmlrpc2di/XMLRPC2DI.cpp +++ b/apps/xmlrpc2di/XMLRPC2DI.cpp @@ -31,6 +31,7 @@ #include "AmUtils.h" #include "AmArg.h" #include "AmSession.h" +#include "AmEventDispatcher.h" #include "TOXmlRpcClient.h" #define MOD_NAME "xmlrpc2di" @@ -325,7 +326,8 @@ XMLRPC2DIServer::XMLRPC2DIServer(unsigned int port, bool di_export, string direct_export, XmlRpcServer* s) - : port(port), + : AmEventQueue(this), + port(port), bind_ip(bind_ip), s(s), // register method 'calls' @@ -434,12 +436,43 @@ void XMLRPC2DIServer::registerMethods(const std::string& iface) { void XMLRPC2DIServer::run() { DBG("Binding XMLRPC2DIServer to port %u \n", port); s->bindAndListen(port, bind_ip); + + // register us as SIP event receiver for MOD_NAME + AmEventDispatcher::instance()->addEventQueue(MOD_NAME, this); + DBG("starting XMLRPC2DIServer...\n"); - s->work(-1.0); + running.set(true); + do { + s->work(DEF_XMLRPCSERVER_WORK_INTERVAL); + processEvents(); + } + while(running.get()); + + AmEventDispatcher::instance()->delEventQueue(MOD_NAME); + DBG("Exiting XMLRPC2DIServer.\n"); +} + +void XMLRPC2DIServer::process(AmEvent* ev) { + if (ev->event_id == E_SYSTEM) { + AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(ev); + if(sys_ev){ + DBG("XMLRPC2DIServer received system Event\n"); + if (sys_ev->sys_event == AmSystemEvent::ServerShutdown) { + DBG("XMLRPC2DIServer received system Event: ServerShutdown, " + "stopping thread\n"); + running.set(false); + } + return; + } + } + WARN("unknown event received\n"); } + void XMLRPC2DIServer::on_stop() { - DBG("sorry, don't know how to stop the server.\n"); + DBG("on_stop().\n"); + running.set(false); } + void XMLRPC2DIServerCallsMethod::execute(XmlRpcValue& params, XmlRpcValue& result) { int res = AmSession::getSessionNum(); DBG("XMLRPC2DI: calls = %d\n", res); diff --git a/apps/xmlrpc2di/XMLRPC2DI.h b/apps/xmlrpc2di/XMLRPC2DI.h index fe6d338..a3fef2d 100644 --- a/apps/xmlrpc2di/XMLRPC2DI.h +++ b/apps/xmlrpc2di/XMLRPC2DI.h @@ -41,6 +41,8 @@ using std::string; #include <time.h> +#define DEF_XMLRPCSERVER_WORK_INTERVAL .200 /* 200 ms */ + #define DEF_XMLRPCSERVERMETHOD(cls_name, func_name) \ class cls_name \ : public XmlRpcServerMethod { \ @@ -90,12 +92,18 @@ struct DIMethodProxy : public XmlRpcServerMethod void execute(XmlRpcValue& params, XmlRpcValue& result); }; -class XMLRPC2DIServer : public AmThread { +class XMLRPC2DIServer +: public AmThread, + public AmEventQueue, + public AmEventHandler +{ XmlRpcServer* s; unsigned int port; string bind_ip; + AmSharedVar<bool> running; + XMLRPC2DIServerCallsMethod calls_method; XMLRPC2DIServerSetLoglevelMethod setloglevel_method; XMLRPC2DIServerGetLoglevelMethod getloglevel_method; @@ -110,6 +118,8 @@ class XMLRPC2DIServer : public AmThread { XMLRPC2DIServerDIMethod* di_method; void registerMethods(const std::string& iface); + void process(AmEvent* ev); + public: XMLRPC2DIServer(unsigned int port, const string& bind_ip, _______________________________________________ Semsdev mailing list [email protected] http://lists.iptel.org/mailman/listinfo/semsdev
