Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv13386/runtime

Modified Files:
      Tag: xrpcdemo
        pathfinder.mx serialize.mx serialize_dflt.mx xrpc_client.mx 
        xrpc_server.mx 
Log Message:
ok, some things start to work now regarding
- read-only repeatable read
- XRPC message tracing

not fully tested yet, though..



U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.4.8
retrieving revision 1.45.4.9
diff -u -d -r1.45.4.8 -r1.45.4.9
--- xrpc_client.mx      7 Jun 2008 00:38:28 -0000       1.45.4.8
+++ xrpc_client.mx      8 Jun 2008 01:02:38 -0000       1.45.4.9
@@ -920,14 +920,17 @@
 
     for (my_iter = 0; my_iter < iterc; my_iter++) {
         if (arity == 0) {
-            assert(stream_write(bs, "<xrpc:call/>", 1, 12) == 12);
+            ret = stream_write(bs, "<xrpc:call/>", 1, 12);
+            if (ret != 12) return clean_up(bs, argcnt, iterc);
             continue;
         }
 
-        assert(stream_write(bs, "<xrpc:call>", 1, 11) == 11); /* start an 
iteration */
+        ret = stream_write(bs, "<xrpc:call>", 1, 11); /* start an iteration */
+        if (ret != 11) return clean_up(bs, argcnt, iterc);
         for (my_argc = 0; my_argc < arity; my_argc++) {
             if (argcnt[my_iter][my_argc] == 0) {
-                assert(stream_write(bs, "<xrpc:sequence/>", 1, 16) == 16);
+                ret = stream_write(bs, "<xrpc:sequence/>", 1, 16);
+                if (ret != 16) return clean_up(bs, argcnt, iterc);
                 continue;
             }
             arg_offset = 0;
@@ -943,7 +946,8 @@
             /* now 'arg_offset' contains the start position of the
              * values of my_iter and my_argc in the fun_* BATs */
 
-            assert(stream_write(bs, "<xrpc:sequence>", 1, 15) == 15); /* start 
a parameter */
+            ret = stream_write(bs, "<xrpc:sequence>", 1, 15); /* start a 
parameter */
+            if (ret != 15) return clean_up(bs, argcnt, iterc);
             int k;
             chr elem_kind = 0;
             oid item = 0;
@@ -961,32 +965,32 @@
                         ret = stream_printf(bs,
                                 "<xrpc:atomic-value 
xsi:type=\"xs:boolean\">%s</xrpc:atomic-value>",
                                 intVals[item] == TRUE ? "true" : "false");
-                        assert(ret > 0);
+                        if (ret <= 0) return clean_up(bs, argcnt, iterc);
                         break;
                     case INT:
                         ret = stream_printf(bs,
                                 "<xrpc:atomic-value 
xsi:type=\"xs:integer\">%lld</xrpc:atomic-value>",
                                 intVals[item]);
-                        assert(ret > 0);
+                        if (ret <= 0) return clean_up(bs, argcnt, iterc);
                         break;
                     case DEC:
                         ret = stream_printf(bs,
                                 "<xrpc:atomic-value 
xsi:type=\"xs:decimal\">%f</xrpc:atomic-value>",
                                 dblVals[item]);
-                        assert(ret > 0);
+                        if (ret <= 0) return clean_up(bs, argcnt, iterc);
                         break;
                     case DBL:
                         ret = stream_printf(bs,
                                 "<xrpc:atomic-value 
xsi:type=\"xs:double\">%f</xrpc:atomic-value>",
                                 dblVals[item]);
-                        assert(ret > 0);
+                        if (ret <= 0) return clean_up(bs, argcnt, iterc);
                         break;
                     case STR:
                     case U_A:
                         ret = stream_printf(bs,
                                 "<xrpc:atomic-value 
xsi:type=\"xs:string\">%s</xrpc:atomic-value>",
                                 (char*)BUNtail(strValsi, item));
-                        assert(ret > 0);
+                        if (ret <= 0) return clean_up(bs, argcnt, iterc);
                         break;
                     case ATTR:
                     {
@@ -1015,13 +1019,14 @@
                         a = *(oid*)BUNtail(attr_propi, i);
                         char *attr_val = (char*)BUNtail(prop_vali, a);
 
-                        assert(stream_write(bs, "<xrpc:attribute ", 1, 16) == 
16);
+                        ret = stream_write(bs, "<xrpc:attribute ", 1, 16);
+                        if (ret != 16) return clean_up(bs, argcnt, iterc);
                         if (prefix && *prefix){
                             ret = stream_printf(bs, "%s:", prefix);
-                            assert(ret > 0);
+                            if (ret <= 0) return clean_up(bs, argcnt, iterc);
                         }
                         ret = stream_printf(bs, "%s=\"%s\"/>", loc, attr_val);
-                        assert(ret > 0);
+                        if (ret <= 0) return clean_up(bs, argcnt, iterc);
                         break;
                     }
                     case ELEM:
@@ -1032,15 +1037,20 @@
                             return clean_up(bs, argcnt, iterc);
                         }
                         switch(elem_kind) {
-                            case 0: assert(stream_write(bs, "<xrpc:element>", 
1, 14) == 14);
+                            case 0: ret = stream_write(bs, "<xrpc:element>", 
1, 14);
+                                    if (ret != 14) return clean_up(bs, argcnt, 
iterc);
                                     break;
-                            case 1: assert(stream_write(bs, "<xrpc:text>", 1, 
11) == 11);
+                            case 1: ret = stream_write(bs, "<xrpc:text>", 1, 
11);
+                                    if (ret != 11) return clean_up(bs, argcnt, 
iterc);
                                     break;
-                            case 2: assert(stream_write(bs, "<xrpc:comment>", 
1, 14) == 14);
+                            case 2: ret = stream_write(bs, "<xrpc:comment>", 
1, 14);
+                                    if (ret != 14) return clean_up(bs, argcnt, 
iterc);
                                     break;
-                            case 3: assert(stream_write(bs, 
"<xrpc:processing-instruction>", 1, 29) == 29);
+                            case 3: ret = stream_write(bs, 
"<xrpc:processing-instruction>", 1, 29);
+                                    if (ret != 29) return clean_up(bs, argcnt, 
iterc);
                                     break;
-                            case 4: assert(stream_write(bs, "<xrpc:document>", 
1, 15) == 15);
+                            case 4: ret = stream_write(bs, "<xrpc:document>", 
1, 15);
+                                    if (ret != 15) return clean_up(bs, argcnt, 
iterc);
                                     break;
                             case 5: /* COLLECTION: nothing to be done, here */ 
  break;
                             default:
@@ -1076,15 +1086,20 @@
                         }
 
                         switch(elem_kind) {
-                            case 0: assert(stream_write(bs, "</xrpc:element>", 
1, 15) == 15);
+                            case 0: ret = stream_write(bs, "</xrpc:element>", 
1, 15);
+                                    if (ret != 15) return clean_up(bs, argcnt, 
iterc);
                                     break;
-                            case 1: assert(stream_write(bs, "</xrpc:text>", 1, 
12) == 12);
+                            case 1: ret = stream_write(bs, "</xrpc:text>", 1, 
12);
+                                    if (ret != 12) return clean_up(bs, argcnt, 
iterc);
                                     break;
-                            case 2: assert(stream_write(bs, "</xrpc:comment>", 
1, 15) == 15);
+                            case 2: ret = stream_write(bs, "</xrpc:comment>", 
1, 15);
+                                    if (ret != 15) return clean_up(bs, argcnt, 
iterc);
                                     break;
-                            case 3: assert(stream_write(bs, 
"</xrpc:processing-instruction>", 1, 30) == 30);
+                            case 3: ret = stream_write(bs, 
"</xrpc:processing-instruction>", 1, 30);
+                                    if (ret != 30) return clean_up(bs, argcnt, 
iterc);
                                     break;
-                            case 4: assert(stream_write(bs, 
"</xrpc:document>", 1, 16) == 16);
+                            case 4: ret = stream_write(bs, "</xrpc:document>", 
1, 16);
+                                    if (ret != 16) return clean_up(bs, argcnt, 
iterc);
                                     break;
                             case 5: /* COLLECTION: nothing to be done, here */ 
   break;
                             default:
@@ -1098,11 +1113,14 @@
                         return clean_up(bs, argcnt, iterc);
                 }
             }
