Module: sems
Branch: master
Commit: ef4000a11fdfad17e86ee9e24536dec9b29bf21c
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sems/?a=commit;h=ef4000a11fdfad17e86ee9e24536dec9b29bf21c

Author: Raphael Coeffic <[email protected]>
Committer: Raphael Coeffic <[email protected]>
Date:   Thu May 19 15:33:16 2011 +0200

clean exit for xmlrpc2di.

---

 apps/xmlrpc2di/MultithreadXmlRpcServer.cpp |   41 ++++++++++++++++++++++++---
 apps/xmlrpc2di/MultithreadXmlRpcServer.h   |   16 +++++++++-
 apps/xmlrpc2di/XMLRPC2DI.cpp               |   39 ++++++++++++++++++++++++--
 apps/xmlrpc2di/XMLRPC2DI.h                 |   12 +++++++-
 4 files changed, 97 insertions(+), 11 deletions(-)

diff --git a/apps/xmlrpc2di/MultithreadXmlRpcServer.cpp b/apps/xmlrpc2di/MultithreadXmlRpcServer.cpp
index 0a55a9a..88c5de7 100644
--- a/apps/xmlrpc2di/MultithreadXmlRpcServer.cpp
+++ b/apps/xmlrpc2di/MultithreadXmlRpcServer.cpp
@@ -10,37 +10,68 @@
 #include "XmlRpcUtil.h" 
 #include "XmlRpcException.h" 
 #include "MultithreadXmlRpcServer.h" 
- 
+#include "AmUtils.h"
+#include "AmEventDispatcher.h"
 #include "log.h"
  
 using namespace XmlRpc; 
 WorkerThread::WorkerThread(MultithreadXmlRpcServer* chief) 
-  : runcond(false), chief(chief) {
+  : running(true), runcond(false), chief(chief) {
 } 
  
 // call this method before calling run 
 void WorkerThread::addXmlRpcSource(XmlRpcSource* source,unsigned eventMask) 
 { 
   dispatcher.addSource(source,eventMask); 
-  runcond.set(true);
+  wakeup();
 }  
- 
+
+void WorkerThread::wakeup() {
+  runcond.set(true);
+}
+
 void WorkerThread::run() 
 { 
+  running.set(true);
+
+  string eventqueue_name = "MT_XMLRPC_SERVER_" + long2str((unsigned 
long)pthread_self());
+
+  // register us as SIP event receiver for MOD_NAME
+  AmEventDispatcher::instance()->addEventQueue(eventqueue_name, this);
+
   chief->reportBack(this);
 
-  while (!is_stopped()) {
+  while (running.get()) {
     runcond.wait_for();
     dispatcher.work(-1.0); 
   
     dispatcher.clear(); // close socket and others ...  
     runcond.set(false);
+
     /* tell chief we can work again */
     chief->reportBack(this);
   }
+
+  AmEventDispatcher::instance()->delEventQueue(eventqueue_name);
   DBG("WorkerThread stopped.\n");
 } 
 
+void WorkerThread::postEvent(AmEvent* ev) {
+
+  if (ev->event_id == E_SYSTEM) {
+    AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(ev);
+    if (sys_ev) {
+      if (sys_ev->sys_event == AmSystemEvent::ServerShutdown) {
+       DBG("XMLRPC worker thread received system Event: ServerShutdown, 
stopping\n");
+       running.set(false);
+       runcond.set(true);
+      }
+      return;
+    }
+  }
+  WARN("unknown event received\n");
+}
+
 void WorkerThread::on_stop() {
 }
 
diff --git a/apps/xmlrpc2di/MultithreadXmlRpcServer.h b/apps/xmlrpc2di/MultithreadXmlRpcServer.h
index e38588c..a6be731 100644
--- a/apps/xmlrpc2di/MultithreadXmlRpcServer.h
+++ b/apps/xmlrpc2di/MultithreadXmlRpcServer.h
@@ -11,6 +11,7 @@
 #include <AmThread.h> 
 #include "XmlRpcServer.h" 
 #include "XmlRpcDispatch.h" 
+#include "AmEventQueue.h"
 
 #include <queue>
 #include <vector>
@@ -18,17 +19,27 @@
 namespace XmlRpc { 
   class MultithreadXmlRpcServer;
 
-  class WorkerThread : public AmThread {
+  class WorkerThread
+    : public AmThread,
+    public AmEventQueueInterface
+  {
+
     MultithreadXmlRpcServer* chief;
     AmCondition<bool> runcond;
 
+    AmCondition<bool> running;
+
   public: 
     WorkerThread(MultithreadXmlRpcServer* chief);
 
     void addXmlRpcSource(XmlRpcSource* source,unsigned eventMask); // call 
this method to make it run 
+    void wakeup();
+
 
     void run(); 
     void on_stop(); 
+
+    void postEvent(AmEvent* ev);
   private:  
     XmlRpcDispatch dispatcher;  
   }; 
@@ -37,7 +48,8 @@ namespace XmlRpc {
 #define MAX_THREAD_SIZE 8  
  
   //! Multi-threaded sever class to handle XML RPC requests 
-  class MultithreadXmlRpcServer : public XmlRpcServer { 
+  class MultithreadXmlRpcServer
+    : public XmlRpcServer { 
   public: 
     //! Create a server object. 
     MultithreadXmlRpcServer();
diff --git a/apps/xmlrpc2di/XMLRPC2DI.cpp b/apps/xmlrpc2di/XMLRPC2DI.cpp
index 9500b81..5364eba 100644
--- a/apps/xmlrpc2di/XMLRPC2DI.cpp
+++ b/apps/xmlrpc2di/XMLRPC2DI.cpp
@@ -31,6 +31,7 @@
 #include "AmUtils.h"
 #include "AmArg.h"
 #include "AmSession.h"
+#include "AmEventDispatcher.h"
 #include "TOXmlRpcClient.h"
 
 #define MOD_NAME "xmlrpc2di"
@@ -325,7 +326,8 @@ XMLRPC2DIServer::XMLRPC2DIServer(unsigned int port,
                                 bool di_export, 
                                 string direct_export,
                                 XmlRpcServer* s) 
-  : port(port),
+  : AmEventQueue(this),
+    port(port),
     bind_ip(bind_ip),
     s(s),
     // register method 'calls'
@@ -434,12 +436,43 @@ void XMLRPC2DIServer::registerMethods(const std::string& iface) {
 void XMLRPC2DIServer::run() {
   DBG("Binding XMLRPC2DIServer to port %u \n", port);
   s->bindAndListen(port, bind_ip);
+
+  // register us as SIP event receiver for MOD_NAME
+  AmEventDispatcher::instance()->addEventQueue(MOD_NAME, this);
+
   DBG("starting XMLRPC2DIServer...\n");
-  s->work(-1.0);
+  running.set(true);
+  do {
+    s->work(DEF_XMLRPCSERVER_WORK_INTERVAL);
+    processEvents();
+  } 
+  while(running.get());
+
+  AmEventDispatcher::instance()->delEventQueue(MOD_NAME);
+  DBG("Exiting XMLRPC2DIServer.\n");
+}
+
+void XMLRPC2DIServer::process(AmEvent* ev) {
+  if (ev->event_id == E_SYSTEM) {
+    AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(ev);
+    if(sys_ev){        
+      DBG("XMLRPC2DIServer received system Event\n");
+      if (sys_ev->sys_event == AmSystemEvent::ServerShutdown) {
+       DBG("XMLRPC2DIServer received system Event: ServerShutdown, "
+           "stopping thread\n");
+       running.set(false);
+      }
+      return;
+    }
+  }
+  WARN("unknown event received\n");
 }
+
 void XMLRPC2DIServer::on_stop() {
-  DBG("sorry, don't know how to stop the server.\n");
+  DBG("on_stop().\n");
+  running.set(false);
 }
+
 void XMLRPC2DIServerCallsMethod::execute(XmlRpcValue& params, XmlRpcValue& 
result) {
   int res = AmSession::getSessionNum();
   DBG("XMLRPC2DI: calls = %d\n", res);
diff --git a/apps/xmlrpc2di/XMLRPC2DI.h b/apps/xmlrpc2di/XMLRPC2DI.h
index fe6d338..a3fef2d 100644
--- a/apps/xmlrpc2di/XMLRPC2DI.h
+++ b/apps/xmlrpc2di/XMLRPC2DI.h
@@ -41,6 +41,8 @@ using std::string;
 
 #include <time.h>
 
+#define DEF_XMLRPCSERVER_WORK_INTERVAL .200 /* 200 ms */
+
 #define DEF_XMLRPCSERVERMETHOD(cls_name, func_name)            \
   class cls_name                                               \
   :  public XmlRpcServerMethod {                               \
@@ -90,12 +92,18 @@ struct DIMethodProxy : public XmlRpcServerMethod
   void execute(XmlRpcValue& params, XmlRpcValue& result);
 };
 
-class XMLRPC2DIServer : public AmThread {
+class XMLRPC2DIServer
+: public AmThread,
+  public AmEventQueue,
+  public AmEventHandler
+{
   XmlRpcServer* s;
 
   unsigned int port; 
   string bind_ip;
 
+  AmSharedVar<bool> running;
+
   XMLRPC2DIServerCallsMethod       calls_method;
   XMLRPC2DIServerSetLoglevelMethod setloglevel_method;
   XMLRPC2DIServerGetLoglevelMethod getloglevel_method;
@@ -110,6 +118,8 @@ class XMLRPC2DIServer : public AmThread {
   XMLRPC2DIServerDIMethod*         di_method;
   void registerMethods(const std::string& iface);
 
+  void process(AmEvent* ev);
+
  public: 
   XMLRPC2DIServer(unsigned int port,
                  const string& bind_ip,

_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev

Reply via email to