Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs7.sourceforge.net:/tmp/cvs-serv3214/runtime
Modified Files:
Tag: XQuery_0-16
pathfinder.mx serialize_dflt.mx shttpd.c shttpd.h
xrpc_client.mx xrpc_server.mx
Log Message:
- Changed XRPC to reuse MAPI clients instead of forking a pthread
for handling each individual request.
The code may not be fully working yet -- more tomorrow.
Checking in now to hand over to Jennie.
Index: shttpd.c
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/shttpd.c,v
retrieving revision 1.19
retrieving revision 1.19.4.1
diff -u -d -r1.19 -r1.19.4.1
--- shttpd.c 6 Oct 2006 12:22:53 -0000 1.19
+++ shttpd.c 15 Feb 2007 01:46:13 -0000 1.19.4.1
@@ -464,7 +464,7 @@
struct userurl *p;
for (p = urls; p != NULL; p = p->next)
- if (strcmp(p->url, url) == 0)
+ if (strncmp(p->url, url, strlen(p->url)) == 0)
return (p);
return (NULL);
@@ -486,41 +486,8 @@
#include <pthread.h>
#endif /* _WIN32 */
-struct thrarg {
- struct userurl userurl;
- struct shttpd_callback_arg arg;
-};
-/*
- * This function is run in dedicated thread.
- * It executes user-defined callback function, and exits.
- * Return value is discarded.
- */
-static void * WINAPI
-do_thread(struct thrarg *p)
-{
- /* Put client socket to blocking mode */
-#ifdef _WIN32
- u_long flags = 0;
- (void) ioctlsocket(p->arg.connection->sock, FIONBIO, &flags);
-#else
- int flags = fcntl(p->arg.connection->sock, F_GETFL);
- flags &= ~O_NONBLOCK;
- (void) fcntl(p->arg.connection->sock, F_SETFL, flags);
- (void) signal(SIGPIPE, SIG_IGN);
-#endif /* _WIN32 */
- /* Call user function */
- p->userurl.func(&p->arg);
-
- /* Free up the resources XXX what about concurency here ? */
- p->arg.connection->flags |= FLAG_FINISHED;
- free(p);
-
- pthread_exit(0);
-
- return (NULL);
-}
/*
* The URI should be handled is user-registered callback function.
@@ -530,19 +497,18 @@
* number of bytes copied to the local IO buffer. Mark local IO as done,
* and shttpd will take care about passing data back to client.
*/
+
static void
do_embedded(struct conn *c)
{
- struct shttpd_callback_arg arg;
+ struct shttpd_callback_arg *arg = malloc(sizeof(struct
shttpd_callback_arg));
const struct userurl *p = c->userurl;
unsigned long n;
- pthread_t thr;
- struct thrarg *param;
- arg.connection = c;
- arg.callback_data = p->data;
- arg.buf = c->local.buf;
- arg.buflen = sizeof(c->local.buf);
+ arg->connection = c;
+ arg->callback_data = p->data;
+ arg->buf = c->local.buf;
+ arg->buflen = sizeof(c->local.buf);
if (c->http_method == METHOD_POST && c->cclength) {
if (c->query == NULL) {
@@ -581,16 +547,21 @@
/* Null-terminate query data */
c->query[c->cclength] = '\0';
}
- /* Multi-Threaded scenario. Run dedicated thread for connection. */
- if ((param = malloc(sizeof(*param))) == NULL)
- return;
- param->userurl = *p;
- param->arg = arg;
-
/* FIXME check return value */
c->flags |= FLAG_THREADED;
- (void) pthread_create(&thr, 0,(LPTHREAD_START_ROUTINE)do_thread, param);
- (void) pthread_detach(thr);
+
+ /* Put client socket to blocking mode */
+#ifdef _WIN32
+ u_long flags = 0;
+ (void) ioctlsocket(arg->connection->sock, FIONBIO, &flags);
+#else
+ int flags = fcntl(arg->connection->sock, F_GETFL);
+ flags &= ~O_NONBLOCK;
+ (void) fcntl(arg->connection->sock, F_SETFL, flags);
+ (void) signal(SIGPIPE, SIG_IGN);
+#endif /* _WIN32 */
+
+ p->func(arg);
}
/*
@@ -654,6 +625,7 @@
/*
* Change all occurences of '/' characters to OS-specific directory separator
*/
+
static void
copypath(const char *src, char *dst, size_t dstlen)
{
@@ -3069,8 +3041,7 @@
/* If document_root is not set, set it to current directory */
if (STROPT(OPT_DOCROOT) == NULL){
char tmp[8192];
- char *docroot = NULL;
- docroot = getcwd(tmp, 8192);
+ char *docroot = getcwd(tmp, 8192);
if (docroot == NULL) {
elog(ERR_FATAL, "unable to set document_root: %s",
strerror(errno));
return; /* force abort */
@@ -3259,6 +3230,26 @@
return arg->connection->sock;
}
+char*
+shttpd_get_method(struct shttpd_callback_arg *arg)
+{
+ return arg->connection->method;
+}
+
+char*
+shttpd_get_uri(struct shttpd_callback_arg *arg)
+{
+ return arg->connection->uri;
+}
+
+void
+shttpd_finish(struct shttpd_callback_arg *arg)
+{
+ /* Free up the resources XXX what about concurency here ? */
+ arg->connection->flags |= FLAG_FINISHED;
+ free(arg);
+}
+
static void
substitute(char *buf, const char *kw, const char *subst)
{
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.288.2.12
retrieving revision 1.288.2.13
diff -u -d -r1.288.2.12 -r1.288.2.13
--- pathfinder.mx 12 Feb 2007 22:51:42 -0000 1.288.2.12
+++ pathfinder.mx 15 Feb 2007 01:46:12 -0000 1.288.2.13
@@ -368,9 +368,10 @@
@:[EMAIL PROTECTED](NODE, 32)@
@:[EMAIL PROTECTED](ATTR, 33)@
@:[EMAIL PROTECTED](ELEM, 34)@
+@:ws_kind_decl(@1)@
@= ws_mil_decl
-const @1 := @2;
+const @1 := @[EMAIL PROTECTED];
@mil
@:ws(mil_decl)@
@:ws_decl(mil)@
@@ -381,13 +382,15 @@
# KIND constants, carefully chosen
# atomic value items can be retrieved with 'kind.select(int_nil,ATOMIC)'
# NODE is not a type but all node types can be retrieved with
'kind.select(NODE,int_nil)'
-const ELEMENT := chr(0);
-const TEXT := chr(1);
-const COMMENT := chr(2);
-const PI := chr(3);
-const DOCUMENT := chr(4);
-const COLLECTION := chr(5);
[EMAIL PROTECTED] ws_kind_decl
+@:[EMAIL PROTECTED](ELEMENT, 0,,,,,,.chr())@
+@:[EMAIL PROTECTED](TEXT, 1,,,,,,.chr())@
+@:[EMAIL PROTECTED](COMMENT, 2,,,,,,.chr())@
+@:[EMAIL PROTECTED](PI, 3,,,,,,.chr())@
+@:[EMAIL PROTECTED](DOCUMENT, 4,,,,,,.chr())@
+@:[EMAIL PROTECTED](COLLECTION,5,,,,,,.chr())@
[EMAIL PROTECTED]
# zero
const WS := [EMAIL PROTECTED]; # first container in ws =
transient doc container
const PRE_BASE := [EMAIL PROTECTED]; # all our PRE bats start at 0
(the super-root)
@@ -3345,42 +3348,14 @@
@:ws(c_decl)@
@:ws_decl(c)@
-#define SOAP_NS "http://www.w3.org/2003/05/soap-envelope"
-#define XDT_NS "http://www.w3.org/2005/xpath-datatypes"
-#define XS_NS "http://www.w3.org/2001/XMLSchema"
-#define XSI_NS "http://www.w3.org/2001/XMLSchema-instance"
-#define XRPC_NS "http://monetdb.cwi.nl/XQuery"
-#define XRPC_LOC "http://monetdb.cwi.nl/XQuery/XRPC.xsd"
-
#define XTRACT_KIND(X) (X & 63)
#define XTRACT_CONT(X) (X >> 6)
#define SET_CONT_KIND(X,Y) (X << 6 | Y)
-/* xquery_method : execute a loop-lifted xquery function
- *
- * argc = #params
- * itercnt = #iterations
- * argcnt[iter] = #items per param
- * argtpe[] = xquery type of each parameter (as string, e.g. 'xs:integer')
- * argval[] = str representation of item (e.g. '42')
- *
- * shredBAT = optional shredded document table, that is added to working set
- * params of type xs:anyNode are represented as int pre-numbers.
- *
- * we return an error string, or NULL iff everything went A-OK
- */
-pathfinder_export char *
-xquery_method(stream *out,
- int timing,
- char* moduleNS,
- char* uri,
- char *method,
- lng argc,
- lng itercnt,
- lng** argcnt,
- str* argtpe,
- str* argval,
- BAT* shredBAT);
+/* exports for XRPC */
+pathfinder_export void xquery_client_engine(mapi_client*);
+pathfinder_export char* xquery_method(mapi_client*, int, char*, char*, char*,
lng, lng, lng**, str*, str*, BAT*);
+pathfinder_export void xquery_client_end(mapi_client *, char *);
#endif
@c
@@ -4372,18 +4347,22 @@
* =================== client session management
================================
*
* xquery_client *
- * xquery_client_alloc(mapi_client *fc);
+ * xquery_client_alloc(mapi_client *mc);
* - allocate a new xquery client, returns error message (NULL on success)
*
* char*
- * xquery_client_init(mapi_client *fc);
+ * xquery_client_init(mapi_client *mc);
* - initialize a new xquery cache context, returns error string (NULL if ok)
*
* void
- * xquery_client_free(mapi_client *fc);
+ * xquery_client_free(mapi_client *mc);
* - free a xquery cache context (terminate)
*
* void
+ * xquery_client_reset(xquery_client *ctx, char *err);
+ * - reset xquery execution for next statement
+ *
+ * void
* xquery_client_end(xquery_client *ctx, char *err);
* - end of xquery execution (struct stays alive for reuse).
*/
@@ -4475,11 +4454,11 @@
* allocate a new xquery client, returns client cntxt (0 on error)
*/
static char*
-xquery_client_alloc(mapi_client *fc)
+xquery_client_alloc(mapi_client *mc)
{
- xquery_client *ctx = xquery_client_new(fc->stk);
+ xquery_client *ctx = xquery_client_new(mc->stk);
- fc->fc = ctx;
+ mc->fc = ctx;
if (!ctx) {
return "!ERROR: no space to allocate xquery client\n";
} else {
@@ -4549,9 +4528,9 @@
}
static void
-xquery_client_free(mapi_client *fc)
+xquery_client_free(mapi_client *mc)
{
- xquery_client_free_(fc->fc);
+ xquery_client_free_(mc->fc);
}
@@ -4572,12 +4551,12 @@
static char*
-xquery_client_init(mapi_client *fc)
+xquery_client_init(mapi_client *mc)
{
- xquery_client *ctx = fc->fc;
+ xquery_client *ctx = mc->fc;
char* err = xquery_client_init_(ctx);
- ctx->fderr = fc->c->fdout;
+ ctx->fderr = mc->c->fdout;
if (err == 0 && (ctx->cacheid != xquery_cacheid || ctx->cachesize >
xquery_client_bytes)) {
/* re-initialize everything */
@@ -4585,7 +4564,7 @@
err = xquery_client_alloc_(ctx);
}
if (err)
- fprintf(stderr, "xquery_client_init: client %d %s\n", fc->stk, err);
+ fprintf(stderr, "xquery_client_init: client %d %s\n", mc->stk, err);
return err;
}
@@ -4595,7 +4574,7 @@
* end of xquery execution (struct stays alive for reuse).
*/
static void
-xquery_client_end(xquery_client *ctx, char *err)
+xquery_client_reset(xquery_client *ctx, char *err)
{
oid zero = 0, one = 1;
@@ -4637,7 +4616,7 @@
*ctx->shredBAT = int_nil;
- MT_set_lock(pf_cache_lock, "xquery_client_end");
+ MT_set_lock(pf_cache_lock, "xquery_client_reset");
/* only deactivate the loaded modules */
xquery_loaded_module *mod= ctx->loaded_modules;
while(mod) {
@@ -4645,7 +4624,7 @@
mod->ns = NULL;
mod = mod->next;
}
- MT_unset_lock(pf_cache_lock, "xquery_client_end");
+ MT_unset_lock(pf_cache_lock, "xquery_client_reset");
if (err)
fprintf(stderr, "xquery_server: client %d %s\n", ctx->stk, err);
@@ -5098,13 +5077,13 @@
return NULL;
}
-static void
-xquery_client_engine(mapi_client *fc )
+void
+xquery_client_engine(mapi_client *mc)
{
ssize_t n = 0;
size_t curlen;
- xquery_client *ctx = fc->fc;
- stream *in = fc->c->fdin, *out = fc->c->fdout;
+ xquery_client *ctx = mc->fc;
+ stream *in = mc->c->fdin, *out = mc->c->fdout;
char *p, *xquery, *err = (char*) -1, *msg = NULL;
/* buf is the mode string that is available during execution in the MIL
genType var */
@@ -5117,7 +5096,7 @@
ctx->mode = XQ_MAPI;
while (1) {
/* use the MAPI protocol to read as much xquery buffer as possible */
- if (!fc->c->blocked &&
+ if (!mc->c->blocked &&
stream_write(out, PROMPT1, sizeof(PROMPT1) - 1, 1) < 0) {
msg = "could not write prompt";
break;
@@ -5164,7 +5143,7 @@
p += sizeof("copy ")-1;
/* start shredder (first character should be skipped)
*/
- if (!fc->c->blocked &&
+ if (!mc->c->blocked &&
stream_write(out, PROMPT1, sizeof(PROMPT1) -
1, 1) < 0) {
msg = "could not write prompt";
break;
@@ -5212,7 +5191,7 @@
/* memory debugging */
xquery_prepared_function *fun = ctx->prepared_functions;
BAT *b = NULL;
- view_client_size(&b, &fc->stk);
+ view_client_size(&b, &mc->stk);
if (b) {
while(fun) {
BUNins(b, fun->def->proc, &fun->def->size, FALSE);
@@ -5224,19 +5203,25 @@
}
}
/* second and on queries also need a cleared context */
- xquery_client_end(ctx, NULL);
+ xquery_client_reset(ctx, NULL);
}
- xquery_client_end(ctx, msg);
+ xquery_client_end(mc, msg);
+}
- /* for some reason, mapi_client_end sets keep_open=1 so it continuously
leaks streams */
- stream_close(fc->c->fdin);
- stream_destroy(fc->c->fdin);
- fc->c->fdin = NULL;
- stream_close(fc->c->fdout);
- stream_destroy(fc->c->fdout);
- fc->c->fdout = NULL;
- monetSetChannel(THRget(THRgettid()), NULL, NULL);
+void
+xquery_client_end(mapi_client *mc, char *err)
+{
+ xquery_client *ctx = (xquery_client*) mc->fc;
+
+ xquery_client_reset(ctx, err);
+ stream_close(mc->c->fdin);
+ stream_destroy(mc->c->fdin);
+ mc->c->fdin = NULL;
+ stream_close(mc->c->fdout);
+ stream_destroy(mc->c->fdout);
+ mc->c->fdout = NULL;
+ monetSetChannel(THRget(THRgettid()), NULL, NULL);
}
int
@@ -5346,12 +5331,22 @@
return GDK_SUCCEED;
}
-/*
- * call a method in a temporary xquery client context
- */
+/* xquery_method : execute a loop-lifted xquery function
+ *
+ * argc = #params
+ * itercnt = #iterations
+ * argcnt[iter] = #items per param
+ * argtpe[] = xquery type of each parameter (as string, e.g. 'xs:integer')
+ * argval[] = str representation of item (e.g. '42')
+ *
+ * shredBAT = optional shredded document table, that is added to working set
+ * params of type xs:anyNode are represented as int pre-numbers.
+ *
+ * we return an error string, or NULL iff everything went A-OK
+ */
char*
-xquery_method(stream *out,
- int timing,
+xquery_method(mapi_client *mc,
+ int flags,
char* module,
char* uri,
char* method,
@@ -5362,38 +5357,24 @@
str* argval,
BAT* shredBAT)
{
+ xquery_client *ctx = (xquery_client*) mc->fc;
lng usec = GDKusec();
- MT_Id XQthread_id = THRgettid(), old_tid;
- Thread XQthread = THRget(XQthread_id), old_thread;
- char *err, *buf, *ns = "fn", *mode = "xml-noheader-xrpc";
+ char *mode = (flags&2)?"timing-xml-noheader":"timing-xml-noheader-xrpc";
+ char *err, *buf, *ns = "fn";
stream *s = NULL;
- mapi_client *mc = MAPIclient(GDKin, out, "xquery" );
- xquery_client *ctx;
- Variable v;
-
- if (mc == NULL || mc->fc == NULL)
- return "xquery_method: out of client slots.\n";
- ctx = mc->fc;
- ctx->fderr = GDKerr;
ctx->mode = 0;
-
buf = GDKstrdup(module);
(*ctx->moduleNS) = buf;
buf = GDKstrdup(method);
(*ctx->method) = buf;
- if (timing){
- mode = "timing-xml-noheader-xrpc";
+ if ((flags&1) == 0){
+ mode += 7; /* no "timing-" */
}
err = xquery_change_genType(ctx, mode);
if (err) return err;
- old_tid = mc->t;
- old_thread = mc->thread;
- mc->t = XQthread_id;
- mc->thread = XQthread;
- monetSetChannel(XQthread, GDKin, out);
if (argc >= 1000) {
/* hack: pass argc+1000 and you get debug output */
s = open_wastream("/tmp/xrpc.mil");
@@ -5417,31 +5398,8 @@
if (s) {
stream_close(ctx->fderr);
stream_destroy(ctx->fderr);
+ ctx->fderr = GDKerr;
}
- monetSetChannel(XQthread, GDKin, GDKout);
-
- mc->t = old_tid;
- mc->thread = old_thread;
- xquery_client_end(ctx, NULL);
-
- for (v = monet_cntxt[mc->stk].var; v && v != mc->lastvar; v = v->next) {
- ATOMunfix(v->binding.vtype, VALptr(&v->binding));
- VALclear(&v->binding); /* -> void-nil (plus clean-up/free) */
- VALset(&v->binding, v->binding.vtype, ATOMnilptr(v->binding.vtype));
/* -> typed-nil */
- }
- /*
- if (mc->c->fdout) {
- stream_close(mc->c->fdout);
- stream_destroy(mc->c->fdout);
- mc->c->fdout = NULL;
- }
- if (mc->c->fdin) {
- stream_close(mc->c->fdin);
- stream_destroy(mc->c->fdin);
- mc->c->fdin = NULL;
- }
- */
- mc->inuse = 0;
return err;
}
Index: shttpd.h
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/shttpd.h,v
retrieving revision 1.6
retrieving revision 1.6.6.1
diff -u -d -r1.6 -r1.6.6.1
--- shttpd.h 4 May 2006 16:48:59 -0000 1.6
+++ shttpd.h 15 Feb 2007 01:46:13 -0000 1.6.6.1
@@ -167,8 +167,12 @@
extern void shttpd_poll(shttpd_socket *ctx, unsigned milliseconds);
extern const char *shttpd_get_var(struct conn *, const char *varname);
extern int shttpd_template(struct conn *, const char *headers, const char
*file, ...);
+
extern char *shttpd_get_msg(struct shttpd_callback_arg *arg);
+extern char *shttpd_get_method(struct shttpd_callback_arg *arg);
+extern char *shttpd_get_uri(struct shttpd_callback_arg *arg);
extern int shttpd_get_socket(struct shttpd_callback_arg *arg);
+extern void shttpd_finish(struct shttpd_callback_arg *arg);
#ifdef MT
extern int shttpd_printf(struct conn *, const char *fmt, ...);
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.6.2.4
retrieving revision 1.6.2.5
diff -u -d -r1.6.2.4 -r1.6.2.5
--- xrpc_client.mx 8 Feb 2007 18:15:29 -0000 1.6.2.4
+++ xrpc_client.mx 15 Feb 2007 01:46:14 -0000 1.6.2.5
@@ -436,46 +436,9 @@
#ifndef XRPC_CLIENT_H
#define XRPC_CLIENT_H
-#include "pathfinder.h"
-
-#include <gdk.h>
-#include <stream.h>
-#include <time.h>
-
-#ifdef _WIN32 /* Windows specific */
- #include <winsock.h>
- #define snprintf _snprintf
- #ifndef __MINGW32__
- #pragma comment(lib, "ws2_32")
- #endif
- #define sleep(s) Sleep(1000*(s))
-#else /* UNIX specific */
- #include <sys/select.h>
- #include <sys/types.h> /* used by socket */
- #include <sys/socket.h>
- #include <unistd.h>
- #include <netinet/in.h> /* hton and ntoh */
- #include <arpa/inet.h> /* dotted IP addr to and from 32-bits int */
- #include <netdb.h> /* convert domain names into IP addr */
- #include <errno.h>
- #include <ctype.h>
-#endif
-
-#include <stdio.h>
-#include <stdlib.h>
-
-#include "pf_support.h"
-#include "shredder.h"
-#include "serialize.h"
+#include "xrpc_server.h"
-#define PFTEXT 1
-#define ELEMENT 0
-#define MAX_NR_PARAMS 32
-#define HTTP_PORT 48080
-#define HTTPD_FUNC "/xrpc"
-#define MIN_RESPONSE_SIZE 16
#define MAX_BUF_SIZE (1024*1024)
-#define MAX_POST_HEADER_SIZE 1024
#define NR_RETRIES 3
#endif /* XRPC_CLIENT_H */
@@ -575,7 +538,7 @@
struct in_addr addr;
struct sockaddr_in sockaddr;
struct hostent *resolv = NULL;
- int i, ret, sock, port = HTTP_PORT;
+ int i, ret, sock, port = HTTPD_DEFAULT_PORT;
str strptr = NULL;
errno = 0;
@@ -801,12 +764,7 @@
return GDK_FAIL;
}
- b->pos = snprintf(b->buf, b->len,
- "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
- "<env:Envelope xmlns:env=\"%s\" xmlns:xrpc=\"%s\" "
- "xmlns:xs=\"%s\" xmlns:xsi=\"%s\" "
- "xsi:schemaLocation=\"%s %s\"><env:Body><xrpc:request "
- "xrpc:module=\"%s\" xrpc:location=\"%s\" xrpc:method=\"%s\">",
+ b->pos = snprintf(b->buf, b->len, XRPC_HEADER,
SOAP_NS, XRPC_NS, XS_NS, XSI_NS, XRPC_NS, XRPC_LOC,
rpc_module, rpc_uri, rpc_method);
@@ -1063,7 +1021,7 @@
}
str2buf(b, "</xrpc:call>");
}
- str2buf(b, "</xrpc:request></env:Body></env:Envelope>");
+ str2buf(b, XRPC_FOOTER);
b->buf[b->pos] = 0;
/* Stop timing Client Serialisation */
time_clntSeria = GDKusec() - time_clntSeria;
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.23.2.2
retrieving revision 1.23.2.3
diff -u -d -r1.23.2.2 -r1.23.2.3
--- serialize_dflt.mx 8 Feb 2007 18:15:28 -0000 1.23.2.2
+++ serialize_dflt.mx 15 Feb 2007 01:46:12 -0000 1.23.2.3
@@ -37,6 +37,7 @@
#include "pf_config.h"
#include "serialize.h"
+#include "xrpc_server.h"
/* contains dummy callback functions */
#include "serialize_null.h"
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.7.2.5
retrieving revision 1.7.2.6
diff -u -d -r1.7.2.5 -r1.7.2.6
--- xrpc_server.mx 12 Feb 2007 22:51:42 -0000 1.7.2.5
+++ xrpc_server.mx 15 Feb 2007 01:46:14 -0000 1.7.2.6
@@ -43,6 +43,7 @@
.COMMAND httpd_start(int port, str option) : void = CMDhttpd_start;
"Start the HTTP server for RPC calls on the specified port."
+.EPILOGUE = xrpc_epilogue;
.END xrpc_server;
@mil
@@ -104,6 +105,7 @@
#include <gdk.h>
#include <stream.h>
+#include <mapi.h>
#include <time.h>
#ifdef _WIN32 /* Windows specific */
@@ -118,6 +120,9 @@
#include <sys/types.h> /* used by socket */
#include <sys/socket.h>
#include <unistd.h>
+ #include <netinet/in.h> /* hton and ntoh */
+ #include <arpa/inet.h> /* dotted IP addr to and from 32-bits int */
+ #include <netdb.h> /* convert domain names into IP addr */
#include <errno.h>
#include <ctype.h>
#endif
@@ -125,43 +130,64 @@
#include <stdio.h>
#include <stdlib.h>
+#include "pathfinder.h"
#include "pf_support.h"
#include "shredder.h"
#include "serialize.h"
#include "shttpd.h"
-#define MAXPARAMS 32
-#define HTTPD_FUNC "/xrpc"
-#define ERR404 "404 Bad Request"
-#define ERR500 "500 Internal Server Error"
-#define OUT_OF_MEM "Internal Receiver Error: out-of memory"
-#define NOT_WELL_FORMED "Request XML message not well-formed"
-
-/* TODO: why don't we define this in pathfinder.mx??? */
-#define ELEMENT 0
-#define PFTEXT 1
-#define COMMENT 2
-#define PI 3
-#define DOCUMENT 4
-#define COLLECTION 5
+#define HTTPD_DEFAULT_PORT 48080
+#define HTTPD_FUNC "/xrpc"
+#define ERR404 "404 Bad Request"
+#define ERR500 "500 Internal Server Error"
+#define OUT_OF_MEM "Internal Receiver Error: out-of memory"
+#define NOT_WELL_FORMED "Request XML message not well-formed"
+#define MAX_NR_PARAMS 32
-#define SOAP_NS "http://www.w3.org/2003/05/soap-envelope"
-#define XRPC_NS "http://monetdb.cwi.nl/XQuery"
-#define XRPC_LOC "http://monetdb.cwi.nl/XQuery/XRPC.xsd"
-#define XS_NS "http://www.w3.org/2001/XMLSchema"
-#define XSI_NS "http://www.w3.org/2001/XMLSchema-instance"
+#define SOAP_NS "http://www.w3.org/2003/05/soap-envelope"
+#define XDT_NS "http://www.w3.org/2005/xpath-datatypes"
+#define XS_NS "http://www.w3.org/2001/XMLSchema"
+#define XSI_NS "http://www.w3.org/2001/XMLSchema-instance"
+#define XRPC_NS "http://monetdb.cwi.nl/XQuery"
+#define XRPC_LOC "http://monetdb.cwi.nl/XQuery/XRPC.xsd"
-/* To prevent starting the HTTP server twice */
-int rpcd_running = 0;
-int timing = 0;
+#define XRPC_HEADER "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"\
+ "<env:Envelope xmlns:env=\"%s\""\
+ " xmlns:xrpc=\"%s\""\
+ " xmlns:xs=\"%s\""\
+ " xmlns:xsi=\"%s\""\
+ " xsi:schemaLocation=\"%s %s\">"\
+ "<env:Body>"\
+ "<xrpc:request xrpc:module=\"%s\""\
+ " xrpc:location=\"%s\""\
+ " xrpc:method=\"%s\">"
+#define XRPC_HTTP_CALL "<xrpc:call>"\
+ "<xrpc:sequence>"\
+ "<xrpc:atomic-value
xsi:type=\"xs:string\">%s</xrpc:atomic-value>"\
+ "</xrpc:sequence>"\
+ "</xrpc:call>"
+#define XRPC_PUT_CALL "<xrpc:call>"\
+ "<xrpc:sequence>"\
+ "<xrpc:atomic-value
xsi:type=\"xs:string\">%s</xrpc:atomic-value>"\
+ "</xrpc:sequence>"\
+ "<xrpc:sequence>"\
+ "<xrpc:element>%s</xrpc:element>"\
+ "</xrpc:sequence>"\
+ "</xrpc:call>"
+#define XRPC_FOOTER "</xrpc:request>"\
+ "</env:Body>"\
+ "</env:Envelope>"
#endif /* XQUERY_RPC_H */
@c
#include "pf_config.h"
#include "xrpc_server.h"
-/* stream_close(s); stream_destroy(s); \
- * */
+
+/* To prevent starting the HTTP server twice */
+int rpcd_running = 0;
+int timing = 0;
+
#define clean_up(s, argcnt, argtpe, argval, iterc, nr_args) { \
lng i = 0; \
@@ -205,37 +231,52 @@
static BAT*
request2bat(struct shttpd_callback_arg *arg, stream *out)
{
- char *strptr = NULL, *req_msg = NULL;
+ char *strptr = NULL, *del = NULL, *req_msg = shttpd_get_msg(arg);
lng percentage = 0;
BAT *shredBAT = NULL;
bit verbose = FALSE;
- /* Remove the first line of the message, which containing
- * "<?xml...?>", so that the message we pass to CMDshred2bats starts
- * directly with <env:Envelope ...> */
- req_msg = shttpd_get_msg(arg);
- strptr = req_msg + 7; /* strlen("<?xml...?>") >= 7 */
- if ( (strstr(req_msg, "<?xml") != req_msg) ||
- (strptr = strchr(strptr, (int)'<')) == NULL ) {
- send_err(out, 1, ERR404, "env:Sender", NOT_WELL_FORMED);
- return NULL;
- }
+ if (strcmp(shttpd_get_method(arg), "POST")) {
+ /* support GET, PUT and DEL by redirecting those to a predefined XRPC
module */
+ char *docname = shttpd_get_uri(arg) + 6; /* skip "/xrpc/" prefix that
directed us here */
+ char *body_format = XRPC_HTTP_CALL;
+ char *p, uri[128];
- if(!(shredBAT = BATnew(TYPE_str, TYPE_bat, 32))){
- send_err(out, 1, ERR500, "env:Receiver", OUT_OF_MEM);
- return NULL;
- }
+ size_t len = 2000 + strlen(docname);
+ if (strcmp(shttpd_get_method(arg), "PUT") == 0) len += strlen(req_msg);
- if (CMDshred_str(shredBAT, strptr, &percentage, NULL, &verbose) ==
GDK_FAIL) {
- if(BBPreclaim(shredBAT) == -1){
- GDKerror("request2bat: failed to destroy shredBAT on error, "
- "because it is used by other process!");
- GDKerror("THIS SHOULD NEVER HAPPEN!!!");
+ strptr = del = p = (char*) GDKmalloc(len);
+ if (del == NULL) {
+ send_err(out, 1, ERR500, "env:Receiver", OUT_OF_MEM);
+ return NULL;
+ }
+
+ /* the PUT, GET and DELETE methods are implemented in this module */
+ sprintf(uri, "http://localhost:%d/admin/http.xq", rpcd_running);
+
+ /* create a fake XRPC message for GET/PUT/DELETE (=method) that
invoke xrpc:method(URI) */
+ p += sprintf(p, XRPC_HEADER, SOAP_NS, XRPC_NS, XS_NS, XSI_NS, XRPC_NS,
XRPC_LOC,
+ "http", uri, shttpd_get_method(arg));
+ p += sprintf(p, body_format, docname, req_msg);
+ strcpy(p, XRPC_FOOTER);
+ } else {
+ /* Remove the first line of the message, which containing
+ * "<?xml...?>", so that the message we pass to CMDshred2bats starts
+ * directly with <env:Envelope ...> */
+ strptr = req_msg + 7; /* strlen("<?xml...?>") >= 7 */
+ if ((strstr(req_msg, "<?xml") != req_msg) || (strptr = strchr(strptr,
(int)'<')) == NULL ) {
+ send_err(out, 1, ERR404, "env:Sender", NOT_WELL_FORMED);
+ return NULL;
}
+ }
+ if (!(shredBAT = BATnew(TYPE_str, TYPE_bat, 32))) {
+ send_err(out, 1, ERR500, "env:Receiver", OUT_OF_MEM);
+ } else if (CMDshred_str(shredBAT, strptr, &percentage, NULL, &verbose) ==
GDK_FAIL) {
+ BBPreclaim(shredBAT);
+ shredBAT = NULL;
send_err(out, 1, ERR404, "env:Sender", NOT_WELL_FORMED);
- return NULL;
}
-
+ if (del) GDKfree(del);
return shredBAT;
}
@@ -407,9 +448,8 @@
* @return GDK_SUCCEED, or
* GDK_FAIL if an error has occurred.
*/
-static int
-handle_rpc_request(struct shttpd_callback_arg *arg)
-{
+static int
+xrpc_handle_request(mapi_client *mc, struct shttpd_callback_arg *arg) {
char errstr[1024], *module = NULL, *location = NULL, *method = NULL;
int sock = -1, k = 0, simple_param = 1;
stream *out = NULL;
@@ -445,12 +485,14 @@
char level_diff = 0; /* indicates how many levels each pre_level
value of a node should be reduced. */
- THRnew(MT_getpid(), "rpc_req_handler"); /* register this thread */
+ int serializeMode = timing;
+ /* GET,PUT,DELETE HTTP requests get serializeMode|=2 to avoid XRPC
serialization */
+ if (strcmp(shttpd_get_method(arg), "POST")) serializeMode |= 2;
/* Create our own output stream for further data transfer. */
sock = shttpd_get_socket(arg);
if (!(out = socket_wastream(sock, "rpc_response"))) {
- GDKerror("handle_rpc_request: failed to create "
+ GDKerror("xrpc_handle_request: failed to create "
"socket_wastream for socket %d", sock);
close(sock);
return GDK_FAIL;
@@ -560,7 +602,7 @@
argcnt[i][j] = 0;
}
- max_args = iterc * MAXPARAMS;
+ max_args = iterc * MAX_NR_PARAMS;
argval = GDKmalloc(max_args * sizeof(char *));
argtpe = GDKmalloc(max_args * sizeof(char *));
if (!argval || !argtpe) {
@@ -597,7 +639,7 @@
if (nr_args == max_args) {
snprintf(errstr, 1024, "XRPC request: too many "
- "parameters, maximum is %d", MAXPARAMS);
+ "parameters, maximum is %d", MAX_NR_PARAMS);
send_err(out, 1, ERR404, "env:Sender", errstr);
clean_up(out,argcnt,argtpe,argval,iterc,nr_args);
return GDK_FAIL;
@@ -629,7 +671,7 @@
val_node_pre = tpe_node_pre + 1;
if( (tpe_node_size != 1) ||
- (pre_kindT[val_node_pre] != PFTEXT) ) {
+ (pre_kindT[val_node_pre] != TEXT) ) {
snprintf(errstr, 1024, "XRPC request: "
"iteration%lld/parameter%lld/value%d "
"of type \"%s\" is expected to have a "
@@ -687,7 +729,7 @@
pre_kindT);
} else if (strcmp(argtpe[nr_args], "xs:text") == 0) {
val_node_pre = get_elem_pre(tpe_node_pre+1,
- (tpe_node_pre+tpe_node_size), PFTEXT,
+ (tpe_node_pre+tpe_node_size), TEXT,
pre_kindT);
} else if (strcmp(argtpe[nr_args], "xs:comment") == 0) {
val_node_pre = get_elem_pre(tpe_node_pre+1,
@@ -757,7 +799,7 @@
stream_printf(out, "HTTP/1.1 200 OK\r\n"
"Content-Type: application/soap+xml; "
"charset=\"utf-8\"\r\n\r\n");
- char *err = xquery_method(out, timing, module, location, method,
+ char *err = xquery_method(mc, serializeMode, module, location, method,
argc, iterc, argcnt, argtpe, argval,
simple_param?NULL:shredBAT);
@@ -777,26 +819,71 @@
}
clean_up(out, argcnt, argtpe, argval, iterc, nr_args);
- if(BBPreclaim(shredBAT) == -1){
- GDKerror("handle_rpc_request: failed to destroy BAT \"shredBAT\",
because it is in use by other process");
- GDKerror("THIS SHOULD NEVER HAPPEN!!!");
- return GDK_FAIL;
- }
-
+ BBPreclaim(shredBAT);
return GDK_SUCCEED;
}
+
+/*
+ * xrpc MAPI client handler (overrides the xquery_client_engine)
+ */
+static void
+xrpc_client_engine(mapi_client *mc) {
+ struct shttpd_callback_arg *arg = (struct shttpd_callback_arg *) mc->arg;
+
+ /* do the work */
+ (void) xrpc_handle_request(mc, arg);
+
+ /* clean up */
+ mc->engine = xquery_client_engine;
+ xquery_client_end(mc, NULL);
+ shttpd_finish(arg);
+}
+
+
+/*
+ * handle request asynchronously using a MAPI xquery client
+ */
+static int
+xrpc_fork_mapiclient(struct shttpd_callback_arg *arg) {
+ /* get me a MAPI thread from the xquery client pool */
+ int sock = shttpd_get_socket(arg);
+ stream *fdin = socket_rastream(sock, "XRPC read");
+ if (fdin && stream_errnr(fdin) == 0) {
+ stream *fdout = socket_wastream(sock, "XRPC write");
+ if (fdout && stream_errnr(fdout) == 0) {
+ mapi_client *mc = MAPIclient(fdin, fdout, "xquery");
+ if (mc) {
+ mc->engine = xrpc_client_engine; /* override
xquery_client_engine (will be restored later) */
+ mc->arg = (char*) arg; /* HACK! pass xrpc arg */
+ MT_up_sema(mc->s, "XRPC"); /* activate the thread */
+ return 0;
+ }
+ stream_close(fdout);
+ stream_destroy(fdout);
+ }
+ stream_close(fdin);
+ stream_destroy(fdin);
+ }
+ shttpd_finish(arg);
+ return -1;
+}
+
int
CMDhttpd_start(int* port, str option)
{
shttpd_socket ctx;
+ char *s = "/usr/share/", docdir[1024]; /* httpd serves out
/usr/share/MonetDB/xrpc/ */
+ BUN p = BUNfnd(GDKenv, "datadir"); /* normally use 'datadir' iso
/usr/share (often datdir = prefix/share) */
+ if (p) s = BUNtail(GDKenv,p);
+ snprintf(docdir, 1024, "%s%cMonetDB%cxrpc%c", s, DIR_SEP, DIR_SEP,
DIR_SEP);
if (rpcd_running) {
GDKerror("CMDhttpd_start: RPC receiver already running\n");
return GDK_FAIL;
}
- rpcd_running = 1;
+ rpcd_running = *port;
if (option && strstr(option, "timing") != NULL)
timing = 1;
@@ -804,9 +891,10 @@
/* Initialize with specific config file, pass NULL to use default
* values */
shttpd_init(NULL);
+ shttpd_setopt("document_root", docdir);
/* Register call back function */
- shttpd_register_url(HTTPD_FUNC, &handle_rpc_request, NULL);
+ shttpd_register_url(HTTPD_FUNC, xrpc_fork_mapiclient, NULL);
/* Open listening socket */
ctx = shttpd_open_port(*port);
@@ -817,10 +905,6 @@
return GDK_SUCCEED;
}
-bat* xrpc_prelude() {
- return NULL; /* Nothing to do here. */
-}
-
void xrpc_epilogue() {
shttpd_fini(); /* Shut down the HTTP server. */
rpcd_running = 0; /* Stop RPC server */
-------------------------------------------------------------------------
Take Surveys. Earn Cash. Influence the Future of IT
Join SourceForge.net's Techsay panel and you'll get the chance to share your
opinions on IT & business topics through brief surveys-and earn cash
http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV
_______________________________________________
Monetdb-pf-checkins mailing list
monetdb-pf-checkins@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins