Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs7.sourceforge.net:/tmp/cvs-serv3214/runtime

Modified Files:
      Tag: XQuery_0-16
        pathfinder.mx serialize_dflt.mx shttpd.c shttpd.h 
        xrpc_client.mx xrpc_server.mx 
Log Message:
- Changed XRPC to reuse MAPI clients instead of spawning a new pthread
  to handle each individual request.

  The code may not be fully working yet -- more to follow tomorrow.
  Checked in now to hand it over to Jennie.
  


Index: shttpd.c
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/shttpd.c,v
retrieving revision 1.19
retrieving revision 1.19.4.1
diff -u -d -r1.19 -r1.19.4.1
--- shttpd.c    6 Oct 2006 12:22:53 -0000       1.19
+++ shttpd.c    15 Feb 2007 01:46:13 -0000      1.19.4.1
@@ -464,7 +464,7 @@
        struct userurl  *p;
 
        for (p = urls; p != NULL; p = p->next)
-               if (strcmp(p->url, url) == 0)
+               if (strncmp(p->url, url, strlen(p->url)) == 0)
                        return (p);
 
        return (NULL);
@@ -486,41 +486,8 @@
 #include <pthread.h>
 #endif /* _WIN32 */
 
-struct thrarg {
-       struct userurl                  userurl;
-       struct shttpd_callback_arg      arg;
-};
 
-/*
- * This function is run in dedicated thread.
- * It executes user-defined callback function, and exits.
- * Return value is discarded.
- */
-static void * WINAPI
-do_thread(struct thrarg *p)
-{
-       /* Put client socket to blocking mode */
-#ifdef _WIN32
-       u_long flags = 0;
-       (void) ioctlsocket(p->arg.connection->sock, FIONBIO, &flags);
-#else
-       int flags = fcntl(p->arg.connection->sock, F_GETFL);
-       flags &= ~O_NONBLOCK;
-       (void) fcntl(p->arg.connection->sock, F_SETFL, flags);
-       (void) signal(SIGPIPE, SIG_IGN);
-#endif /* _WIN32 */
 
-       /* Call user function */
-       p->userurl.func(&p->arg);
-
-       /* Free up the resources XXX what about concurency here ? */
-       p->arg.connection->flags |= FLAG_FINISHED;
-       free(p);
-
-       pthread_exit(0);
-
-       return (NULL);
-}
 
 /*
  * The URI should be handled is user-registered callback function.
@@ -530,19 +497,18 @@
  * number of bytes copied to the local IO buffer. Mark local IO as done,
  * and shttpd will take care about passing data back to client.
  */
+
 static void
 do_embedded(struct conn *c)
 {
-       struct shttpd_callback_arg      arg;
+       struct shttpd_callback_arg      *arg = malloc(sizeof(struct 
shttpd_callback_arg));
        const struct userurl            *p = c->userurl;
        unsigned long                   n;
-       pthread_t                       thr;
-       struct thrarg                   *param;
 
-       arg.connection          = c;
-       arg.callback_data       = p->data;
-       arg.buf                 = c->local.buf;
-       arg.buflen              = sizeof(c->local.buf);
+       arg->connection         = c;
+       arg->callback_data      = p->data;
+       arg->buf                = c->local.buf;
+       arg->buflen             = sizeof(c->local.buf);
 
        if (c->http_method == METHOD_POST && c->cclength) {
                if (c->query == NULL) {
@@ -581,16 +547,21 @@
                /* Null-terminate query data */
                c->query[c->cclength] = '\0';
        }
-       /* Multi-Threaded scenario. Run dedicated thread for connection. */
-       if ((param = malloc(sizeof(*param))) == NULL)
-               return;
-       param->userurl  = *p;
-       param->arg      = arg;
-
        /* FIXME check return value */
        c->flags |= FLAG_THREADED;
-       (void) pthread_create(&thr, 0,(LPTHREAD_START_ROUTINE)do_thread, param);
-       (void) pthread_detach(thr);
+
+       /* Put client socket to blocking mode */
+#ifdef _WIN32
+       u_long flags = 0;
+       (void) ioctlsocket(arg->connection->sock, FIONBIO, &flags);
+#else
+       int flags = fcntl(arg->connection->sock, F_GETFL);
+       flags &= ~O_NONBLOCK;
+       (void) fcntl(arg->connection->sock, F_SETFL, flags);
+       (void) signal(SIGPIPE, SIG_IGN);
+#endif /* _WIN32 */
+
+       p->func(arg);
 }
 
 /*
@@ -654,6 +625,7 @@
 /*
  * Change all occurences of '/' characters to OS-specific directory separator
  */
+
 static void
 copypath(const char *src, char *dst, size_t dstlen)
 {
@@ -3069,8 +3041,7 @@
        /* If document_root is not set, set it to current directory */
        if (STROPT(OPT_DOCROOT) == NULL){
         char tmp[8192];
-        char *docroot = NULL;
-               docroot = getcwd(tmp, 8192);
+        char *docroot = getcwd(tmp, 8192); 
         if (docroot == NULL) {
             elog(ERR_FATAL, "unable to set document_root: %s", 
strerror(errno));
             return;  /* force abort */
@@ -3259,6 +3230,26 @@
     return arg->connection->sock;
 }
 
+char* 
+shttpd_get_method(struct shttpd_callback_arg *arg)
+{
+    return arg->connection->method;
+}
+
+char* 
+shttpd_get_uri(struct shttpd_callback_arg *arg)
+{
+    return arg->connection->uri;
+}
+
+void 
+shttpd_finish(struct shttpd_callback_arg *arg)
+{
+    /* Free up the resources XXX what about concurency here ? */
+    arg->connection->flags |= FLAG_FINISHED;
+    free(arg);
+}
+
 static void
 substitute(char *buf, const char *kw, const char *subst)
 {

Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.288.2.12
retrieving revision 1.288.2.13
diff -u -d -r1.288.2.12 -r1.288.2.13
--- pathfinder.mx       12 Feb 2007 22:51:42 -0000      1.288.2.12
+++ pathfinder.mx       15 Feb 2007 01:46:12 -0000      1.288.2.13
@@ -368,9 +368,10 @@
 @:[EMAIL PROTECTED](NODE,    32)@
 @:[EMAIL PROTECTED](ATTR,    33)@
 @:[EMAIL PROTECTED](ELEM,    34)@
+@:ws_kind_decl(@1)@
 
 @= ws_mil_decl
-const @1 := @2;
+const @1 := @[EMAIL PROTECTED];
 @mil
 @:ws(mil_decl)@
 @:ws_decl(mil)@
@@ -381,13 +382,15 @@
 # KIND constants, carefully chosen
 # atomic value items can be retrieved with 'kind.select(int_nil,ATOMIC)'
 # NODE is not a type but all node types can be retrieved with 
'kind.select(NODE,int_nil)'
-const ELEMENT          := chr(0);
-const TEXT             := chr(1);
-const COMMENT          := chr(2);
-const PI               := chr(3);
-const DOCUMENT         := chr(4);
-const COLLECTION       := chr(5);
[EMAIL PROTECTED] ws_kind_decl
+@:[EMAIL PROTECTED](ELEMENT,   0,,,,,,.chr())@
+@:[EMAIL PROTECTED](TEXT,      1,,,,,,.chr())@
+@:[EMAIL PROTECTED](COMMENT,   2,,,,,,.chr())@
+@:[EMAIL PROTECTED](PI,        3,,,,,,.chr())@
+@:[EMAIL PROTECTED](DOCUMENT,  4,,,,,,.chr())@
+@:[EMAIL PROTECTED](COLLECTION,5,,,,,,.chr())@
 
[EMAIL PROTECTED]
 # zero
 const WS               := [EMAIL PROTECTED]; # first container in ws = 
transient doc container
 const PRE_BASE         := [EMAIL PROTECTED]; # all our PRE bats start at 0 
(the super-root)
@@ -3345,42 +3348,14 @@
 @:ws(c_decl)@
 @:ws_decl(c)@
 
-#define SOAP_NS  "http://www.w3.org/2003/05/soap-envelope"
-#define XDT_NS   "http://www.w3.org/2005/xpath-datatypes"
-#define XS_NS    "http://www.w3.org/2001/XMLSchema"
-#define XSI_NS   "http://www.w3.org/2001/XMLSchema-instance"
-#define XRPC_NS  "http://monetdb.cwi.nl/XQuery"
-#define XRPC_LOC "http://monetdb.cwi.nl/XQuery/XRPC.xsd"
-
 #define XTRACT_KIND(X)     (X & 63)
 #define XTRACT_CONT(X)     (X >> 6)
 #define SET_CONT_KIND(X,Y) (X << 6 | Y)
 
-/* xquery_method : execute a loop-lifted xquery function
- *
- * argc          = #params
- * itercnt       = #iterations
- * argcnt[iter]  = #items per param 
- * argtpe[]      = xquery type of each parameter (as string, e.g. 'xs:integer')
- * argval[]      = str representation of item (e.g. '42')
- *
- * shredBAT   = optional shredded document table, that is added to working set
- *              params of type xs:anyNode are represented as int pre-numbers.
- *
- * we return an error string, or NULL iff everything went A-OK  
- */
-pathfinder_export char *
-xquery_method(stream *out, 
-              int timing,
-              char* moduleNS, 
-              char* uri, 
-              char *method, 
-              lng argc, 
-              lng itercnt, 
-              lng** argcnt, 
-              str* argtpe, 
-              str* argval, 
-              BAT* shredBAT);
+/* exports for XRPC */
+pathfinder_export void  xquery_client_engine(mapi_client*);
+pathfinder_export char* xquery_method(mapi_client*, int, char*, char*, char*, 
lng, lng, lng**, str*, str*, BAT*);
+pathfinder_export void  xquery_client_end(mapi_client *, char *); 
 
 #endif
 @c
@@ -4372,18 +4347,22 @@
  * =================== client session management 
================================
  *
  * xquery_client *
- * xquery_client_alloc(mapi_client *fc);
+ * xquery_client_alloc(mapi_client *mc);
  * - allocate a new xquery client, returns error message (NULL on success)
  *
  * char*
- * xquery_client_init(mapi_client *fc);
+ * xquery_client_init(mapi_client *mc);
  * - initialize a new xquery cache context, returns error string (NULL if ok)
  *
  * void
- * xquery_client_free(mapi_client *fc);
+ * xquery_client_free(mapi_client *mc);
  * - free a xquery cache context (terminate)
  *
  * void
+ * xquery_client_reset(xquery_client *ctx, char *err);
+ * - reset xquery execution for next statement
+ *
+ * void
  * xquery_client_end(xquery_client *ctx, char *err);
  * - end of xquery execution (struct stays alive for reuse). 
  */
@@ -4475,11 +4454,11 @@
  * allocate a new xquery client, returns client cntxt (0 on error)
  */
 static char* 
-xquery_client_alloc(mapi_client *fc)
+xquery_client_alloc(mapi_client *mc)
 {
-    xquery_client *ctx = xquery_client_new(fc->stk);
+    xquery_client *ctx = xquery_client_new(mc->stk);
 
-    fc->fc = ctx;
+    mc->fc = ctx;
     if (!ctx) {
         return "!ERROR: no space to allocate xquery client\n";
     } else {
@@ -4549,9 +4528,9 @@
 }
 
 static void 
-xquery_client_free(mapi_client *fc) 
+xquery_client_free(mapi_client *mc) 
 {
-    xquery_client_free_(fc->fc);
+    xquery_client_free_(mc->fc);
 }
 
 
@@ -4572,12 +4551,12 @@
 
 
 static char* 
-xquery_client_init(mapi_client *fc) 
+xquery_client_init(mapi_client *mc) 
 { 
-    xquery_client *ctx = fc->fc;
+    xquery_client *ctx = mc->fc;
     char* err = xquery_client_init_(ctx);
 
-    ctx->fderr = fc->c->fdout;
+    ctx->fderr = mc->c->fdout;
 
     if (err == 0 && (ctx->cacheid != xquery_cacheid || ctx->cachesize > 
xquery_client_bytes)) {
         /* re-initialize everything */
@@ -4585,7 +4564,7 @@
         err = xquery_client_alloc_(ctx);
     }
     if (err)
-        fprintf(stderr, "xquery_client_init: client %d %s\n", fc->stk, err);
+        fprintf(stderr, "xquery_client_init: client %d %s\n", mc->stk, err);
     return err;
 }
 
@@ -4595,7 +4574,7 @@
  * end of xquery execution (struct stays alive for reuse).
  */
 static void 
-xquery_client_end(xquery_client *ctx, char *err) 
+xquery_client_reset(xquery_client *ctx, char *err) 
 {
     oid zero = 0, one = 1;
 
@@ -4637,7 +4616,7 @@
     *ctx->shredBAT = int_nil; 
 
 
-    MT_set_lock(pf_cache_lock, "xquery_client_end");
+    MT_set_lock(pf_cache_lock, "xquery_client_reset");
     /* only deactivate the loaded modules */
     xquery_loaded_module *mod= ctx->loaded_modules; 
     while(mod) {
@@ -4645,7 +4624,7 @@
             mod->ns = NULL; 
             mod = mod->next;
     }
-    MT_unset_lock(pf_cache_lock, "xquery_client_end");
+    MT_unset_lock(pf_cache_lock, "xquery_client_reset");
 
     if (err) 
         fprintf(stderr, "xquery_server: client %d %s\n", ctx->stk, err);
@@ -5098,13 +5077,13 @@
     return NULL;
 }
 
-static void
-xquery_client_engine(mapi_client *fc )
+void
+xquery_client_engine(mapi_client *mc)
 {
     ssize_t n = 0;
     size_t curlen;
-    xquery_client *ctx = fc->fc;
-    stream *in = fc->c->fdin, *out = fc->c->fdout;
+    xquery_client *ctx = mc->fc;
+    stream *in = mc->c->fdin, *out = mc->c->fdout;
     char *p, *xquery, *err = (char*) -1, *msg = NULL;
 
     /* buf is the mode string that is available during execution in the MIL 
genType var */
@@ -5117,7 +5096,7 @@
     ctx->mode = XQ_MAPI;
     while (1) {
         /* use the MAPI protocol to read as much xquery buffer as possible */
-        if (!fc->c->blocked && 
+        if (!mc->c->blocked && 
                 stream_write(out, PROMPT1, sizeof(PROMPT1) - 1, 1) < 0) {
             msg = "could not write prompt"; 
             break;
@@ -5164,7 +5143,7 @@
                         p += sizeof("copy ")-1;
 
                         /* start shredder (first character should be skipped) 
*/
-                        if (!fc->c->blocked && 
+                        if (!mc->c->blocked && 
                                 stream_write(out, PROMPT1, sizeof(PROMPT1) - 
1, 1) < 0) {
                                 msg = "could not write prompt"; 
                                 break;
@@ -5212,7 +5191,7 @@
                 /* memory debugging */
                 xquery_prepared_function *fun = ctx->prepared_functions;
                 BAT *b = NULL;
-                view_client_size(&b, &fc->stk);
+                view_client_size(&b, &mc->stk);
                 if (b) {
                    while(fun) {
                       BUNins(b, fun->def->proc, &fun->def->size, FALSE);
@@ -5224,19 +5203,25 @@
             }
         }
         /* second and on queries also need a cleared context */
-        xquery_client_end(ctx, NULL); 
+        xquery_client_reset(ctx, NULL); 
     }
-    xquery_client_end(ctx, msg); 
+    xquery_client_end(mc, msg); 
+}
 
-    /* for some reason, mapi_client_end sets keep_open=1 so it continuously 
leaks streams */
-    stream_close(fc->c->fdin);
-    stream_destroy(fc->c->fdin);
-    fc->c->fdin = NULL;
-    stream_close(fc->c->fdout);
-    stream_destroy(fc->c->fdout);
-    fc->c->fdout = NULL;
-    monetSetChannel(THRget(THRgettid()), NULL, NULL);
+void 
+xquery_client_end(mapi_client *mc, char *err) 
+{ 
+    xquery_client *ctx = (xquery_client*) mc->fc;
+
+    xquery_client_reset(ctx, err); 
 
+    stream_close(mc->c->fdin);
+    stream_destroy(mc->c->fdin);
+    mc->c->fdin = NULL;
+    stream_close(mc->c->fdout);
+    stream_destroy(mc->c->fdout);
+    mc->c->fdout = NULL;
+    monetSetChannel(THRget(THRgettid()), NULL, NULL);
 }
 
 int
@@ -5346,12 +5331,22 @@
     return GDK_SUCCEED;
 }
 
-/*
- * call a method in a temporary xquery client context
- */  
+/* xquery_method : execute a loop-lifted xquery function
+ *
+ * argc          = #params
+ * itercnt       = #iterations
+ * argcnt[iter]  = #items per param
+ * argtpe[]      = xquery type of each parameter (as string, e.g. 'xs:integer')
+ * argval[]      = str representation of item (e.g. '42')
+ *
+ * shredBAT   = optional shredded document table, that is added to working set
+ *              params of type xs:anyNode are represented as int pre-numbers.
+ *
+ * we return an error string, or NULL iff everything went A-OK
+ */
 char*
-xquery_method(stream *out,
-              int timing,
+xquery_method(mapi_client *mc,
+              int flags,
               char* module,
               char* uri,
               char* method,
@@ -5362,38 +5357,24 @@
               str* argval,
               BAT* shredBAT)
 {
+    xquery_client *ctx = (xquery_client*) mc->fc;
     lng usec = GDKusec();
-    MT_Id XQthread_id = THRgettid(), old_tid;
-    Thread XQthread = THRget(XQthread_id), old_thread;
-    char *err, *buf, *ns = "fn", *mode = "xml-noheader-xrpc";
+    char *mode = (flags&2)?"timing-xml-noheader":"timing-xml-noheader-xrpc";
+    char *err, *buf, *ns = "fn";
     stream *s = NULL;
-    mapi_client *mc = MAPIclient(GDKin, out, "xquery" );
-    xquery_client *ctx;
-    Variable v;
-
-    if (mc == NULL || mc->fc == NULL)
-        return "xquery_method: out of client slots.\n";
 
-    ctx = mc->fc;
-    ctx->fderr = GDKerr;
     ctx->mode = 0;
-
     buf = GDKstrdup(module);
     (*ctx->moduleNS) = buf;
     buf = GDKstrdup(method);
     (*ctx->method) = buf;
 
-    if (timing){
-        mode = "timing-xml-noheader-xrpc";
+    if ((flags&1) == 0){
+        mode += 7; /* no "timing-" */
     }
     err = xquery_change_genType(ctx, mode);
     if (err) return err;
 
-    old_tid = mc->t;
-    old_thread = mc->thread;
-    mc->t = XQthread_id;
-    mc->thread = XQthread;
-    monetSetChannel(XQthread, GDKin, out);
     if (argc >= 1000) {
         /* hack: pass argc+1000 and you get debug output */
         s = open_wastream("/tmp/xrpc.mil");
@@ -5417,31 +5398,8 @@
     if (s) {
         stream_close(ctx->fderr);
         stream_destroy(ctx->fderr);
+        ctx->fderr = GDKerr;
     }
-    monetSetChannel(XQthread, GDKin, GDKout);
-
-    mc->t = old_tid;
-    mc->thread = old_thread;
-    xquery_client_end(ctx, NULL);
-
-    for (v = monet_cntxt[mc->stk].var; v && v != mc->lastvar; v = v->next) {
-        ATOMunfix(v->binding.vtype, VALptr(&v->binding));
-        VALclear(&v->binding); /* -> void-nil (plus clean-up/free) */
-        VALset(&v->binding, v->binding.vtype, ATOMnilptr(v->binding.vtype)); 
/* -> typed-nil */
-    }
-    /*
-    if (mc->c->fdout) {
-        stream_close(mc->c->fdout);
-        stream_destroy(mc->c->fdout);
-        mc->c->fdout = NULL;
-    }
-    if (mc->c->fdin) {
-        stream_close(mc->c->fdin);
-        stream_destroy(mc->c->fdin);
-        mc->c->fdin = NULL;
-    }
-    */
-    mc->inuse = 0;
     return err;
 }
 

Index: shttpd.h
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/shttpd.h,v
retrieving revision 1.6
retrieving revision 1.6.6.1
diff -u -d -r1.6 -r1.6.6.1
--- shttpd.h    4 May 2006 16:48:59 -0000       1.6
+++ shttpd.h    15 Feb 2007 01:46:13 -0000      1.6.6.1
@@ -167,8 +167,12 @@
 extern void shttpd_poll(shttpd_socket *ctx, unsigned milliseconds);
 extern const char *shttpd_get_var(struct conn *, const char *varname);
 extern int shttpd_template(struct conn *, const char *headers, const char 
*file, ...);
+
 extern char *shttpd_get_msg(struct shttpd_callback_arg *arg);
+extern char *shttpd_get_method(struct shttpd_callback_arg *arg);
+extern char *shttpd_get_uri(struct shttpd_callback_arg *arg);
 extern int shttpd_get_socket(struct shttpd_callback_arg *arg);
+extern void shttpd_finish(struct shttpd_callback_arg *arg);
 
 #ifdef MT
 extern int shttpd_printf(struct conn *, const char *fmt, ...);

Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.6.2.4
retrieving revision 1.6.2.5
diff -u -d -r1.6.2.4 -r1.6.2.5
--- xrpc_client.mx      8 Feb 2007 18:15:29 -0000       1.6.2.4
+++ xrpc_client.mx      15 Feb 2007 01:46:14 -0000      1.6.2.5
@@ -436,46 +436,9 @@
 #ifndef XRPC_CLIENT_H
 #define XRPC_CLIENT_H
 
-#include "pathfinder.h"
-
-#include <gdk.h>
-#include <stream.h>
-#include <time.h>
-
-#ifdef _WIN32   /* Windows specific */
-    #include <winsock.h>
-    #define snprintf _snprintf
-    #ifndef __MINGW32__
-        #pragma comment(lib, "ws2_32")
-    #endif
-    #define sleep(s) Sleep(1000*(s))
-#else           /* UNIX specific */
-    #include <sys/select.h>
-    #include <sys/types.h>     /* used by socket */
-    #include <sys/socket.h>
-    #include <unistd.h>
-    #include <netinet/in.h> /* hton and ntoh */
-    #include <arpa/inet.h>  /* dotted IP addr to and from 32-bits int */
-    #include <netdb.h>         /* convert domain names into IP addr */
-    #include <errno.h>
-    #include <ctype.h>
-#endif
-
-#include <stdio.h>
-#include <stdlib.h>
-
-#include "pf_support.h"
-#include "shredder.h"
-#include "serialize.h"
+#include "xrpc_server.h"
 
-#define PFTEXT                  1
-#define ELEMENT                 0
-#define MAX_NR_PARAMS           32
-#define HTTP_PORT               48080
-#define HTTPD_FUNC              "/xrpc"
-#define MIN_RESPONSE_SIZE       16
 #define MAX_BUF_SIZE            (1024*1024)
-#define MAX_POST_HEADER_SIZE    1024
 #define NR_RETRIES              3
 
 #endif /* XRPC_CLIENT_H */
@@ -575,7 +538,7 @@
     struct in_addr      addr;
     struct sockaddr_in  sockaddr;
     struct hostent     *resolv = NULL;
-    int i, ret, sock, port = HTTP_PORT;
+    int i, ret, sock, port = HTTPD_DEFAULT_PORT;
     str strptr = NULL;
 
     errno = 0;
@@ -801,12 +764,7 @@
         return GDK_FAIL;
     }
 
-    b->pos = snprintf(b->buf, b->len,
-            "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
-            "<env:Envelope xmlns:env=\"%s\" xmlns:xrpc=\"%s\" "
-            "xmlns:xs=\"%s\" xmlns:xsi=\"%s\" "
-            "xsi:schemaLocation=\"%s %s\"><env:Body><xrpc:request "
-            "xrpc:module=\"%s\" xrpc:location=\"%s\" xrpc:method=\"%s\">",
+    b->pos = snprintf(b->buf, b->len, XRPC_HEADER,
             SOAP_NS, XRPC_NS, XS_NS, XSI_NS, XRPC_NS, XRPC_LOC,
             rpc_module, rpc_uri, rpc_method);
 
@@ -1063,7 +1021,7 @@
         }
         str2buf(b, "</xrpc:call>");
     }
-    str2buf(b, "</xrpc:request></env:Body></env:Envelope>");
+    str2buf(b, XRPC_FOOTER);
     b->buf[b->pos] = 0;
     /* Stop timing Client Serialisation */
     time_clntSeria = GDKusec() - time_clntSeria;

Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.23.2.2
retrieving revision 1.23.2.3
diff -u -d -r1.23.2.2 -r1.23.2.3
--- serialize_dflt.mx   8 Feb 2007 18:15:28 -0000       1.23.2.2
+++ serialize_dflt.mx   15 Feb 2007 01:46:12 -0000      1.23.2.3
@@ -37,6 +37,7 @@
 
 #include "pf_config.h"
 #include "serialize.h"
+#include "xrpc_server.h"
 /* contains dummy callback functions */
 #include "serialize_null.h"
 

Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.7.2.5
retrieving revision 1.7.2.6
diff -u -d -r1.7.2.5 -r1.7.2.6
--- xrpc_server.mx      12 Feb 2007 22:51:42 -0000      1.7.2.5
+++ xrpc_server.mx      15 Feb 2007 01:46:14 -0000      1.7.2.6
@@ -43,6 +43,7 @@
 .COMMAND httpd_start(int port, str option) : void = CMDhttpd_start;
 "Start the HTTP server for RPC calls on the specified port."
 
+.EPILOGUE = xrpc_epilogue;
 .END xrpc_server;
 
 @mil
@@ -104,6 +105,7 @@
 
 #include <gdk.h>
 #include <stream.h>
+#include <mapi.h>
 #include <time.h>
 
 #ifdef _WIN32   /* Windows specific */
@@ -118,6 +120,9 @@
     #include <sys/types.h>     /* used by socket */
     #include <sys/socket.h>
     #include <unistd.h>
+    #include <netinet/in.h> /* hton and ntoh */
+    #include <arpa/inet.h>  /* dotted IP addr to and from 32-bits int */
+    #include <netdb.h>                /* convert domain names into IP addr */
     #include <errno.h>
     #include <ctype.h>
 #endif
@@ -125,43 +130,64 @@
 #include <stdio.h>
 #include <stdlib.h>
 
+#include "pathfinder.h"
 #include "pf_support.h"
 #include "shredder.h"
 #include "serialize.h"
 #include "shttpd.h"
 
-#define MAXPARAMS       32
-#define HTTPD_FUNC      "/xrpc"
-#define ERR404          "404 Bad Request"
-#define ERR500          "500 Internal Server Error"
-#define OUT_OF_MEM      "Internal Receiver Error: out-of memory"
-#define NOT_WELL_FORMED "Request XML message not well-formed"
-
-/* TODO: why don't we define this in pathfinder.mx??? */
-#define ELEMENT         0
-#define PFTEXT          1
-#define COMMENT         2
-#define PI              3
-#define DOCUMENT        4
-#define COLLECTION      5
+#define HTTPD_DEFAULT_PORT      48080
+#define HTTPD_FUNC              "/xrpc"
+#define ERR404                  "404 Bad Request"
+#define ERR500                  "500 Internal Server Error"
+#define OUT_OF_MEM              "Internal Receiver Error: out-of memory"
+#define NOT_WELL_FORMED         "Request XML message not well-formed"
+#define MAX_NR_PARAMS           32
 
-#define SOAP_NS  "http://www.w3.org/2003/05/soap-envelope"
-#define XRPC_NS  "http://monetdb.cwi.nl/XQuery"
-#define XRPC_LOC "http://monetdb.cwi.nl/XQuery/XRPC.xsd"
-#define XS_NS    "http://www.w3.org/2001/XMLSchema"
-#define XSI_NS   "http://www.w3.org/2001/XMLSchema-instance"
+#define SOAP_NS     "http://www.w3.org/2003/05/soap-envelope"
+#define XDT_NS      "http://www.w3.org/2005/xpath-datatypes"
+#define XS_NS       "http://www.w3.org/2001/XMLSchema"
+#define XSI_NS      "http://www.w3.org/2001/XMLSchema-instance"
+#define XRPC_NS     "http://monetdb.cwi.nl/XQuery"
+#define XRPC_LOC    "http://monetdb.cwi.nl/XQuery/XRPC.xsd"
 
-/* To prevent starting the HTTP server twice */
-int rpcd_running = 0;
-int timing = 0;
+#define XRPC_HEADER "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"\
+                    "<env:Envelope xmlns:env=\"%s\""\
+                                 " xmlns:xrpc=\"%s\""\
+                                 " xmlns:xs=\"%s\""\
+                                 " xmlns:xsi=\"%s\""\
+                                 " xsi:schemaLocation=\"%s %s\">"\
+                     "<env:Body>"\
+                      "<xrpc:request xrpc:module=\"%s\""\
+                               " xrpc:location=\"%s\""\
+                               " xrpc:method=\"%s\">"
+#define XRPC_HTTP_CALL "<xrpc:call>"\
+                        "<xrpc:sequence>"\
+                         "<xrpc:atomic-value 
xsi:type=\"xs:string\">%s</xrpc:atomic-value>"\
+                        "</xrpc:sequence>"\
+                       "</xrpc:call>"
+#define XRPC_PUT_CALL  "<xrpc:call>"\
+                        "<xrpc:sequence>"\
+                         "<xrpc:atomic-value 
xsi:type=\"xs:string\">%s</xrpc:atomic-value>"\
+                        "</xrpc:sequence>"\
+                        "<xrpc:sequence>"\
+                         "<xrpc:element>%s</xrpc:element>"\
+                        "</xrpc:sequence>"\
+                       "</xrpc:call>"
+#define XRPC_FOOTER   "</xrpc:request>"\
+                     "</env:Body>"\
+                    "</env:Envelope>"
 
 #endif /* XQUERY_RPC_H */
 
 @c
 #include "pf_config.h"
 #include "xrpc_server.h"
-/*    stream_close(s); stream_destroy(s);                             \
- *    */
+
+/* To prevent starting the HTTP server twice */
+int rpcd_running = 0;
+int timing = 0;
+
 
 #define clean_up(s, argcnt, argtpe, argval, iterc, nr_args) {       \
     lng i = 0;                                                      \
@@ -205,37 +231,52 @@
 static BAT*
 request2bat(struct shttpd_callback_arg *arg, stream *out)
 {
-    char *strptr = NULL, *req_msg = NULL;
+    char *strptr = NULL, *del = NULL, *req_msg = shttpd_get_msg(arg); 
     lng percentage = 0;
     BAT *shredBAT = NULL;
     bit verbose = FALSE;
 
-    /* Remove the first line of the message, which containing
-     * "<?xml...?>", so that the message we pass to CMDshred2bats starts
-     * directly with <env:Envelope ...> */
-    req_msg = shttpd_get_msg(arg);
-    strptr  = req_msg + 7; /* strlen("<?xml...?>") >= 7 */
-    if ( (strstr(req_msg, "<?xml") != req_msg) ||
-         (strptr = strchr(strptr, (int)'<')) == NULL ) {
-        send_err(out, 1, ERR404, "env:Sender", NOT_WELL_FORMED);
-        return NULL;
-    }
+    if (strcmp(shttpd_get_method(arg), "POST")) {
+        /* support GET, PUT and DEL by redirecting those to a predefined XRPC 
module */
+        char *docname = shttpd_get_uri(arg) + 6; /* skip "/xrpc/" prefix that 
directed us here */
+        char *body_format = XRPC_HTTP_CALL;
+        char *p, uri[128];
 
-    if(!(shredBAT = BATnew(TYPE_str, TYPE_bat, 32))){
-        send_err(out, 1, ERR500, "env:Receiver", OUT_OF_MEM);
-        return NULL;
-    }
+        size_t len = 2000 + strlen(docname);
+        if (strcmp(shttpd_get_method(arg), "PUT") == 0) len += strlen(req_msg);
 
-    if (CMDshred_str(shredBAT, strptr, &percentage, NULL, &verbose) == 
GDK_FAIL) {
-        if(BBPreclaim(shredBAT) == -1){
-            GDKerror("request2bat: failed to destroy shredBAT on error, "
-                    "because it is used by other process!");
-            GDKerror("THIS SHOULD NEVER HAPPEN!!!");
+        strptr = del = p = (char*) GDKmalloc(len);
+        if (del == NULL) {
+            send_err(out, 1, ERR500, "env:Receiver", OUT_OF_MEM);
+            return NULL;
+        }
+
+        /* the PUT, GET and DELETE methods are implemented in this module */
+        sprintf(uri, "http://localhost:%d/admin/http.xq", rpcd_running);
+
+        /* create a fake XRPC message for  GET/PUT/DELETE (=method) that 
invoke xrpc:method(URI) */
+        p += sprintf(p, XRPC_HEADER, SOAP_NS, XRPC_NS, XS_NS, XSI_NS, XRPC_NS, 
XRPC_LOC,
+            "http", uri, shttpd_get_method(arg));
+        p += sprintf(p, body_format, docname, req_msg);   
+        strcpy(p, XRPC_FOOTER);
+    } else {
+        /* Remove the first line of the message, which containing
+         * "<?xml...?>", so that the message we pass to CMDshred2bats starts
+         * directly with <env:Envelope ...> */
+        strptr  = req_msg + 7; /* strlen("<?xml...?>") >= 7 */
+        if ((strstr(req_msg, "<?xml") != req_msg) || (strptr = strchr(strptr, 
(int)'<')) == NULL ) {
+            send_err(out, 1, ERR404, "env:Sender", NOT_WELL_FORMED);
+            return NULL;
         }
+    }
+    if (!(shredBAT = BATnew(TYPE_str, TYPE_bat, 32))) {
+        send_err(out, 1, ERR500, "env:Receiver", OUT_OF_MEM);
+    } else if (CMDshred_str(shredBAT, strptr, &percentage, NULL, &verbose) == 
GDK_FAIL) {
+        BBPreclaim(shredBAT);
+        shredBAT = NULL;
         send_err(out, 1, ERR404, "env:Sender", NOT_WELL_FORMED);
-        return NULL;
     }
-
+    if (del) GDKfree(del);
     return shredBAT;
 }
 
@@ -407,9 +448,8 @@
  * @return GDK_SUCCEED, or
  *         GDK_FAIL if an error has occurred.
  */
-static int
-handle_rpc_request(struct shttpd_callback_arg *arg)
-{
+static int 
+xrpc_handle_request(mapi_client *mc, struct shttpd_callback_arg *arg) {
     char errstr[1024], *module = NULL, *location = NULL, *method = NULL;
     int sock = -1, k = 0, simple_param = 1;
     stream *out = NULL;
@@ -445,12 +485,14 @@
     char level_diff = 0; /* indicates how many levels each pre_level
                             value of a node should be reduced. */
 
-    THRnew(MT_getpid(), "rpc_req_handler"); /* register this thread */
+    int serializeMode = timing; 
+    /* GET,PUT,DELETE HTTP requests get serializeMode|=2 to avoid XRPC 
serialization */ 
+    if (strcmp(shttpd_get_method(arg), "POST")) serializeMode |= 2; 
 
     /* Create our own output stream for further data transfer. */
     sock = shttpd_get_socket(arg);
     if (!(out = socket_wastream(sock, "rpc_response"))) {
-        GDKerror("handle_rpc_request: failed to create "
+        GDKerror("xrpc_handle_request: failed to create "
                 "socket_wastream for socket %d", sock);
         close(sock);
         return GDK_FAIL;
@@ -560,7 +602,7 @@
             argcnt[i][j] = 0;
     }
 
-    max_args = iterc * MAXPARAMS;
+    max_args = iterc * MAX_NR_PARAMS;
     argval = GDKmalloc(max_args * sizeof(char *));
     argtpe = GDKmalloc(max_args * sizeof(char *));
     if (!argval || !argtpe) {
@@ -597,7 +639,7 @@
 
                 if (nr_args == max_args) {
                     snprintf(errstr, 1024, "XRPC request: too many "
-                            "parameters, maximum is %d", MAXPARAMS);
+                            "parameters, maximum is %d", MAX_NR_PARAMS);
                     send_err(out, 1, ERR404, "env:Sender", errstr);
                     clean_up(out,argcnt,argtpe,argval,iterc,nr_args);
                     return GDK_FAIL;
@@ -629,7 +671,7 @@
 
                     val_node_pre = tpe_node_pre + 1;
                     if( (tpe_node_size != 1) || 
-                        (pre_kindT[val_node_pre] != PFTEXT) ) {
+                        (pre_kindT[val_node_pre] != TEXT) ) {
                         snprintf(errstr, 1024, "XRPC request: "
                                 "iteration%lld/parameter%lld/value%d "
                                 "of type \"%s\" is expected to have a "
@@ -687,7 +729,7 @@
                                 pre_kindT);
                     } else if (strcmp(argtpe[nr_args], "xs:text") == 0) {
                         val_node_pre = get_elem_pre(tpe_node_pre+1,
-                                (tpe_node_pre+tpe_node_size), PFTEXT,
+                                (tpe_node_pre+tpe_node_size), TEXT,
                                 pre_kindT);
                     } else if (strcmp(argtpe[nr_args], "xs:comment") == 0) {
                         val_node_pre = get_elem_pre(tpe_node_pre+1,
@@ -757,7 +799,7 @@
     stream_printf(out, "HTTP/1.1 200 OK\r\n"
             "Content-Type: application/soap+xml; "
             "charset=\"utf-8\"\r\n\r\n");
-    char *err = xquery_method(out, timing, module, location, method,
+    char *err = xquery_method(mc, serializeMode, module, location, method,
             argc, iterc, argcnt, argtpe, argval,
             simple_param?NULL:shredBAT);
 
@@ -777,26 +819,71 @@
     }
 
     clean_up(out, argcnt, argtpe, argval, iterc, nr_args);
-    if(BBPreclaim(shredBAT) == -1){
-        GDKerror("handle_rpc_request: failed to destroy BAT \"shredBAT\", because it is in use by other process");
-        GDKerror("THIS SHOULD NEVER HAPPEN!!!");
-        return GDK_FAIL;
-    }
-
+    BBPreclaim(shredBAT);
     return GDK_SUCCEED;
 }
 
+
+/*
+ * xrpc MAPI client handler (overrides the xquery_client_engine)
+ */
+static void 
+xrpc_client_engine(mapi_client *mc) {
+    struct shttpd_callback_arg *arg = (struct shttpd_callback_arg *) mc->arg;
+
+    /* do the work */
+    (void) xrpc_handle_request(mc, arg);
+
+    /* clean up */
+    mc->engine = xquery_client_engine; 
+    xquery_client_end(mc, NULL);
+    shttpd_finish(arg); 
+}
+
+
+/*
+ * handle request asynchronously using a MAPI xquery client
+ */
+static int
+xrpc_fork_mapiclient(struct shttpd_callback_arg *arg) {
+    /* get me a MAPI thread from the xquery client pool */
+    int sock = shttpd_get_socket(arg);
+    stream *fdin = socket_rastream(sock, "XRPC read");
+    if (fdin && stream_errnr(fdin) == 0) {
+        stream *fdout = socket_wastream(sock, "XRPC write");
+        if (fdout && stream_errnr(fdout) == 0) {
+            mapi_client *mc = MAPIclient(fdin, fdout, "xquery");
+            if (mc) {
+                mc->engine = xrpc_client_engine; /* override xquery_client_engine (will be restored later) */
+                mc->arg = (char*) arg; /* HACK! pass xrpc arg */
+                MT_up_sema(mc->s, "XRPC"); /* activate the thread */
+                return 0;
+            }
+            stream_close(fdout);
+            stream_destroy(fdout);
+        }
+        stream_close(fdin);
+        stream_destroy(fdin);
+    }
+    shttpd_finish(arg);
+    return -1;
+}
+
 int
 CMDhttpd_start(int* port, str option)
 {
     shttpd_socket ctx;
+    char *s = "/usr/share/", docdir[1024]; /* httpd serves out /usr/share/MonetDB/xrpc/ */
+    BUN p = BUNfnd(GDKenv, "datadir"); /* normally use 'datadir' iso /usr/share  (often datdir = prefix/share) */
+    if (p) s = BUNtail(GDKenv,p); 
+    snprintf(docdir, 1024, "%s%cMonetDB%cxrpc%c", s, DIR_SEP, DIR_SEP, DIR_SEP);
        
     if (rpcd_running) {
         GDKerror("CMDhttpd_start: RPC receiver already running\n");
         return GDK_FAIL;
     }
 
-    rpcd_running = 1;
+    rpcd_running = *port;
 
     if (option && strstr(option, "timing") != NULL)
         timing = 1;
@@ -804,9 +891,10 @@
     /* Initialize with specific config file, pass NULL to use default
      * values */
     shttpd_init(NULL);
+    shttpd_setopt("document_root", docdir);
 
     /* Register call back function */
-    shttpd_register_url(HTTPD_FUNC, &handle_rpc_request, NULL);
+    shttpd_register_url(HTTPD_FUNC, xrpc_fork_mapiclient, NULL);
 
     /* Open listening socket */
     ctx = shttpd_open_port(*port);
@@ -817,10 +905,6 @@
     return GDK_SUCCEED;
 }
 
-bat* xrpc_prelude() {
-    return NULL;        /* Nothing to do here. */
-}
-
 void xrpc_epilogue() {
     shttpd_fini();      /* Shut down the HTTP server. */
     rpcd_running = 0;   /* Stop RPC server */


-------------------------------------------------------------------------
Take Surveys. Earn Cash. Influence the Future of IT
Join SourceForge.net's Techsay panel and you'll get the chance to share your
opinions on IT & business topics through brief surveys-and earn cash
http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to