-            assert(stream_write(bs, "</xrpc:sequence>", 1, 16) == 16);
+            ret = stream_write(bs, "</xrpc:sequence>", 1, 16);
+            if (ret != 16) return clean_up(bs, argcnt, iterc);
         }
-        assert(stream_write(bs, "</xrpc:call>", 1, 12) == 12);
+        ret = stream_write(bs, "</xrpc:call>", 1, 12);
+        if (ret != 12) return clean_up(bs, argcnt, iterc);
     }
-    assert(stream_write(bs, "</xrpc:request></env:Body></env:Envelope>\n", 1, 
42) == 42);
+    ret = stream_write(bs, "</xrpc:request></env:Body></env:Envelope>\n", 1, 
42);
+    if (ret != 42) return clean_up(bs, argcnt, iterc);
     /* Stop timing Client Serialisation */
     time_xrpcClntSeria = GDKusec() - time_xrpcClntSeria;
     if (timing) {

U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46.4.6
retrieving revision 1.46.4.7
diff -u -d -r1.46.4.6 -r1.46.4.7
--- serialize_dflt.mx   7 Jun 2008 00:38:27 -0000       1.46.4.6
+++ serialize_dflt.mx   8 Jun 2008 01:02:38 -0000       1.46.4.7
@@ -1001,7 +1001,7 @@
         return PROBLEM;
     }
     len = strlen(HTTP_200_OK);
-    ret = stream_write(ctx->out, HTTP_200_OK, 1, len);
+    ret = stream_write(GDKout, HTTP_200_OK, 1, len);
     if(ret != len) return PROBLEM;
 
     len = strlen(SOAP_ENVELOPE);
@@ -1023,10 +1023,8 @@
         /* XRPC participants lists for nested transactions - append an extra 
item for this call */
         if(ret >= 0)
             ret = stream_printf(ctx->out, 
-                           "<xrpc:participants>\n  <xrpc:participant>%s,%s," 
-                           LLFMT ",%s:%d," LLFMT "</xrpc:participant>\n",
-                           ctx->xrpc_caller, xrpc_hostname, xrpc_port, 
ctx->xrpc_seqnr, 
-                           ctx->xrpc_method, time_exec);
+                "<xrpc:participants>\n  
<xrpc:participant>%s,%s:%d:"LLFMT",%s,"LLFMT"</xrpc:participant>\n",
+                ctx->xrpc_caller, xrpc_hostname, xrpc_port, ctx->xrpc_seqnr, 
ctx->xrpc_method, time_exec);
 
         /* print all nested calls (obtained from response messages)  */
         bi = bat_iterator(b);

U serialize.mx
Index: serialize.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize.mx,v
retrieving revision 1.109.4.2
retrieving revision 1.109.4.3
diff -u -d -r1.109.4.2 -r1.109.4.3
--- serialize.mx        5 Jun 2008 12:55:20 -0000       1.109.4.2
+++ serialize.mx        8 Jun 2008 01:02:37 -0000       1.109.4.3
@@ -300,6 +300,8 @@
     lng                *xrpc_timeout,
     lng                *xrpc_start);
 
+stream* xquery_print_trace(str msg, lng seqnr);
+
 #endif /* SERIALIZE_H */
 @c
 
@@ -1971,6 +1973,25 @@
     return GDK_SUCCEED;
 }
 
