Author: sayer
Date: 2008-09-26 00:42:30 +0200 (Fri, 26 Sep 2008)
New Revision: 1102
Added:
trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp
trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h
Modified:
trunk/apps/examples/xmlrpc2di/Makefile
trunk/apps/examples/xmlrpc2di/XMLRPC2DI.cpp
trunk/apps/examples/xmlrpc2di/XMLRPC2DI.h
trunk/apps/examples/xmlrpc2di/etc/xmlrpc2di.conf
Log:
multi threaded xmlrpc server implementation
Modified: trunk/apps/examples/xmlrpc2di/Makefile
===================================================================
--- trunk/apps/examples/xmlrpc2di/Makefile 2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/Makefile 2008-09-25 22:42:30 UTC (rev 1102)
@@ -3,7 +3,7 @@
plug_in_name = xmlrpc2di
module_ldflags = -lxmlrpc++
-module_cflags =
+module_cflags = -DHAVE_XMLRPCPP_SSL
# for gentoo ebuild or cvs-20040713 version:
# module_cflags = -DHAVE_XMLRPCPP_SSL
#
Added: trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp
===================================================================
--- trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp 2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp 2008-09-25 22:42:30 UTC (rev 1102)
@@ -0,0 +1,122 @@
+/*
+ * Written by Jae An ([EMAIL PROTECTED])
+ *
+ * adapted for SEMS by Stefan Sayer stefan.sayer at iptego.com
+ */
+#include "XmlRpcServer.h"
+#include "XmlRpcServerConnection.h"
+#include "XmlRpcServerMethod.h"
+#include "XmlRpcSocket.h"
+#include "XmlRpcUtil.h"
+#include "XmlRpcException.h"
+#include "MultithreadXmlRpcServer.h"
+
+#include "log.h"
+
+using namespace XmlRpc;
+WorkerThread::WorkerThread(MultithreadXmlRpcServer* chief)
+ : runcond(false), chief(chief) {
+}
+
+// call this method before calling run
+void WorkerThread::addXmlRpcSource(XmlRpcSource* source,unsigned eventMask)
+{
+ dispatcher.addSource(source,eventMask);
+ runcond.set(true);
+}
+
+void WorkerThread::run()
+{
+ chief->reportBack(this);
+
+ while (!is_stopped()) {
+ runcond.wait_for();
+ dispatcher.work(-1.0);
+
+ dispatcher.clear(); // close socket and others ...
+ runcond.set(false);
+ /* tell chief we can work again */
+ chief->reportBack(this);
+ }
+ DBG("WorkerThread stopped.\n");
+}
+
+void WorkerThread::on_stop() {
+}
+
+MultithreadXmlRpcServer::MultithreadXmlRpcServer()
+ : XmlRpcServer(), have_waiting(false) {
+}
+
+// Wait until all the worker threads are done.
+MultithreadXmlRpcServer::~MultithreadXmlRpcServer()
+{
+ for (std::vector<WorkerThread*>::iterator it=
+ workers.begin(); it != workers.end();it++) {
+ (*it)->stop();
+ (*it)->join();
+ delete *it;
+ }
+}
+
+// Accept a client connection request and create a connection to
+// handle method calls from the client.
+void MultithreadXmlRpcServer::acceptConnection()
+{
+ int s = XmlRpcSocket::accept(this->getfd());
+ if (s < 0)
+ {
+ ERROR("MultithreadXmlRpcServer::acceptConnection: Could not accept connection (%s).",
+ XmlRpcSocket::getErrorMsg().c_str());
+
+ }
+ else if ( ! XmlRpcSocket::setNonBlocking(s))
+ {
+ XmlRpcSocket::close(s);
+ ERROR("XmlRpcServer::acceptConnection: Could not set socket "
+ "to non-blocking input mode (%s).\n",
+ XmlRpcSocket::getErrorMsg().c_str());
+ }
+ else // Notify the dispatcher to listen for input on this source when we are in work()
+ {
+ WorkerThread* thr = NULL;
+ while (!thr) {
+ if (!have_waiting.get())
+ have_waiting.wait_for();
+ thr = getIdleThread();
+ }
+ thr->addXmlRpcSource(this->createConnection(s), XmlRpcDispatch::ReadableEvent);
+
+ // // Uh oh.. all threads are busy...
+ // // Just close the connection so that the rejected client would try again.
+ // XmlRpcSocket::close(s);
+ // XmlRpcUtil::error("MultithreadXmlRpcServer::acceptConnection: All threads are busy. Rejected a client");
+ }
+}
+
+void MultithreadXmlRpcServer::reportBack(WorkerThread* thr) {
+ waiting_mut.lock();
+ waiting.push(thr);
+ have_waiting.set(true);
+ waiting_mut.unlock();
+}
+
+WorkerThread* MultithreadXmlRpcServer::getIdleThread() {
+ WorkerThread* res = NULL;
+ waiting_mut.lock();
+ if (!waiting.empty()) {
+ res = waiting.front();
+ waiting.pop();
+ }
+ have_waiting.set(!waiting.empty());
+ waiting_mut.unlock();
+ return res;
+}
+
+void MultithreadXmlRpcServer::createThreads(unsigned int n) {
+ for (unsigned int i=0;i<n;i++) {
+ WorkerThread* thr = new WorkerThread(this);
+ workers.push_back(thr);
+ thr->start();
+ }
+}
Property changes on: trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.cpp
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h
===================================================================
--- trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h 2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h 2008-09-25 22:42:30 UTC (rev 1102)
@@ -0,0 +1,66 @@
+/**
+ * Written by Jae An ([EMAIL PROTECTED])
+ * http://sourceforge.net/forum/forum.php?thread_id=910224&forum_id=240495
+ *
+ * adapted for SEMS by Stefan Sayer stefan.sayer at iptego.com
+ */
+
+#ifndef _MULTI_THREAD_XMLRPCSERVER_H_
+#define _MULTI_THREAD_XMLRPCSERVER_H_
+
+#include <AmThread.h>
+#include "XmlRpcServer.h"
+#include "XmlRpcDispatch.h"
+
+#include <queue>
+#include <vector>
+
+namespace XmlRpc {
+ class MultithreadXmlRpcServer;
+
+ class WorkerThread : public AmThread {
+ MultithreadXmlRpcServer* chief;
+ AmCondition<bool> runcond;
+
+ public:
+ WorkerThread(MultithreadXmlRpcServer* chief);
+
+ void addXmlRpcSource(XmlRpcSource* source,unsigned eventMask); // call this method to make it run
+
+ void run();
+ void on_stop();
+ private:
+ XmlRpcDispatch dispatcher;
+ };
+
+
+#define MAX_THREAD_SIZE 8
+
+ //! Multi-threaded sever class to handle XML RPC requests
+ class MultithreadXmlRpcServer : public XmlRpcServer {
+ public:
+ //! Create a server object.
+ MultithreadXmlRpcServer();
+ //! Destructor.
+ virtual ~MultithreadXmlRpcServer();
+
+ /* report back from work */
+ void reportBack(WorkerThread* thr);
+ void createThreads(unsigned int n);
+
+ protected:
+
+ //! Accept a client connection request
+ virtual void acceptConnection();
+
+ private:
+ AmMutex waiting_mut;
+ std::queue<WorkerThread*> waiting;
+ AmCondition<bool> have_waiting;
+ std::vector<WorkerThread*> workers;
+ WorkerThread* getIdleThread();
+ };
+
+} // namespace XmlRpc
+
+#endif
Property changes on: trunk/apps/examples/xmlrpc2di/MultithreadXmlRpcServer.h
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Modified: trunk/apps/examples/xmlrpc2di/XMLRPC2DI.cpp
===================================================================
--- trunk/apps/examples/xmlrpc2di/XMLRPC2DI.cpp 2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/XMLRPC2DI.cpp 2008-09-25 22:42:30 UTC (rev 1102)
@@ -65,10 +65,32 @@
return 0;
configured = true;
+
AmConfigReader cfg;
if(cfg.loadFile(AmConfig::ModConfigPath + string(MOD_NAME ".conf")))
return -1;
+ string multithreaded = cfg.getParameter("multithreaded", "yes");
+
+ XmlRpcServer* s;
+ bool multi_threaded = false;
+ unsigned int threads = 0;
+ if (multithreaded == "yes") {
+ multi_threaded = true;
+ if (!cfg.getParameter("threads").length())
+ threads = 5;
+ else
+ threads = cfg.getParameterInt("threads", 5);
+
+ DBG("Running multi-threaded XMLRPC server with %u threads\n", threads);
+ MultithreadXmlRpcServer* mt_s = new MultithreadXmlRpcServer();
+ mt_s->createThreads(threads);
+ s = mt_s;
+ } else {
+ DBG("Running single-threaded XMLRPC server\n");
+ s = new XmlRpcServer();
+ }
+
ServerRetryAfter = cfg.getParameterInt("server_retry_after", 10);
DBG("retrying failed server after %u seconds\n", ServerRetryAfter);
@@ -106,8 +128,7 @@
DBG("XMLRPC Server: %snabling builtin method 'di'.\n", export_di?"E":"Not e");
- server = new XMLRPC2DIServer(XMLRPCPort, export_di, direct_export);
-
+ server = new XMLRPC2DIServer(XMLRPCPort, export_di, direct_export, s);
server->start();
return 0;
}
@@ -228,14 +249,16 @@
XMLRPC2DIServer::XMLRPC2DIServer(unsigned int port,
bool di_export,
- string direct_export)
+ string direct_export,
+ XmlRpcServer* s)
: port(port),
+ s(s),
// register method 'calls'
- calls_method(&s),
+ calls_method(s),
// register method 'get_loglevel'
- setloglevel_method(&s),
+ setloglevel_method(s),
// register method 'set_loglevel'
- getloglevel_method(&s)
+ getloglevel_method(s)
{
DBG("XMLRPC Server: enabled builtin method 'calls'\n");
DBG("XMLRPC Server: enabled builtin method 'get_loglevel'\n");
@@ -244,7 +267,7 @@
// export all methods via 'di' function?
if (di_export) {
// register method 'di'
- di_method = new XMLRPC2DIServerDIMethod(&s);
+ di_method = new XMLRPC2DIServerDIMethod(s);
}
vector<string> export_ifaces = explode(direct_export, ";");
@@ -281,7 +304,7 @@
for (unsigned int i=0;i<fct_list.size();i++) {
string method = fct_list.get(i).asCStr();
// see whether method already registered
- bool has_method = (NULL != s.findMethod(method));
+ bool has_method = (NULL != s->findMethod(method));
if (has_method) {
ERROR("name conflict for method '%s' from interface '%s', "
"method already exported!\n",
@@ -294,14 +317,14 @@
DBG("XMLRPC Server: adding method '%s'\n",
method.c_str());
DIMethodProxy* mp = new DIMethodProxy(method, method, di_f);
- s.addMethod(mp);
+ s->addMethod(mp);
}
DBG("XMLRPC Server: adding method '%s.%s'\n",
iface.c_str(), method.c_str());
DIMethodProxy* mp = new DIMethodProxy(iface + "." + method,
method, di_f);
- s.addMethod(mp);
+ s->addMethod(mp);
}
} catch (AmDynInvoke::NotImplemented& e) {
ERROR("Not implemented in interface '%s': '%s'\n",
@@ -318,9 +341,9 @@
void XMLRPC2DIServer::run() {
DBG("Binding XMLRPC2DIServer to port %u \n", port);
- s.bindAndListen(port);
+ s->bindAndListen(port);
DBG("starting XMLRPC2DIServer...\n");
- s.work(-1.0);
+ s->work(-1.0);
}
void XMLRPC2DIServer::on_stop() {
DBG("sorry, don't know how to stop the server.\n");
Modified: trunk/apps/examples/xmlrpc2di/XMLRPC2DI.h
===================================================================
--- trunk/apps/examples/xmlrpc2di/XMLRPC2DI.h 2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/XMLRPC2DI.h 2008-09-25 22:42:30 UTC (rev 1102)
@@ -28,6 +28,7 @@
#define XMLRPC2DISERVER_H
#include "XmlRpc.h"
+#include "MultithreadXmlRpcServer.h"
using namespace XmlRpc;
#include "AmThread.h"
@@ -83,7 +84,8 @@
};
class XMLRPC2DIServer : public AmThread {
- XmlRpcServer s;
+ XmlRpcServer* s;
+
unsigned int port;
XMLRPC2DIServerCallsMethod calls_method;
XMLRPC2DIServerSetLoglevelMethod setloglevel_method;
@@ -94,7 +96,8 @@
public:
XMLRPC2DIServer(unsigned int port,
bool di_export,
- string direct_export);
+ string direct_export,
+ XmlRpcServer* s);
void run();
void on_stop();
Modified: trunk/apps/examples/xmlrpc2di/etc/xmlrpc2di.conf
===================================================================
--- trunk/apps/examples/xmlrpc2di/etc/xmlrpc2di.conf 2008-09-24 00:56:34 UTC (rev 1101)
+++ trunk/apps/examples/xmlrpc2di/etc/xmlrpc2di.conf 2008-09-25 22:42:30 UTC (rev 1102)
@@ -1,6 +1,17 @@
# port to bind XMLRPC server to
xmlrpc_port=8090
+
+# run multi-threaded server?
+# Default: yes
+#
+# multithreaded = yes
+
+# threads to run - this many requests can be processed in parallel
+# Default: 5
+#
+# threads=5
+
# export all DI functions with the function call 'di'?
# defaults to: yes
# export_di=yes
@@ -11,3 +22,6 @@
# defaults to: none
# direct_export=di_dial;registrar_client
+# run the XMLRPC server at all (default: yes)
+#
+# run_server=yes
_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev