Author: sayer
Date: 2008-09-26 00:42:30 +0200 (Fri, 26 Sep 2008)
New Revision: 1102

Added:
   trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp
   trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h
Modified:
   trunk/apps/examples/xmlrpc2di/Makefile
   trunk/apps/examples/xmlrpc2di/XMLRPC2DI.cpp
   trunk/apps/examples/xmlrpc2di/XMLRPC2DI.h
   trunk/apps/examples/xmlrpc2di/etc/xmlrpc2di.conf
Log:
multi threaded xmlrpc server implementation

Modified: trunk/apps/examples/xmlrpc2di/Makefile
===================================================================
--- trunk/apps/examples/xmlrpc2di/Makefile      2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/Makefile      2008-09-25 22:42:30 UTC (rev 1102)
@@ -3,7 +3,7 @@
 plug_in_name = xmlrpc2di
 
 module_ldflags = -lxmlrpc++ 
-module_cflags  = 
+module_cflags  = -DHAVE_XMLRPCPP_SSL 
 # for gentoo ebuild or cvs-20040713 version:
 # module_cflags  = -DHAVE_XMLRPCPP_SSL
 #

Added: trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp
===================================================================
--- trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp   2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp   2008-09-25 22:42:30 UTC (rev 1102)
@@ -0,0 +1,122 @@
+/* 
+ * Written by Jae An ([EMAIL PROTECTED]) 
+ *
+ * adapted for SEMS by Stefan Sayer stefan.sayer at iptego.com
+ */ 
+#include "XmlRpcServer.h" 
+#include "XmlRpcServerConnection.h" 
+#include "XmlRpcServerMethod.h" 
+#include "XmlRpcSocket.h" 
+#include "XmlRpcUtil.h" 
+#include "XmlRpcException.h" 
+#include "MultithreadXmlRpcServer.h" 
+ 
+#include "log.h"
+ 
+using namespace XmlRpc; 
+/**
+ * Create a worker bound to the server that hands it connections.
+ *
+ * The run condition starts out false, so run() blocks in runcond.wait_for()
+ * until addXmlRpcSource() raises it.
+ *
+ * @param chief server this worker reports back to; kept as a raw pointer.
+ */
+WorkerThread::WorkerThread(MultithreadXmlRpcServer* chief) 
+  // members are listed in declaration order (chief, then runcond) so the
+  // initializer list matches the order the compiler actually uses
+  : chief(chief), runcond(false) {
+} 
+ 
+/**
+ * Hand a source to this worker and wake it up.
+ *
+ * The source is registered with the worker's own dispatcher first; only then
+ * is the run condition raised, so run() finds the source in place when it
+ * resumes from runcond.wait_for().
+ *
+ * @param source    source to register with the dispatcher
+ * @param eventMask event mask passed through to the dispatcher
+ */
+void WorkerThread::addXmlRpcSource(XmlRpcSource* source, unsigned eventMask)
+{
+  dispatcher.addSource(source, eventMask);
+
+  // release run(), which waits on this condition
+  runcond.set(true);
+}
+ 
+/**
+ * Thread body: report in as idle, then serve whatever is handed over.
+ *
+ * Each round waits for runcond to be raised (see addXmlRpcSource()), runs the
+ * worker's dispatcher until work() returns, clears the dispatcher, lowers
+ * runcond again and reports back to the chief.
+ *
+ * NOTE(review): is_stopped() is only re-checked between rounds and on_stop()
+ * does not raise runcond, so a stop request looks unable to wake a worker
+ * that is waiting in wait_for() -- confirm against AmThread.
+ */
+void WorkerThread::run()
+{
+  // first registration: available as soon as the thread is running
+  chief->reportBack(this);
+
+  for (;;) {
+    if (is_stopped())
+      break;
+
+    runcond.wait_for();
+    dispatcher.work(-1.0);
+
+    // close socket and others ...
+    dispatcher.clear();
+    runcond.set(false);
+
+    // tell chief we can work again
+    chief->reportBack(this);
+  }
+
+  DBG("WorkerThread stopped.\n");
+}
+
+/**
+ * Stop hook; does nothing in this revision.
+ *
+ * NOTE(review): presumably invoked by AmThread when the thread is asked to
+ * stop. Nothing here wakes a run() that is waiting on runcond -- confirm
+ * whether that is needed for a clean shutdown.
+ */
+void WorkerThread::on_stop()
+{
+  // intentionally empty
+}
+
+/**
+ * Create a server without worker threads; call createThreads() to add them.
+ *
+ * have_waiting starts out false because no worker has reported back yet.
+ */
+MultithreadXmlRpcServer::MultithreadXmlRpcServer()
+  : XmlRpcServer(),
+    have_waiting(false)
+{
+}
+
+/**
+ * Shut down the worker pool.
+ *
+ * Workers are handled one after another, in creation order: each is asked to
+ * stop, joined, and then deleted.
+ */
+MultithreadXmlRpcServer::~MultithreadXmlRpcServer()
+{
+  for (std::vector<WorkerThread*>::size_type idx = 0;
+       idx < workers.size(); ++idx) {
+    WorkerThread* worker = workers[idx];
+    worker->stop();
+    worker->join();   // wait for this worker before freeing it
+    delete worker;
+  }
+}
+
+/**
+ * Accept a client connection request and hand the connection to an idle
+ * worker thread, which then handles the method calls from that client.
+ *
+ * If accept fails, or the socket cannot be switched to non-blocking mode,
+ * the error is logged and no connection is created. Otherwise this call
+ * waits until a worker has reported back as idle and passes the new
+ * connection to that worker.
+ */
+void MultithreadXmlRpcServer::acceptConnection()
+{
+  int s = XmlRpcSocket::accept(this->getfd());
+  if (s < 0)
+    {
+      ERROR("MultithreadXmlRpcServer::acceptConnection: Could not accept "
+           "connection (%s).\n",
+           XmlRpcSocket::getErrorMsg().c_str());
+    }
+  else if ( ! XmlRpcSocket::setNonBlocking(s))
+    {
+      // Log before closing so that close() cannot disturb the error state
+      // reported by getErrorMsg() (presumably errno based -- confirm).
+      ERROR("MultithreadXmlRpcServer::acceptConnection: Could not set socket "
+           "to non-blocking input mode (%s).\n",
+           XmlRpcSocket::getErrorMsg().c_str());
+      XmlRpcSocket::close(s);
+    }
+  else
+    {
+      // Wait for an idle worker. getIdleThread() can still return NULL after
+      // the wake-up, hence the loop.
+      WorkerThread* thr = NULL;
+      while (!thr) {
+       if (!have_waiting.get())
+           have_waiting.wait_for();
+       thr = getIdleThread();
+      }
+
+      // The worker's own dispatcher listens for input on this source.
+      thr->addXmlRpcSource(this->createConnection(s),
+                          XmlRpcDispatch::ReadableEvent);
+
+      // Alternative kept for reference: when all threads are busy, just
+      // close the connection so that the rejected client would try again.
+      //       XmlRpcSocket::close(s);
+      //       XmlRpcUtil::error("MultithreadXmlRpcServer::acceptConnection: "
+      //                         "All threads are busy. Rejected a client");
+    }
+}
+
+/**
+ * Register a worker as idle.
+ *
+ * While holding waiting_mut, the worker is appended to the queue of idle
+ * workers and have_waiting is raised, which is the condition
+ * acceptConnection() waits on when it needs a worker.
+ *
+ * @param thr worker that is ready to take a connection
+ */
+void MultithreadXmlRpcServer::reportBack(WorkerThread* thr)
+{
+  waiting_mut.lock();
+
+  waiting.push(thr);
+  have_waiting.set(true);
+
+  waiting_mut.unlock();
+}
+
+/**
+ * Take the longest-waiting idle worker off the queue, if there is one.
+ *
+ * Runs under waiting_mut. Afterwards have_waiting reflects whether any idle
+ * worker is left in the queue.
+ *
+ * @return the worker removed from the front of the queue, or NULL when the
+ *         queue was empty
+ */
+WorkerThread* MultithreadXmlRpcServer::getIdleThread()
+{
+  WorkerThread* idle = NULL;
+
+  waiting_mut.lock();
+
+  const bool any_idle = !waiting.empty();
+  if (any_idle) {
+    idle = waiting.front();
+    waiting.pop();
+  }
+
+  // keep the condition in sync with what remains queued
+  have_waiting.set(!waiting.empty());
+
+  waiting_mut.unlock();
+
+  return idle;
+}
+
+/**
+ * Create and start worker threads.
+ *
+ * Every worker is recorded in `workers` before it is started; the destructor
+ * walks that list to stop, join and delete them.
+ *
+ * @param n number of workers to add
+ */
+void MultithreadXmlRpcServer::createThreads(unsigned int n)
+{
+  unsigned int remaining = n;
+  while (remaining > 0) {
+    WorkerThread* worker = new WorkerThread(this);
+    workers.push_back(worker);
+    worker->start();
+    --remaining;
+  }
+}


Property changes on: trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h
===================================================================
--- trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h     2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h     2008-09-25 22:42:30 UTC (rev 1102)
@@ -0,0 +1,66 @@
+/** 
+ * Written by Jae An ([EMAIL PROTECTED]) 
+ * http://sourceforge.net/forum/forum.php?thread_id=910224&forum_id=240495
+ *
+ * adapted for SEMS by Stefan Sayer stefan.sayer at iptego.com
+ */ 
+
+#ifndef _MULTI_THREAD_XMLRPCSERVER_H_ 
+#define _MULTI_THREAD_XMLRPCSERVER_H_ 
+  
+#include <AmThread.h> 
+#include "XmlRpcServer.h" 
+#include "XmlRpcDispatch.h" 
+
+#include <queue>
+#include <vector>
+ 
+namespace XmlRpc { 
+  class MultithreadXmlRpcServer;
+
+  /**
+   * Worker thread of MultithreadXmlRpcServer.
+   *
+   * Each worker has its own XmlRpcDispatch. It waits on runcond until a
+   * source is handed over with addXmlRpcSource(), serves it through its
+   * dispatcher and then reports back to its chief.
+   */
+  class WorkerThread : public AmThread {
+    // server this worker reports back to
+    MultithreadXmlRpcServer* chief;
+    // raised by addXmlRpcSource(), lowered by run() after each round
+    AmCondition<bool> runcond;
+
+  public: 
+    WorkerThread(MultithreadXmlRpcServer* chief);
+
+    // call this method to make it run
+    void addXmlRpcSource(XmlRpcSource* source,unsigned eventMask);
+
+    void run(); 
+    void on_stop(); 
+  private:  
+    // dispatcher used only by this worker
+    XmlRpcDispatch dispatcher;  
+  }; 
+ 
+ 
+#define MAX_THREAD_SIZE 8  
+ 
+  //! Multi-threaded server class to handle XML RPC requests.
+  //!
+  //! Accepted connections are handed to worker threads created with
+  //! createThreads(); idle workers queue up via reportBack().
+  class MultithreadXmlRpcServer : public XmlRpcServer {
+  public:
+    //! Create a server object (without worker threads).
+    MultithreadXmlRpcServer();
+    //! Destructor: stops, joins and deletes all worker threads.
+    virtual ~MultithreadXmlRpcServer();
+
+    //! Called by a worker to report that it is idle (again).
+    void reportBack(WorkerThread* thr);
+    //! Create and start n worker threads.
+    void createThreads(unsigned int n);
+
+  protected:
+
+    //! Accept a client connection request and pass it to an idle worker.
+    virtual void acceptConnection();
+
+  private:
+    //! Guards the queue of idle workers.
+    AmMutex waiting_mut;
+    //! Idle workers, in the order they reported back.
+    std::queue<WorkerThread*> waiting;
+    //! Set when a worker reports back; updated from the queue state by
+    //! getIdleThread().
+    AmCondition<bool> have_waiting;
+    //! All workers created by createThreads().
+    std::vector<WorkerThread*> workers;
+    //! Pop one idle worker, or return NULL if none is queued.
+    WorkerThread* getIdleThread();
+  };
+ 
+} // namespace XmlRpc 
+ 
+#endif  


Property changes on: trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Modified: trunk/apps/examples/xmlrpc2di/XMLRPC2DI.cpp
===================================================================
--- trunk/apps/examples/xmlrpc2di/XMLRPC2DI.cpp 2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/XMLRPC2DI.cpp 2008-09-25 22:42:30 UTC (rev 1102)
@@ -65,10 +65,32 @@
     return 0;
   configured = true;
 
+  
   AmConfigReader cfg;
   if(cfg.loadFile(AmConfig::ModConfigPath + string(MOD_NAME ".conf")))
     return -1;
 
+  string multithreaded = cfg.getParameter("multithreaded", "yes");
+
+  XmlRpcServer* s;
+  bool multi_threaded = false;
+  unsigned int threads = 0;
+  if (multithreaded == "yes") {
+    multi_threaded = true;
+    if (!cfg.getParameter("threads").length())
+      threads = 5;
+    else 
+      threads = cfg.getParameterInt("threads", 5);
+
+    DBG("Running multi-threaded XMLRPC server with %u threads\n", threads);
+    MultithreadXmlRpcServer* mt_s = new MultithreadXmlRpcServer();
+    mt_s->createThreads(threads);    
+    s = mt_s;
+  } else {
+    DBG("Running single-threaded XMLRPC server\n");
+    s = new XmlRpcServer();
+  }
+
   ServerRetryAfter = cfg.getParameterInt("server_retry_after", 10);
   DBG("retrying failed server after %u seconds\n", ServerRetryAfter);
 
@@ -106,8 +128,7 @@
   DBG("XMLRPC Server: %snabling builtin method 'di'.\n", export_di?"E":"Not 
e");
 
 
-  server = new XMLRPC2DIServer(XMLRPCPort, export_di, direct_export);
-
+  server = new XMLRPC2DIServer(XMLRPCPort, export_di, direct_export, s);
   server->start();
   return 0;
 }
@@ -228,14 +249,16 @@
 
 XMLRPC2DIServer::XMLRPC2DIServer(unsigned int port, 
                                 bool di_export, 
-                                string direct_export) 
+                                string direct_export,
+                                XmlRpcServer* s) 
   : port(port),
+    s(s),
     // register method 'calls'
-    calls_method(&s),
+    calls_method(s),
     // register method 'get_loglevel'
-    setloglevel_method(&s),
+    setloglevel_method(s),
     // register method 'set_loglevel'
-    getloglevel_method(&s)
+    getloglevel_method(s)
 {      
   DBG("XMLRPC Server: enabled builtin method 'calls'\n");
   DBG("XMLRPC Server: enabled builtin method 'get_loglevel'\n");
@@ -244,7 +267,7 @@
   // export all methods via 'di' function? 
   if (di_export) {
     // register method 'di'
-    di_method = new XMLRPC2DIServerDIMethod(&s);
+    di_method = new XMLRPC2DIServerDIMethod(s);
   }
   
   vector<string> export_ifaces = explode(direct_export, ";");
@@ -281,7 +304,7 @@
     for (unsigned int i=0;i<fct_list.size();i++) {
       string method = fct_list.get(i).asCStr();
       // see whether method already registered
-      bool has_method = (NULL != s.findMethod(method));
+      bool has_method = (NULL != s->findMethod(method));
       if (has_method) {
        ERROR("name conflict for method '%s' from interface '%s', "
              "method already exported!\n",
@@ -294,14 +317,14 @@
        DBG("XMLRPC Server: adding method '%s'\n",
            method.c_str());
        DIMethodProxy* mp = new DIMethodProxy(method, method, di_f);
-       s.addMethod(mp);
+       s->addMethod(mp);
       }
       
       DBG("XMLRPC Server: adding method '%s.%s'\n",
          iface.c_str(), method.c_str());
       DIMethodProxy* mp = new DIMethodProxy(iface + "." + method, 
                                            method, di_f);
-      s.addMethod(mp);
+      s->addMethod(mp);
     }
   } catch (AmDynInvoke::NotImplemented& e) {
     ERROR("Not implemented in interface '%s': '%s'\n", 
@@ -318,9 +341,9 @@
 
 void XMLRPC2DIServer::run() {
   DBG("Binding XMLRPC2DIServer to port %u \n", port);
-  s.bindAndListen(port);
+  s->bindAndListen(port);
   DBG("starting XMLRPC2DIServer...\n");
-  s.work(-1.0);
+  s->work(-1.0);
 }
 void XMLRPC2DIServer::on_stop() {
   DBG("sorry, don't know how to stop the server.\n");

Modified: trunk/apps/examples/xmlrpc2di/XMLRPC2DI.h
===================================================================
--- trunk/apps/examples/xmlrpc2di/XMLRPC2DI.h   2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/XMLRPC2DI.h   2008-09-25 22:42:30 UTC (rev 1102)
@@ -28,6 +28,7 @@
 #define XMLRPC2DISERVER_H
 
 #include "XmlRpc.h"
+#include "MultithreadXmlRpcServer.h"
 using namespace XmlRpc;
 
 #include "AmThread.h"
@@ -83,7 +84,8 @@
 };
 
 class XMLRPC2DIServer : public AmThread {
-  XmlRpcServer s;
+  XmlRpcServer* s;
+
   unsigned int port; 
   XMLRPC2DIServerCallsMethod       calls_method;
   XMLRPC2DIServerSetLoglevelMethod setloglevel_method;
@@ -94,7 +96,8 @@
  public: 
   XMLRPC2DIServer(unsigned int port, 
                  bool di_export, 
-                 string direct_export);
+                 string direct_export,
+                 XmlRpcServer* s);
   void run();
   void on_stop();
   

Modified: trunk/apps/examples/xmlrpc2di/etc/xmlrpc2di.conf
===================================================================
--- trunk/apps/examples/xmlrpc2di/etc/xmlrpc2di.conf    2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/etc/xmlrpc2di.conf    2008-09-25 22:42:30 UTC (rev 1102)
@@ -1,6 +1,17 @@
 # port to bind XMLRPC server to 
 xmlrpc_port=8090
 
+
+# run multi-threaded server? 
+# Default: yes
+# 
+# multithreaded = yes
+
+# threads to run - this many requests can be processed in parallel
+# Default: 5
+#
+# threads=5
+
 # export all DI functions with the function call 'di'?
 # defaults to: yes
 # export_di=yes
@@ -11,3 +22,6 @@
 # defaults to: none
 # direct_export=di_dial;registrar_client
 
+# run the XMLRPC server at all (default: yes)
+#
+# run_server=yes 

_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev

Reply via email to