+stream*
+xquery_print_trace(str msg, lng xrpc_seqnr) {
+    char logdir[1024], logfile[1024], *docroot = GDKgetenv("datadir");
+    stream* fp;
+    if (docroot) {
+        snprintf(logdir, 1024, "%s%cMonetDB%cxrpc%clogs%c", docroot, DIR_SEP, 
DIR_SEP, DIR_SEP, DIR_SEP);
+        snprintf(logfile, 1024, "%s%s_" LLFMT ".xml", logdir, msg, xrpc_seqnr);
+        GDKcreatedir(logdir);
+        fp = open_wastream(logfile);
+    }
+    if (!fp || stream_errnr(fp)) {
+        GDKerror("print_result: could not open logfile %s for writing\n", 
logfile);
+        if (fp) stream_destroy(fp);
+        return NULL;
+    }
+    return fp;
+}
+
+
 int 
 xquery_print_result_DRIVER (
     str                 mode,
@@ -1996,20 +2017,15 @@
     lng                *xrpc_start)
 {
     stream *fp = GDKout;
+    int ret, trace = xrpc_method && *xrpc_method && xrpc_mode && 
strstr(xrpc_mode, "trace");
     (void) driverFun;
     (void) driverArg;
-    if (xrpc_mode && strstr(xrpc_mode, "trace")) {
-        char logfile[1024], *docroot = GDKgetenv("xrpc_http_docroot");
-        snprintf(logfile, 1024, "%s%clogs%cres_" LLFMT ".xml", docroot, 
DIR_SEP, DIR_SEP, *xrpc_seqnr);
-        stream *logstream = open_wstream(logfile);
-        fp = open_teestream(fp, logstream);
-        if (!logstream || !fp) {
-            GDKerror("print_result: could not open logfile %s for writing\n", 
logfile);
-            if (logstream) stream_destroy(logstream);
-            return GDK_FAIL;
-        }
+    if (trace) {
+        stream* logstream = xquery_print_trace("res",*xrpc_seqnr);
+        fp = logstream?open_teestream(fp, logstream):NULL;
+        if (fp == NULL) return GDK_FAIL;
     }
-    return xquery_print_result_driver (
+    ret = xquery_print_result_driver (
                fp,
                mode,
                driverFun, /* set of printing callback function */
@@ -2031,6 +2047,12 @@
                xrpc_seqnr, 
                xrpc_timeout, 
                xrpc_start);
+  
+     if (trace) {
+         stream_close(fp);
+         stream_destroy(fp);
+     }
+     return ret; 
 }
 
 #define FNDEEPEQTRACE 0

U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.8
retrieving revision 1.68.4.9
diff -u -d -r1.68.4.8 -r1.68.4.9
--- xrpc_server.mx      7 Jun 2008 00:38:29 -0000       1.68.4.8
+++ xrpc_server.mx      8 Jun 2008 01:02:39 -0000       1.68.4.9
@@ -257,6 +257,7 @@
 typedef struct {
     char *qid;
     char *caller;
+    lng seqnr;
     lng start;
     lng timeout;
     char *mode;
@@ -291,7 +292,7 @@
 xrpc_server_export BAT *xrpc_timeouts;
 xrpc_server_export BAT *xrpc_statuses;
 xrpc_server_export BAT *xrpc_locks;
-xrpc_server_export BAT *xrpc_wsids;
+xrpc_server_export BAT *xrpc_wsbats;
 xrpc_server_export BAT *xrpc_trusted;
 xrpc_server_export BAT *xrpc_admin;
 
@@ -361,6 +362,7 @@
     }
 }
 
+lng xrpc_reqnr = 1000000;
 
 XRPCreq_t *
 XRPCreq_new(
@@ -380,6 +382,7 @@
     XRPCreq_t *req = (XRPCreq_t*) GDKmalloc(sizeof(XRPCreq_t));
     if(!req) return NULL;
 
+    req->seqnr = 0;
     req->qid = qid;
     req->caller = caller;
     req->timeout = timeout;
@@ -534,13 +537,8 @@
             len_x += strlen(prefix) + 4;
         }
         err = alloca(len + len_x);
-        if (err) {
-            len += len_x;
-        } else {
-            len_x = 0;
-            err = alloca(len);
-        }
         assert(err);
+        len += len_x;
         pos += snprintf(err+pos, len-pos, "%s", msg);
         if (!len_x) {
             pos += snprintf(err+pos, len-pos, " '%s'.", location);
@@ -1025,7 +1023,7 @@
      */
     *errbuf = 0;
     GDKsetbuf(errbuf);
-    char *err = xquery_method(mc, flags, req->mode, req->module, 
+    char *err = xquery_method(mc, flags, &(req->seqnr), req->mode, 
req->module, 
             req->location, req->method, req->qid, req->caller, req->timeout,
             req->argc, req->iterc, req->argcnt, req->argtpe, req->argval, 
             req->hasNodeParam?shredBAT:NULL);
@@ -1069,23 +1067,35 @@
     BAT *shredBAT;
     lng time_xrpcServDeSeria;
     int flags = timing|debug; 
+    char *msg;
 
     time_xrpcServDeSeria = GDKusec();
-    if(!(shredBAT = request2bat(mc->c->fdout, shttpd_get_msg(arg))))
+    if(!(shredBAT = request2bat(mc->c->fdout, msg=shttpd_get_msg(arg))))
         return GDK_FAIL;
 
     if(!(req = parse_request(mc->c->fdout, shredBAT, 0))) {
         BBPreclaim(shredBAT);
         return GDK_FAIL;
     }
+
     time_xrpcServDeSeria = GDKusec() - time_xrpcServDeSeria;
-    
+
     /* Execute the query and send XRPC response */
     if(execQuery(mc, flags, req, shredBAT) == GDK_FAIL) {
         BBPreclaim(shredBAT);
         XRPCreq_free(req);
         return GDK_FAIL;
     }
+    if (req->mode && strstr(req->mode,"trace")) {
+        stream *logstream = xquery_print_trace("req", req->seqnr);
+        if (logstream == NULL) return GDK_FAIL;
+        if (stream_errnr(logstream) || stream_write(logstream, msg, 1, 
strlen(msg)) <= 0) {
+            stream_destroy(logstream);
+            return GDK_FAIL;
+        }
+        stream_close(logstream);
+        stream_destroy(logstream);
+    }
 
     if (timing) {
         fprintf(stdout,
@@ -1368,7 +1378,7 @@
     BATseqbase([EMAIL PROTECTED], 1);
     BBPrename([EMAIL PROTECTED]>batCacheid, "[EMAIL PROTECTED]");
 @c
-BAT *xrpc_qids = NULL, *xrpc_statuses = NULL, *xrpc_timeouts = NULL, 
*xrpc_locks = NULL, *xrpc_wsids = NULL;
+BAT *xrpc_qids = NULL, *xrpc_statuses = NULL, *xrpc_timeouts = NULL, 
*xrpc_locks = NULL, *xrpc_wsbats = NULL;
 BAT *xrpc_trusted = NULL, *xrpc_admin = NULL, *xrpc_user = NULL;
 
 bat* xrpc_prelude(void) {
@@ -1376,7 +1386,7 @@
     @:xrpc_bat(statuses,str)@
     @:xrpc_bat(timeouts,lng)@
     @:xrpc_bat(locks,lock)@
-    @:xrpc_bat(wsids,lng)@
+    @:xrpc_bat(wsbats,bat)@
     @:xrpc_bat(trusted,str)@
     @:xrpc_bat(admin,str)@
     @:xrpc_bat(user,str)@

U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.10
retrieving revision 1.416.2.1.2.11
diff -u -d -r1.416.2.1.2.10 -r1.416.2.1.2.11
--- pathfinder.mx       7 Jun 2008 00:38:26 -0000       1.416.2.1.2.10
+++ pathfinder.mx       8 Jun 2008 01:02:34 -0000       1.416.2.1.2.11
@@ -509,27 +509,27 @@
 # meta-data BATs for multi-request XRPC transactions
 var xrpc_qids     := bat("xrpc_qids");      # bat[void,str] query-id 
(host|timestamp)
 var xrpc_timeouts := bat("xrpc_timeouts");  # bat[void,lng] query timeout in 
usec
-var xrpc_wsids    := bat("xrpc_wsids");     # bat[void,lng] ID of the 
workingset associated with this query
+var xrpc_wsbats   := bat("xrpc_wsbats");    # bat[void,lng] ID of the 
workingset associated with this query
 var xrpc_locks    := bat("xrpc_locks");     # bat[void,lock] lock for the 
workingset of this query 
 var xrpc_statuses := bat("xrpc_statuses");  # bat[void,str] 2PC status of this 
query: "exec", "wait", "prepare", "commit", "abort" or "timeout"
 var xrpc_lock     := pflock_get(6); # master XRPC lock, to protect above 
xrpc_* BATs
 var xrpc_hostport := my_hostname() + ":" + str(get_xrpc_port());
-var xrpc_querynr  := 9999999LL;
+var xrpc_querynr  := 999999LL;
 
 # abort a 2PC transaction, keeping the record (note ws should have been 
destroyed already)
 PROC _ws_xrpc_abort(oid idx, str status) : void
 {
     var wslock := xrpc_locks.find(idx);
-    var ws := bat(str(xrpc_wsids.find(idx)));
+    var ws := xrpc_wsbats.find(idx);
     ws_destroy(ws);
     xrpc_statuses.inplace(idx, status);
-    xrpc_wsids.inplace(idx, lng_nil);
+    xrpc_wsbats.inplace(idx, bat("xrpc_statuses"));
     xrpc_locks.inplace(idx, lock_nil);
     lock_destroy(wslock);
 }
 
 # end a 2PC request. On success, unlock ws and keep waiting for more. On 
failure, abort it.
-PROC _ws_xrpc_end(lng wsid, str errmsg) : void
+PROC _ws_xrpc_end(str xrpc_qid, str errmsg) : void
 {
     var idx := reverse(xrpc_qids).find(xrpc_qid);
     if (isnil(errmsg)) { 
@@ -545,7 +545,7 @@
 {
     var lim      := usec();
     var timeout  := 
mirror(xrpc_statuses.ord_select("wait")).leftfetchjoin(xrpc_timeouts).ord_uselect(0LL,lim).project("timeout");
-    var aborted  := 
mirror(xrpc_wsids.ord_uselect(lng_nil,lng_nil)).leftfetchjoin(xrpc_statuses).ord_select("abort");
+    var aborted  := 
mirror(xrpc_locks.ord_uselect(lock_nil,lock_nil)).leftfetchjoin(xrpc_statuses).ord_select("abort");
 
     kunion(aborted,timeout)@batloop() {
        _ws_xrpc_abort($h,$t);
@@ -555,7 +555,7 @@
         # pruned xrpc requests should all have destroyed wslock and ws (we let 
them leak here)
         var relevant_qids     := relevant.leftfetchjoin(xrpc_qids);      
xrpc_qids.delete().append(relevant_qids);
         var relevant_timeouts := relevant.leftfetchjoin(xrpc_qtimeouts); 
xrpc_timeouts.delete().append(relevant_timeouts);
-        var relevant_wsids    := relevant.leftfetchjoin(xrpc_wsids);     
xrpc_wsids.delete().append(relevant_wsids);
+        var relevant_wsbats   := relevant.leftfetchjoin(xrpc_wsbats);    
xrpc_wsbats.delete().append(relevant_wsbats);
         var relevant_locks    := relevant.leftfetchjoin(xrpc_locks);     
xrpc_locks.delete().append(relevant_locks);
         var relevant_statuses := relevant.leftfetchjoin(xrpc_statuses);  
xrpc_statuses.delete().append(relevant_statuses);
     }
@@ -563,7 +563,7 @@
 
 PROC xrpc_status() : void {
     lock_set(xrpc_lock);
-    var err := 
CATCH(print(xrpc_qids,xrpc_statuses,xrpc_timeouts,xrpc_locks,xrpc_wsids));
+    var err := 
CATCH(print(xrpc_qids,xrpc_statuses,xrpc_timeouts,xrpc_locks,xrpc_wsbats));
     lock_unset(xrpc_lock);
     if (not(isnil(err))) GDKerror(err);
 }
@@ -1120,7 +1120,7 @@
 bat[void,bat]. ws-IDs are globally and temporally unique, i.e. 
 we can talk about a ws-ID even after the transaction has finished.
 
-The collection pins record which transactions (wsids) are using 
+The collection pins record which transactions (wsbats) are using 
 which bats. This information is deleted only at the end of
 the transaction. 
 
@@ -1461,6 +1461,7 @@
 {
     var xrpc := 0;
     xrpc_seqnr := (xrpc_querynr :+= 1LL);
+    xrpc_coord := false;
     if (xrpc_mode.search("repeatable") >= 0) {
         xrpc := -(int(count(xrpc_qids))+1); # default: create a new ws
         if (xrpc_qid = "") {
@@ -1478,7 +1479,7 @@
 {
     var ws;
     if (xrpc > 0) {
-        ws := bat(str(xrpc_wsids.find(idx)));
+        ws := xrpc_wsbats.find(idx);
         xrpc_statuses.inplace(idx,"exec");
     } else {
         var id := and(lng(newoid(1)), 2147483647LL);
@@ -1489,7 +1490,7 @@
             var wslock := lock_create();
             xrpc_qids.append(xrpc_qid);
             xrpc_timeouts.append(usec() + *(1000LL, xrpc_timeout));
-            xrpc_wsids.append(wsid);
+            xrpc_wsbats.append(ws);
             xrpc_locks.append(wslock);
             xrpc_statuses.append("exec");
         }
@@ -1503,7 +1504,12 @@
 var xrpc_seqnr := 1LL;
 var xrpc_timeout := 30000LL;
 var xrpc_mode := "repeatable";
-var ws, wsid, xrpc, idx; ws := _ws_new(xrpc := _ws_xrpcget(), idx := 
oid(abs(xrpc)), update); wsid := ws_id(ws);
+var xrpc_coord := false;
+var ws, wsid, xrpc, idx; 
+xrpc := _ws_xrpcget();
+idx := oid(abs(xrpc));
+ws := _ws_new(xrpc, idx, update);
+wsid := ws_id(ws);
 
 PROC ws_end(BAT[void,bat] ws, str err) : int 
 {
@@ -1514,7 +1520,7 @@
         return 1;
     }
     lock_set(xrpc_lock);
-    CATCH(_ws_xrpc_end(ws_log_wsid, err));
+    CATCH(_ws_xrpc_end(xrpc_qid, err));
     lock_unset(xrpc_lock);
     return 2; # do not auto-restart 2PC transactions ever
 }
@@ -4187,7 +4193,7 @@
 
 /* exports for XRPC */
 pathfinder_export void  xquery_client_engine(mapi_client*);
-pathfinder_export char* xquery_method(mapi_client*, int, char*, char*, char*, 
char*, char*, char*, lng, lng, lng, lng**, int*, str*, BAT*);
+pathfinder_export char* xquery_method(mapi_client*, int, lng*, char*, char*, 
char*, char*, char*, char*, lng, lng, lng, lng**, int*, str*, BAT*);
 pathfinder_export void  xquery_client_end(mapi_client *, char *); 
 pathfinder_export char* xquery_parse_val(int, char*, BAT*, BAT*, BAT* , BAT* , 
BAT* , char*, oid);
 
@@ -4402,6 +4408,7 @@
     char **xrpc_method;
     char **xrpc_mode;
     lng   *xrpc_timeout;
+    lng   *xrpc_seqnr;
     char **xrpc_qid;
     char **xrpc_caller;
 
@@ -4696,8 +4703,8 @@
  * - resolve a method call in the current xquery context (return NULL if not 
resolved)
  *
  * char*
- * xquery_function_call(xquery_client *ctx, lng usec, char *ns, char *module, 
char *method, 
- *                      char* qid, char* caller, lng timeout, char* mode, 
+ * xquery_function_call(xquery_client *ctx, lng usec, lng *seqnr, char *ns, 
char *module, char *method, 
+ *                      lng seqnr, char* qid, char* caller, lng timeout, char* 
mode, 
  *                      int argc, int itercnt, int** argcnt, int* argtpe, 
char** argval, BAT *shredBAT)
  * - call a function ns:method(). try to use the function cache (ie re-use a 
cached MIL tree).
  *   otherwise generate MIL yourself, interpret it (and cache it). Returns 
error string (NULL if ok).
@@ -4912,6 +4919,7 @@
                      char *ns, 
                      char *module, 
                      char *method, 
+                     lng *seqnr, 
                      char *qid,
                      char *caller,
                      lng timeout,
@@ -5064,6 +5072,7 @@
 
     /* Done preparing the query. Time to (re-)execute the MIL tree */
     if (xquery_tree_exec(ctx, prepfun->lt, 1)) {
+        *seqnr = (*ctx->xrpc_seqnr);
         return NULL;
     }
     return xquery_function_error;
@@ -5329,17 +5338,20 @@
         return "xquery_client_alloc: failed to execute init script.\n"; 
 
     @:find_var(genType,str,sval)@
+
     @:find_var(xrpc_shredBAT,int,ival)@
-    @:find_var(xrpc_method,str,sval)@
     @:find_var(xrpc_module,str,sval)@
+    @:find_var(xrpc_method,str,sval)@
+    @:find_var(xrpc_caller,str,sval)@
+    @:find_var(xrpc_qid,str,sval)@
     @:find_var(xrpc_mode,str,sval)@
+    @:find_var(xrpc_seqnr,lng,lval)@
     @:find_var(xrpc_timeout,lng,lval)@
-    @:find_var(xrpc_qid,str,sval)@
-    @:find_var(xrpc_caller,str,sval)@
-    @:find_var(time_compile,lng,lval)@
+
     @:find_var(time_exec,lng,lval)@
     @:find_var(time_print,lng,lval)@
     @:find_var(time_shred,lng,lval)@
+    @:find_var(time_compile,lng,lval)@
 
     @:find_bat(proc_vid)@
     @:find_bat(var_usage)@
@@ -5526,7 +5538,12 @@
     BATclear(ctx->str_values);
     BUNappend(ctx->str_values, "", FALSE);
     *ctx->xrpc_shredBAT = int_nil; 
-
+    (*ctx->xrpc_qid)[0] = 0;
+    (*ctx->xrpc_caller)[0] = 0;
+    (*ctx->xrpc_module)[0] = 0;
+    (*ctx->xrpc_method)[0] = 0;
+    (*ctx->xrpc_mode)[0] = 0;
+    *ctx->xrpc_timeout = 30000LL;
 
     MT_set_lock(pf_cache_lock, "xquery_client_reset");
     /* only deactivate the loaded modules */
@@ -5934,7 +5951,7 @@
                     lng* cnt_ptr = cnt;
                     char nsbak = *nsend, locbak = *locend;
                     *nsend = 0; *locend = 0;
-                    err = xquery_function_call(ctx, usec, ns, url1, loc, "", 
"", 0LL, "", argc, 1, &cnt_ptr, tpe, param, NULL);
+                    err = xquery_function_call(ctx, usec, ns, url1, loc, 0LL, 
"", "", 0LL, "", argc, 1, &cnt_ptr, tpe, param, NULL);
                     *nsend = nsbak; *locend = locbak;
                 }
             }
@@ -6285,6 +6302,7 @@
 char*
 xquery_method(mapi_client *mc,
               int flags,
+              lng *seqnr,
               char* mode,
               char* module,
               char* location,
@@ -6325,7 +6343,7 @@
     }
 
     if (err == NULL) { 
-        err = xquery_function_call(ctx, usec, ns, module, method, qid, caller, 
timeout, mode, argc, itercnt, argcnt, argtpe, argval, shredBAT);
+        err = xquery_function_call(ctx, usec, ns, module, method, seqnr, qid, 
caller, timeout, mode, argc, itercnt, argcnt, argtpe, argval, shredBAT);
         if (err == (char*) -1) err = "xquery_method: function could not be 
resolved.\n";
         else if (err == xquery_function_error) err = 
xquery_nondescriptive_error;
     }


-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to