Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv15778/runtime

Modified Files:
      Tag: xrpcdemo
        pathfinder.mx serialize_dflt.mx xrpc_client.mx xrpc_server.mx 
Log Message:
- more bug fixes. repeatable read does not work yet 


U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.4.7
retrieving revision 1.45.4.8
diff -u -d -r1.45.4.7 -r1.45.4.8
--- xrpc_client.mx      6 Jun 2008 14:39:53 -0000       1.45.4.7
+++ xrpc_client.mx      7 Jun 2008 00:38:28 -0000       1.45.4.8
@@ -102,7 +102,7 @@
     var rpcerr_lock := lock_create(); # guards rpc_errors
     var rpc_errors := ""; # holds errors from each destination
     var rpcres_lock := lock_create(); # guards rpc_results and rpc_iter
-    var rpc_results := bat(str,oid);
+    var rpc_results := bat(str,bat);
     var rpc_iter := bat(void,bat).seqbase(0@0);
     var off := count(ws.fetch(CONT_NAME));
 
@@ -110,7 +110,7 @@
     var wslock;
     if (xrpc_qid != "") {  
         lock_set(xrpc_lock);
-        var err := CATCH(wslock := xrpc_locks.fetch(reverse(xrpc_wsids).find(xrpc_id)));
+        var err := CATCH(wslock := xrpc_locks.fetch(reverse(xrpc_qids).find(xrpc_qid)));
         lock_unset(xrpc_lock);
         if (not(isnil(err))) ERROR(err);
         lock_unset(wslock);
@@ -145,7 +145,7 @@
         }
 
         var local_name := "rpc_res_00" + str(int($h)+off);
-        var rpc_res, timeout := max(0LL,xrpc_timeout - ((usec() - time_start) / 1000));
+        var rpc_res, timeout := max(0LL,xrpc_timeout - ((usec() - time_start)/1000));
         var rpc_err := CATCH(rpc_res := http_post(genType, xrpc_mode, xrpc_qid, xrpc_seqnr, timeout, $t, modname, location, method,
                     updCall, arity, itercnt, ws, fun_vid, fun_iter, fun_item,
                     fun_kind, int_values, dbl_values, dec_values, str_values));
@@ -202,7 +202,7 @@
     # retrieve results for this destination, and map the results back to
     # the original iteration number
     var proc_res := [get_rpc_res](
-            [ws_opencoll](const ws, rpc_results.tmark(0@0), local_name, TEMP_DOC),
+            [ws_opencoll](const ws, rpc_results.tmark(0@0), rpc_results.hmark(0@0), TEMP_DOC),
             const ws, const int_values, const dbl_values, const str_values);
 
     res_iter := proc_res.[fetch](0).[leftfetchjoin](rpc_iter);
@@ -228,7 +228,6 @@
         printf("XRPC_Client_DeSerialisation (get_rpc_res):    %lld microsec\n",
                 time_xrpcClntDeSeria);
     }
-
     return res_bats;
 }
 ADDHELP("doLoopLiftedRPC", "zhang", "April 2006",
@@ -257,7 +256,7 @@
     var wslock;
     if (xrpc_qid != "") {  
         lock_set(xrpc_lock);
-        var err := CATCH(wslock := xrpc_locks.fetch(reverse(xrpc_wsids).find(xrpc_id)));
+        var err := CATCH(wslock := xrpc_locks.fetch(reverse(xrpc_qids).find(xrpc_qid)));
         lock_unset(xrpc_lock);
         if (not(isnil(err))) ERROR(err);
     }
@@ -369,11 +368,11 @@
         BAT[void, str] str_values) : BAT[void,bat]
 {
     if (search(xrpc_mode,"iterative") >= 0) 
-        doIterativeRPC(modname, location, method, updCall, arity, niters,
-                        ws, dsts, fun_vid, fun_iter, fun_item, fun_kind, int_values, dbl_values, dec_values, str_values);
+        return doIterativeRPC(modname, location, method, updCall, arity, niters,
+                              ws, dsts, fun_vid, fun_iter, fun_item, fun_kind, int_values, dbl_values, dec_values, str_values);
     else
-        doLoopLiftedRPC(modname, location, method, updCall, arity, niters,
-                        ws, dsts, fun_vid, fun_iter, fun_item, fun_kind, int_values, dbl_values, dec_values, str_values);
+        return doLoopLiftedRPC(modname, location, method, updCall, arity, niters,
+                               ws, dsts, fun_vid, fun_iter, fun_item, fun_kind, int_values, dbl_values, dec_values, str_values);
 }
 
 
@@ -414,9 +413,14 @@
         if (shredBAT) {
             XRPCreq_t *req;
             /* create a list of all main document bats (as parse_message likes 
it that way) */
-            for(i=0; i < ATTR_OWN; i++)
-                BUNappend(shredBAT, BUNtail(wi, *cont), FALSE);
-
+            for(i=0; i <= ATTR_PROP; i++) {
+                BAT *b = BATdescriptor(*(bat*) BUNtail(wi, i));
+                if (b) {
+                    BATiter bi = bat_iterator(b);
+                    BUNappend(shredBAT, BUNtail(bi, *cont), FALSE);
+                    BBPunfix(b->batCacheid);
+                }
+            }
             req = xrpc_parse_message(NULL, shredBAT, participants, FALSE);
             if (req) { 
                 BAT *res_iter = BATnew(TYPE_void, TYPE_oid, req->nr_args);
@@ -905,7 +909,7 @@
         return clean_up(bs, argcnt, iterc);
 
     if(qid && *qid) { /* an XRPC query that requires 2PC */
-        ret = stream_printf(bs, "<env:Header" XRPC_WS_QID "</env:Header" , qid, timeout);
+        ret = stream_printf(bs, "<env:Header>" XRPC_WS_QID "</env:Header>" , qid, timeout);
         if (ret < 0) 
             return clean_up(bs, argcnt, iterc);
     }

U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46.4.5
retrieving revision 1.46.4.6
diff -u -d -r1.46.4.5 -r1.46.4.6
--- serialize_dflt.mx   6 Jun 2008 14:39:52 -0000       1.46.4.5
+++ serialize_dflt.mx   7 Jun 2008 00:38:27 -0000       1.46.4.6
@@ -1017,11 +1017,11 @@
         if (b == NULL) return PROBLEM;
 
         ret = stream_printf(ctx->out, "<env:Header>");
-        if (ret == 0 && ctx->xrpc_qid[0])
+        if (ret >= 0 && ctx->xrpc_qid[0])
             ret = stream_printf(ctx->out, XRPC_WS_QID, ctx->xrpc_qid, 
ctx->xrpc_timeout);
 
         /* XRPC participants lists for nested transactions - append an extra 
item for this call */
-        if(ret == 0)
+        if(ret >= 0)
             ret = stream_printf(ctx->out, 
                            "<xrpc:participants>\n  <xrpc:participant>%s,%s," 
                            LLFMT ",%s:%d," LLFMT "</xrpc:participant>\n",
@@ -1034,7 +1034,7 @@
             if (ret < 0) break;
             ret = stream_printf(ctx->out, "  
<xrpc:participant>%s</xrpc:participant>\n", BUNtail(bi,p));
         }
-        if(ret == 0) 
+        if (ret >= 0) 
             ret = stream_printf(ctx->out, 
"</xrpc:participants>\n</env:Header>\n");
 
         BBPunfix(bid);

U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.9
retrieving revision 1.416.2.1.2.10
diff -u -d -r1.416.2.1.2.9 -r1.416.2.1.2.10
--- pathfinder.mx       6 Jun 2008 14:39:51 -0000       1.416.2.1.2.9
+++ pathfinder.mx       7 Jun 2008 00:38:26 -0000       1.416.2.1.2.10
@@ -520,19 +520,21 @@
 PROC _ws_xrpc_abort(oid idx, str status) : void
 {
     var wslock := xrpc_locks.find(idx);
+    var ws := bat(str(xrpc_wsids.find(idx)));
+    ws_destroy(ws);
     xrpc_statuses.inplace(idx, status);
     xrpc_wsids.inplace(idx, lng_nil);
-    xrpc_wslocks.inplace(idx, lock_nil);
+    xrpc_locks.inplace(idx, lock_nil);
     lock_destroy(wslock);
 }
 
 # end a 2PC request. On success, unlock ws and keep waiting for more. On failure, abort it.
 PROC _ws_xrpc_end(lng wsid, str errmsg) : void
 {
-    var idx := xrpc_qids.find(xrpc_qid);
+    var idx := reverse(xrpc_qids).find(xrpc_qid);
     if (isnil(errmsg)) { 
         xrpc_statuses.inplace(idx, "wait");
-        lock_unset(xrpc_wslocks.find(idx));
+        lock_unset(xrpc_locks.find(idx));
     } else {
         _ws_xrpc_abort(idx, "abort");
     }
@@ -546,12 +548,10 @@
     var aborted  := 
mirror(xrpc_wsids.ord_uselect(lng_nil,lng_nil)).leftfetchjoin(xrpc_statuses).ord_select("abort");
 
     kunion(aborted,timeout)@batloop() {
-       var ws := bat(str(xrpc_wsids.find($h)));
-       ws_destroy(ws);
        _ws_xrpc_abort($h,$t);
     }
-    var relevant := xrpc_timeout.ord_uselect(lim - 3600000000LL,lng_nil).hmark(0@0);
-    if (count(relevant) < count(xrpc_timeout)) {
+    var relevant := xrpc_timeouts.ord_uselect(lim - 3600000000LL,lng_nil).hmark(0@0);
+    if (count(relevant) < count(xrpc_timeouts)) {
+    if (count(relevant) < count(xrpc_timeouts)) {
         # pruned xrpc requests should all have destroyed wslock and ws (we let 
them leak here)
         var relevant_qids     := relevant.leftfetchjoin(xrpc_qids);      
xrpc_qids.delete().append(relevant_qids);
         var relevant_timeouts := relevant.leftfetchjoin(xrpc_qtimeouts); 
xrpc_timeouts.delete().append(relevant_timeouts);
@@ -1465,9 +1465,9 @@
         xrpc := -(int(count(xrpc_qids))+1); # default: create a new ws
         if (xrpc_qid = "") {
             xrpc_qid := xrpc_hostport + ":" + str(xrpc_seqnr); #init
-            xpc_coord := true;
+            xrpc_coord := true;
         } else if (xrpc_qids.texist(xrpc_qid)) {
-            xrpc := int(reverse(xrpc_qids).find(qid)); # reuse an old ws
+            xrpc := int(reverse(xrpc_qids).find(xrpc_qid)); # reuse an old ws
         }
     }
     return xrpc;
@@ -1478,7 +1478,7 @@
 {
     var ws;
     if (xrpc > 0) {
-        ws := bat(xrpc_wsids.find(idx));
+        ws := bat(str(xrpc_wsids.find(idx)));
         xrpc_statuses.inplace(idx,"exec");
     } else {
         var id := and(lng(newoid(1)), 2147483647LL);
@@ -1488,7 +1488,7 @@
         if (xrpc < 0) {
             var wslock := lock_create();
             xrpc_qids.append(xrpc_qid);
-            xrpc_timeouts.append(usec() + *(1000LL * xrpc_timeout));
+            xrpc_timeouts.append(usec() + *(1000LL, xrpc_timeout));
             xrpc_wsids.append(wsid);
             xrpc_locks.append(wslock);
             xrpc_statuses.append("exec");
@@ -1499,19 +1499,22 @@
 
    
 var update := 0;
-var xrpc_qid := "";
-var xrpc_seqnr := "";
-var xrpc_mode := "";
+var xrpc_qid := "1";
+var xrpc_seqnr := 1LL;
+var xrpc_timeout := 30000LL;
+var xrpc_mode := "repeatable";
 var ws, wsid, xrpc, idx; ws := _ws_new(xrpc := _ws_xrpcget(), idx := oid(abs(xrpc)), update); wsid := ws_id(ws);
 
 PROC ws_end(BAT[void,bat] ws, str err) : int 
 {
     ws_log_wsid := ws_id(ws);
     if (not(isnil(err))) ws_log(ws, err);
-    ws_destroy(ws);
-    if (xrpc_qid = "") return 1;
+    if (xrpc_qid = "") {
+        ws_destroy(ws);
+        return 1;
+    }
     lock_set(xrpc_lock);
-    CATCH(_ws_xrpc_end(wsid, err));
+    CATCH(_ws_xrpc_end(ws_log_wsid, err));
     lock_unset(xrpc_lock);
     return 2; # do not auto-restart 2PC transactions ever
 }

U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.7
retrieving revision 1.68.4.8
diff -u -d -r1.68.4.7 -r1.68.4.8
--- xrpc_server.mx      6 Jun 2008 14:39:54 -0000       1.68.4.7
+++ xrpc_server.mx      7 Jun 2008 00:38:29 -0000       1.68.4.8
@@ -211,7 +211,7 @@
       "<wscoor:CoordinationContext xmlns:wscoor=\""WSCOOR_NS"\""\
                  "env:mustUnderstand=\"true\">"\
         "<wscoor:Identifier>%s</wscoor:Identifier>"\
-        "<wscoor:Expires>%s</wscoor:Expires>"\
+        "<wscoor:Expires>"LLFMT"</wscoor:Expires>"\
         "<wscoor:CoordinationType>" WSAT_NS "</wscoor:CoordinationType>"\
       "</wscoor:CoordinationContext>"
 
@@ -764,9 +764,9 @@
     if (iterc == -1) {
         /* if no iterc was found in the request (and always for XRPC 
responses), just count sequence elements */
         BAT *qn_seq = BATselect(qn_uri_loc, XRPC_NS"|sequence", 
XRPC_NS"|sequence");
-        elt_qn = get_elt_qn(pre_prop, qn_seq);
+        elt_qn = get_elt_qn(pre_kind, pre_prop);
         if (elt_qn && qn_seq) {
-            BAT* elt_seq = BATmirror(BATsemijoin(BATmirror(elt_qn),  elt_seq));
+            BAT* elt_seq = BATmirror(BATsemijoin(BATmirror(elt_qn), qn_seq));
             if (elt_seq) {
                 iterc = BATcount(elt_seq);
                 BBPreclaim(elt_seq);
@@ -778,7 +778,7 @@
         /* for XRPC response messages we add all participants to the 
participant bat */
         BAT *qn_part = BATselect(qn_uri_loc, XRPC_NS"|participant", 
XRPC_NS"|participant");
         if (qn_part) {
-            BAT *elt_part = BATmirror(BATsemijoin(BATmirror(elt_qn),  
qn_part));
+            BAT *elt_part = BATmirror(BATsemijoin(BATmirror(elt_qn),qn_part));
             if (elt_part) {
                 BATiter pi = bat_iterator(elt_part);
                 BUN p,q;
@@ -1005,7 +1005,7 @@
           BAT *shredBAT)
 {
     int ret = GDK_SUCCEED;
-       char errbuf[GDKMAXERRLEN], errbuf_all[GDKMAXERRLEN*2], *errbuf_bak = 
GDKerrbuf;
+    char errbuf[GDKMAXERRLEN], errbuf_all[GDKMAXERRLEN*2], *errbuf_bak = 
GDKerrbuf;
 
     /* Possible values of flags:
      * 0: xml-noheader-xrpc
@@ -1023,8 +1023,8 @@
      *    Get generated MIL code in "/tmp/xrpc.mil", hence, this option
      *    is not portable.
      */
-       *errbuf = 0;
-       GDKsetbuf(errbuf);
+    *errbuf = 0;
+    GDKsetbuf(errbuf);
     char *err = xquery_method(mc, flags, req->mode, req->module, 
             req->location, req->method, req->qid, req->caller, req->timeout,
             req->argc, req->iterc, req->argcnt, req->argtpe, req->argval, 


-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Monetdb-pf-checkins mailing list
Monetdb-pf-checkins@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to