Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv30982/runtime

Modified Files:
      Tag: xrpcdemo
        pathfinder.mx serialize_dflt.mx xrpc_client.mx xrpc_server.mx 
Log Message:
Another premature check-in: it does not fully work yet; committed just to share.



U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.4.6
retrieving revision 1.45.4.7
diff -u -d -r1.45.4.6 -r1.45.4.7
--- xrpc_client.mx      5 Jun 2008 21:17:54 -0000       1.45.4.6
+++ xrpc_client.mx      6 Jun 2008 14:39:53 -0000       1.45.4.7
@@ -47,8 +47,8 @@
         str genType,
         str mode,
         str qid,
-        lng timeout,
         lng seqnr,
+        lng timeout,
         str dst,
         str module,
         str location,
@@ -145,7 +145,7 @@
         }
 
         var local_name := "rpc_res_00" + str(int($h)+off);
-        var rpc_res, timeout := max(0LL,xrpc_timeout - (usec() - timer_start));
+        var rpc_res, timeout := max(0LL,xrpc_timeout - ((usec() - time_start) 
/ 1000));
         var rpc_err := CATCH(rpc_res := http_post(genType, xrpc_mode, 
xrpc_qid, xrpc_seqnr, timeout, $t, modname, location, method,
                     updCall, arity, itercnt, ws, fun_vid, fun_iter, fun_item,
                     fun_kind, int_values, dbl_values, dec_values, str_values));
@@ -166,21 +166,13 @@
             }
         } else {
             printf("doLoopLiftedRPC: error occurred at %s:\n%s\n", $t, 
rpc_err);
-            if(xrpc_qid != "") {
-                # abort a 2PC-XRPC query on error
-                lock_set(xrpc_lock);
-                CATCH({xrpc_statuses.replace(idx, "abort");});
-                lock_unset(xrpc_lock);
-                # TODO: stop immediately further execution and clean up
-            } else {
-                # A none-isolation XRPC query: we do not want to discard
-                # results from other destinations where executions might
-                # have succeeded, so we only print a message by error,
-                # iso. terminate the execution with 'ERROR'
-                lock_set(rpcerr_lock);
-                CATCH({ rpc_errors := rpc_errors + $t + ": " + rpc_err + "\n"; 
});
-                lock_unset(rpcerr_lock);
-            }
+            # A none-isolation XRPC query: we do not want to discard
+            # results from other destinations where executions might
+            # have succeeded, so we only print a message by error,
+            # iso. terminate the execution with 'ERROR'
+            lock_set(rpcerr_lock);
+            CATCH({ rpc_errors := rpc_errors + $t + ": " + rpc_err + "\n"; });
+            lock_unset(rpcerr_lock);
         }
     }
     if (xrpc_qid != "") lock_set(wslock);
@@ -204,7 +196,7 @@
     if(count(rpc_results) = 0) {
         ERROR("doLoopLiftedRPC: execution failed at all destinations:\n%s\n", 
rpc_errors);
     } else if(rpc_errors != "") {
-        printf("doLoopLiftedRPC: execution failed at some 
destinations:\n%s\n", rpc_errors);
+        ERROR("doLoopLiftedRPC: execution failed at some destinations:\n%s\n", 
rpc_errors);
     }
 
     # retrieve results for this destination, and map the results back to
@@ -290,7 +282,7 @@
 
         var local_name := "rpc_res_00" + str(h+1);
         if (xrpc_qid != "") lock_unset(wslock);
-        var rpc_res, timeout := max(0LL,xrpc_timeout - (usec() - timer_start));
+        var rpc_res, timeout := max(0LL,xrpc_timeout - ((usec() - 
time_start)/1000));
         var rpc_err := CATCH(rpc_res := http_post(genType, xrpc_mode, 
xrpc_qid, xrpc_seqnr, timeout, $t, modname, location, method,
                     updCall, arity, lng(1), ws, cur_fun_vid, cur_fun_iter, 
cur_fun_item,
                     cur_fun_kind, int_values, dbl_values, dec_values, 
str_values));
@@ -380,7 +372,7 @@
         doIterativeRPC(modname, location, method, updCall, arity, niters,
                         ws, dsts, fun_vid, fun_iter, fun_item, fun_kind, 
int_values, dbl_values, dec_values, str_values);
     else
-        doLoopliftedRPC(modname, location, method, updCall, arity, niters,
+        doLoopLiftedRPC(modname, location, method, updCall, arity, niters,
                         ws, dsts, fun_vid, fun_iter, fun_item, fun_kind, 
int_values, dbl_values, dec_values, str_values);
 }
 
@@ -814,7 +806,7 @@
         str genType,
         str rpc_mode,
         str qid,
-        str seqnr,
+        lng seqnr,
         lng timeout,
         str rpc_module,
         str rpc_uri,
@@ -918,7 +910,7 @@
             return clean_up(bs, argcnt, iterc);
     }
     ret = stream_printf(bs, XRPC_REQ_HEADER, rpc_module, rpc_uri, rpc_method, 
-                            arity, iterc, seqnr, rpc_mode, 
updCall?"true":"false");
+                            arity, iterc, xrpc_hostname, xrpc_port, seqnr, 
rpc_mode, updCall?"true":"false");
     if (ret < 0) 
         return clean_up(bs, argcnt, iterc);
 
@@ -1124,7 +1116,7 @@
         str genType,
         str rpc_mode,
         str qid,
-        str seqnr,
+        lng *seqnr,
         lng *timeout,
         str dst,
         str rpc_module,
@@ -1170,7 +1162,7 @@
         return GDK_FAIL;
     }
 
-    if(!(req = byvalue_request(timing, genType, rpc_mode, qid, seqnr, 
*timeout, rpc_module, rpc_uri, rpc_method,
+    if(!(req = byvalue_request(timing, genType, rpc_mode, qid, *seqnr, 
*timeout, rpc_module, rpc_uri, rpc_method,
                     *updCall, *arity, *itercnt, ws, fun_vid, fun_iter,
                     fun_item, fun_kind, int_values, dbl_values,
                     dec_values, str_values))){

U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46.4.4
retrieving revision 1.46.4.5
diff -u -d -r1.46.4.4 -r1.46.4.5
--- serialize_dflt.mx   5 Jun 2008 21:17:53 -0000       1.46.4.4
+++ serialize_dflt.mx   6 Jun 2008 14:39:52 -0000       1.46.4.5
@@ -994,7 +994,7 @@
 xrpc_startSerialize(XqueryCtx *ctx)
 {
     int len, ret = 0;
-    lng time_exec = GDKusec() - ctx->xrpc_start;
+    lng time_exec = (GDKusec() - ctx->xrpc_start) / 1000;
    
     if (ctx->xrpc_qid[0] && time_exec > ctx->xrpc_timeout) {
         GDKerror("xrpc_startSerialize: xrpc-query timed out\n");
@@ -1041,7 +1041,7 @@
         if (ret < 0) return PROBLEM;
     }
     ret = stream_printf(ctx->out, XRPC_RES_HEADER, ctx->xrpc_module, 
ctx->xrpc_method);
-    return ret?PROBLEM:SUCCESS;
+    return (ret < 0)?PROBLEM:SUCCESS;
 }
 
 /**

U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.8
retrieving revision 1.416.2.1.2.9
diff -u -d -r1.416.2.1.2.8 -r1.416.2.1.2.9
--- pathfinder.mx       5 Jun 2008 21:17:50 -0000       1.416.2.1.2.8
+++ pathfinder.mx       6 Jun 2008 14:39:51 -0000       1.416.2.1.2.9
@@ -511,11 +511,64 @@
 var xrpc_timeouts := bat("xrpc_timeouts");  # bat[void,lng] query timeout in 
usec
 var xrpc_wsids    := bat("xrpc_wsids");     # bat[void,lng] ID of the 
workingset associated with this query
 var xrpc_locks    := bat("xrpc_locks");     # bat[void,lock] lock for the 
workingset of this query 
-var xrpc_statuses := bat("xrpc_statuses");  # bat[void,str] 2PC status of this 
query: "exec", "prepare", "commit" or "abort"
+var xrpc_statuses := bat("xrpc_statuses");  # bat[void,str] 2PC status of this 
query: "exec", "wait", "prepare", "commit", "abort" or "timeout"
 var xrpc_lock     := pflock_get(6); # master XRPC lock, to protect above 
xrpc_* BATs
 var xrpc_hostport := my_hostname() + ":" + str(get_xrpc_port());
 var xrpc_querynr  := 9999999LL;
 
+# abort a 2PC transaction, keeping the record (note ws should have been 
destroyed already)
+PROC _ws_xrpc_abort(oid idx, str status) : void
+{
+    var wslock := xrpc_locks.find(idx);
+    xrpc_statuses.inplace(idx, status);
+    xrpc_wsids.inplace(idx, lng_nil);
+    xrpc_wslocks.inplace(idx, lock_nil);
+    lock_destroy(wslock);
+}
+
+# end a 2PC request. On success, unlock ws and keep waiting for more. On 
failure, abort it.
+PROC _ws_xrpc_end(lng wsid, str errmsg) : void
+{
+    var idx := xrpc_qids.find(xrpc_qid);
+    if (isnil(errmsg)) { 
+        xrpc_statuses.inplace(idx, "wait");
+        lock_unset(xrpc_wslocks.find(idx));
+    } else {
+        _ws_xrpc_abort(idx, "abort");
+    }
+}
+
+# background check to time-out waiting 2PC transactions, and prune them fully 
after an hour
+PROC _ws_xrpc_prune() : void
+{
+    var lim      := usec();
+    var timeout  := 
mirror(xrpc_statuses.ord_select("wait")).leftfetchjoin(xrpc_timeouts).ord_uselect(0LL,lim).project("timeout");
+    var aborted  := 
mirror(xrpc_wsids.ord_uselect(lng_nil,lng_nil)).leftfetchjoin(xrpc_statuses).ord_select("abort");
+
+    kunion(aborted,timeout)@batloop() {
+       var ws := bat(str(xrpc_wsids.find($h)));
+       ws_destroy(ws);
+       _ws_xrpc_abort($h,$t);
+    }
+    var relevant := xrpc_timeout.ord_uselect(lim - 
3600000000LL,lng_nil).hmark(0@0);
+    if (count(relevant) < count(xrpc_timeout)) {
+        # pruned xrpc requests should all have destroyed wslock and ws (we let 
them leak here)
+        var relevant_qids     := relevant.leftfetchjoin(xrpc_qids);      
xrpc_qids.delete().append(relevant_qids);
+        var relevant_timeouts := relevant.leftfetchjoin(xrpc_qtimeouts); 
xrpc_timeouts.delete().append(relevant_timeouts);
+        var relevant_wsids    := relevant.leftfetchjoin(xrpc_wsids);     
xrpc_wsids.delete().append(relevant_wsids);
+        var relevant_locks    := relevant.leftfetchjoin(xrpc_locks);     
xrpc_locks.delete().append(relevant_locks);
+        var relevant_statuses := relevant.leftfetchjoin(xrpc_statuses);  
xrpc_statuses.delete().append(relevant_statuses);
+    }
+}
+
+PROC xrpc_status() : void {
+    lock_set(xrpc_lock);
+    var err := 
CATCH(print(xrpc_qids,xrpc_statuses,xrpc_timeouts,xrpc_locks,xrpc_wsids));
+    lock_unset(xrpc_lock);
+    if (not(isnil(err))) GDKerror(err);
+}
+
+
 # transaction debugging / performance profiling 
 var ws_log_lock := lock_create();
 var ws_log := bat(lng,str,1000000).rename("ws_log");
@@ -732,21 +785,7 @@
 
         # remove working sets of non-busy XRPC requests that timed out or 
aborted
         lock_set(xrpc_lock);
-        var timedout := project(xrpc_timeouts.ord_uselect(0LL,usec()), 
"timedout");
-        var aborted := 
xrpc_wsids.ord_uselect(lng_nil,lng_nil).mirror().leftfetchjoin(xrpc_statuses).ord_select("abort");
-        kunion(aborted,timedout)@batloop() {
-            var wslock := xrpc_locks.find($h);
-            if (lock_try(ws_lock) = 0) {
-                # we just leave the list to grow for now :( 
-                var ws := ws_id(xrpc_wsids.find($h));
-                xrpc_statuses.inplace($h, $t);
-                xrpc_timeout.inplace($h, lng_nil);
-                xrpc_wsids.inplace($h, lng_nil);
-                xrpc_wslocks.inplace($h, lock_nil);
-                lock_destroy(wslock);
-                ws_destroy(ws);
-            }
-        }
+        CATCH(_ws_xrpc_prune());
         lock_unset(xrpc_lock);
     }
 }
@@ -848,6 +887,7 @@
               # of those, all master rid_* bats, and their (remapped) pre_* 
views
     ws_mem := ws_mem.slice(NID_RID, INT_MAX).rename("ws_mem");
 
+
 # scans for empty collections, and produces a list of kill-bats (fast - done 
inside the short lock)
 PROC _collection_cleanup() : BAT[str,str]
 {
@@ -1433,34 +1473,54 @@
     return xrpc;
 }
 
-# create a new ws if necessary (res <= 0), returns wsid
-PROC _ws_new(int xrpc, oid idx, int update) : lng
+# create a new ws if necessary (res <= 0)
+PROC _ws_new(int xrpc, oid idx, int update) : BAT[void,bat]
 {
-    var wsid;
+    var ws;
     if (xrpc > 0) {
-        wsid := xrpc_wsids.find(idx);
+        ws := bat(xrpc_wsids.find(idx));
+        xrpc_statuses.inplace(idx,"exec");
     } else {
         var id := and(lng(newoid(1)), 2147483647LL);
-        var ws := [ws_new](mirror(ws_tpe), ws_tpe, ws_htp, ws_ttp, ws_seq);
+        ws := [ws_new](mirror(ws_tpe), ws_tpe, ws_htp, ws_ttp, ws_seq);
         wsid := <<(id, 32) + (lng(update) << 30) + lng(ws);
         ws.access(BAT_READ).rename(str(wsid));
         if (xrpc < 0) {
             var wslock := lock_create();
             xrpc_qids.append(xrpc_qid);
-            xrpc_timeouts.append(usec() + xrpc_timeout);
+            xrpc_timeouts.append(usec() + *(1000LL * xrpc_timeout));
             xrpc_wsids.append(wsid);
             xrpc_locks.append(wslock);
             xrpc_statuses.append("exec");
         }
     }
-    return wsid;
+    return ws;
+}
+
+   
+var update := 0;
+var xrpc_qid := "";
+var xrpc_seqnr := "";
+var xrpc_mode := "";
+var ws, wsid, xrpc, idx; ws := _ws_new(xrpc := _ws_xrpcget(), idx := 
oid(abs(xrpc)), update); wsid := ws_id(ws);
+
+PROC ws_end(BAT[void,bat] ws, str err) : int 
+{
+    ws_log_wsid := ws_id(ws);
+    if (not(isnil(err))) ws_log(ws, err);
+    ws_destroy(ws);
+    if (xrpc_qid = "") return 1;
+    lock_set(xrpc_lock);
+    CATCH(_ws_xrpc_end(wsid, err));
+    lock_unset(xrpc_lock);
+    return 2; # do not auto-restart 2PC transactions ever
 }
 
 PROC ws_create(int update) : BAT[void,bat]
 {
     # NOTE: use pre-query MIL variables xrpc_qid/xrpc_timeout to possibly 
re-use an existing ws
     lock_set(xrpc_lock);
-    var ws, xrpc, idx, err := CATCH(wsid := _ws_new(xrpc := _ws_xrpcget(), idx 
:= oid(abs(xrpc)), update), ws := bat(str(wsid)));
+    var ws, wsid, xrpc, idx, err := CATCH(ws := _ws_new(xrpc := _ws_xrpcget(), 
idx := oid(abs(xrpc)), update), wsid := ws_id(ws));
     lock_unset(xrpc_lock);
     if (not(isnil(err))) ERROR(err);
 
@@ -4801,7 +4861,7 @@
     int mt = xquery_types[t].monet_tpe;
     int k = xquery_types[t].kind;
     int vallen = sizeof(oid)+sizeof(oid);
-    size_t m = strlen(mil);
+    size_t m = mil?strlen(mil):0;
     oid buf[2], id;
     void *val = (void*) buf;
 
@@ -4812,7 +4872,10 @@
     } else if (ATOMfromstr(mt, &val, &vallen, v) <= 0) {
        return "xquery_parse_val: illegal value.\n";
     }
-    if (mt == TYPE_int) {
+    if (mt == TYPE_bit) {
+        id = (*(bit*) val == TRUE);
+        @:bunappend(item, &id, (size_t) *(oid*), SZFMT, "@0")@
+    } else if (mt == TYPE_lng) {
         @:bunappend(int_values, val, *(lng*), LLFMT, "LL")@
         BATiter ivi = bat_iterator(int_values);
         id = *(oid*) BUNhead(ivi, BUNfnd(BATmirror(int_values), val));
@@ -5915,7 +5978,7 @@
     }
 }
 @= setvar
-    if (strcmp(*(ctx->@1), @2)) { 
+    if (@2 && strcmp(*(ctx->@1), @2)) { 
         char* newval = GDKstrdup(@2);
         if (newval == NULL) return "setvar(@1): malloc failure";
         GDKfree(*(ctx->@1));

U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.6
retrieving revision 1.68.4.7
diff -u -d -r1.68.4.6 -r1.68.4.7
--- xrpc_server.mx      5 Jun 2008 21:17:55 -0000       1.68.4.6
+++ xrpc_server.mx      6 Jun 2008 14:39:54 -0000       1.68.4.7
@@ -134,9 +134,6 @@
 \"monet_environment\"",
 "xrpc_server");
 
-PROC print_xrpc() : void {
-    print(xrpc_qids,xrpc_statuses,xrpc_timeouts,xrpc_locks,xrpc_wsids);
-}
 @h
 #ifndef XRPC_SERVER_H
 #define XRPC_SERVER_H
@@ -225,7 +222,7 @@
                    " xrpc:method=\"%s\""\
                    " xrpc:arity=\"%lld\""\
                    " xrpc:iter-count=\"%lld\""\
-                   " xrpc:caller=\"%s\""\
+                   " xrpc:caller=\"%s:%d:"LLFMT"\""\
                    " xrpc:mode=\"%s\""\
                    " xrpc:updCall=\"%s\">"
 
@@ -593,7 +590,7 @@
                    bit isAdmin)
 {
     XRPCreq_t *req = NULL, *res = NULL;
-    char* msg = participants?XRPC_REQUEST:XRPC_RESPONSE;
+    char* msg = participants?XRPC_RESPONSE:XRPC_REQUEST;
     char *module = NULL, *method = NULL, *location = NULL;
     char *qid = NULL, *host = NULL, *caller = "query,0";
     char *querynr_str = NULL, *timeout_str = NULL, *mode_str = NULL;
@@ -997,38 +994,6 @@
     return xrpc_parse_message(out, shredBAT, NULL, isAdmin);
 }
 
-/* return: 1: timedout; 0: not timedout; -1: not found; -2: other errors */
-int
-check_timeout_by_qid(char *qid, lng timeout)
-{
-    BATiter qidsi = bat_iterator(xrpc_qids);
-    oid qid_idx = oid_nil;
-    BUN bun_qid = BUN_NONE;
-    int ret = 1;
-
-    if(timeout > GDKusec())
-        return 0; /* not timed out */
-
-    MT_set_lock(PF_XRPC_LOCK, "check_timeout_by_qid");
-    if((bun_qid = BUNfnd(BATmirror(xrpc_qids), qid)) ==  BUN_NONE) {
-        GDKerror("check_timeout_by_qid: "
-                 "could not find QID \"%s\" in the meta-BAT "
-                 "\"xrpc_qids\"\n", qid);
-        ret = -1;
-    }
-    qid_idx =  *(oid*)BUNtail(qidsi, bun_qid);
-    if(!BUNreplace(xrpc_statuses, &qid_idx, "timedout", FALSE)){
-        MT_unset_lock(PF_XRPC_LOCK, "check_timeout_by_qid");
-        GDKerror("check_timeout_by_qid: "
-                 "failed to change the status of query \"%s\" into "
-                 "\"timedout\"\n", qid);
-        ret = -2;
-    }
-    MT_unset_lock(PF_XRPC_LOCK, "check_timeout_by_qid");
-    return ret; /* timed out */
-}
-
-
 /**
  * @return GDK_SUCCEED, or
  *         GDK_FAIL if an error has occurred.
@@ -1075,32 +1040,15 @@
         ret = GDK_FAIL;
         /* if error was caused by other problems, we haven't checked
          * query expiration time */
-        int tdo = check_timeout_by_qid(req->qid, req->start + req->timeout);
         snprintf(errbuf_all, GDKMAXERRLEN*2,
-                "Error occurred during execution:\n%s\n%s\n%s",
-                tdo > 0 ? "xrpc query timed out" : "",
+                "Error occurred during execution:s\n%s\n%s",
                 *errbuf ? errbuf : "",
                 err == ((char*)-1) ? "no further error message" : err);
-        send_err(mc->c->fdout, tdo > 0 ? ERR504 : ERR404,
-                               tdo > 0 ? "env:Receiver" : "env:Sender",
-                               errbuf_all);
+        send_err(mc->c->fdout, ERR404, "env:Sender", errbuf_all);
     } else if (req->updCall) {
-        /* updating functions donot involve the serializer, hence,
-         * time-out not checked yet */
-        ret = check_timeout_by_qid(req->qid, req->start + req->timeout);
-        if(ret){
-            snprintf(errbuf_all, GDKMAXERRLEN*2,
-                    "Error occurred when checking query expiration 
time:\n%s\n%s",
-                    ret > 0 ? "xrpc query timed out" : "",
-                    *errbuf ? errbuf : "no further error message.");
-            send_err(mc->c->fdout, ret > 0 ? ERR504 : ERR404,
-                                   ret > 0 ? "env:Receiver" : "env:Sender",
-                                   errbuf_all);
-            ret = GDK_FAIL;
-        } else { /* send empty HTTP OK header for updating request */
-            stream_write(mc->c->fdout, "HTTP/1.1 200 OK\r\n"
+        /* send empty HTTP OK header for updating request */
+        stream_write(mc->c->fdout, "HTTP/1.1 200 OK\r\n"
                     "Content-type: text/xml; charset=\"utf-8\"\r\n\r\n", 1, 
60);
-        }
     }
 
     stream_flush(mc->c->fdout);


-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to