Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv30982/runtime
Modified Files:
Tag: xrpcdemo
pathfinder.mx serialize_dflt.mx xrpc_client.mx xrpc_server.mx
Log Message:
Another premature check-in: it does not fully work yet; committed just to share.
U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.4.6
retrieving revision 1.45.4.7
diff -u -d -r1.45.4.6 -r1.45.4.7
--- xrpc_client.mx 5 Jun 2008 21:17:54 -0000 1.45.4.6
+++ xrpc_client.mx 6 Jun 2008 14:39:53 -0000 1.45.4.7
@@ -47,8 +47,8 @@
str genType,
str mode,
str qid,
- lng timeout,
lng seqnr,
+ lng timeout,
str dst,
str module,
str location,
@@ -145,7 +145,7 @@
}
var local_name := "rpc_res_00" + str(int($h)+off);
- var rpc_res, timeout := max(0LL,xrpc_timeout - (usec() - timer_start));
+ var rpc_res, timeout := max(0LL,xrpc_timeout - ((usec() - time_start)
/ 1000));
var rpc_err := CATCH(rpc_res := http_post(genType, xrpc_mode,
xrpc_qid, xrpc_seqnr, timeout, $t, modname, location, method,
updCall, arity, itercnt, ws, fun_vid, fun_iter, fun_item,
fun_kind, int_values, dbl_values, dec_values, str_values));
@@ -166,21 +166,13 @@
}
} else {
printf("doLoopLiftedRPC: error occurred at %s:\n%s\n", $t,
rpc_err);
- if(xrpc_qid != "") {
- # abort a 2PC-XRPC query on error
- lock_set(xrpc_lock);
- CATCH({xrpc_statuses.replace(idx, "abort");});
- lock_unset(xrpc_lock);
- # TODO: stop immediately further execution and clean up
- } else {
- # A none-isolation XRPC query: we do not want to discard
- # results from other destinations where executions might
- # have succeeded, so we only print a message by error,
- # iso. terminate the execution with 'ERROR'
- lock_set(rpcerr_lock);
- CATCH({ rpc_errors := rpc_errors + $t + ": " + rpc_err + "\n";
});
- lock_unset(rpcerr_lock);
- }
+ # A none-isolation XRPC query: we do not want to discard
+ # results from other destinations where executions might
+ # have succeeded, so we only print a message by error,
+ # iso. terminate the execution with 'ERROR'
+ lock_set(rpcerr_lock);
+ CATCH({ rpc_errors := rpc_errors + $t + ": " + rpc_err + "\n"; });
+ lock_unset(rpcerr_lock);
}
}
if (xrpc_qid != "") lock_set(wslock);
@@ -204,7 +196,7 @@
if(count(rpc_results) = 0) {
ERROR("doLoopLiftedRPC: execution failed at all destinations:\n%s\n",
rpc_errors);
} else if(rpc_errors != "") {
- printf("doLoopLiftedRPC: execution failed at some
destinations:\n%s\n", rpc_errors);
+ ERROR("doLoopLiftedRPC: execution failed at some destinations:\n%s\n",
rpc_errors);
}
# retrieve results for this destination, and map the results back to
@@ -290,7 +282,7 @@
var local_name := "rpc_res_00" + str(h+1);
if (xrpc_qid != "") lock_unset(wslock);
- var rpc_res, timeout := max(0LL,xrpc_timeout - (usec() - timer_start));
+ var rpc_res, timeout := max(0LL,xrpc_timeout - ((usec() -
time_start)/1000));
var rpc_err := CATCH(rpc_res := http_post(genType, xrpc_mode,
xrpc_qid, xrpc_seqnr, timeout, $t, modname, location, method,
updCall, arity, lng(1), ws, cur_fun_vid, cur_fun_iter,
cur_fun_item,
cur_fun_kind, int_values, dbl_values, dec_values,
str_values));
@@ -380,7 +372,7 @@
doIterativeRPC(modname, location, method, updCall, arity, niters,
ws, dsts, fun_vid, fun_iter, fun_item, fun_kind,
int_values, dbl_values, dec_values, str_values);
else
- doLoopliftedRPC(modname, location, method, updCall, arity, niters,
+ doLoopLiftedRPC(modname, location, method, updCall, arity, niters,
ws, dsts, fun_vid, fun_iter, fun_item, fun_kind,
int_values, dbl_values, dec_values, str_values);
}
@@ -814,7 +806,7 @@
str genType,
str rpc_mode,
str qid,
- str seqnr,
+ lng seqnr,
lng timeout,
str rpc_module,
str rpc_uri,
@@ -918,7 +910,7 @@
return clean_up(bs, argcnt, iterc);
}
ret = stream_printf(bs, XRPC_REQ_HEADER, rpc_module, rpc_uri, rpc_method,
- arity, iterc, seqnr, rpc_mode,
updCall?"true":"false");
+ arity, iterc, xrpc_hostname, xrpc_port, seqnr,
rpc_mode, updCall?"true":"false");
if (ret < 0)
return clean_up(bs, argcnt, iterc);
@@ -1124,7 +1116,7 @@
str genType,
str rpc_mode,
str qid,
- str seqnr,
+ lng *seqnr,
lng *timeout,
str dst,
str rpc_module,
@@ -1170,7 +1162,7 @@
return GDK_FAIL;
}
- if(!(req = byvalue_request(timing, genType, rpc_mode, qid, seqnr,
*timeout, rpc_module, rpc_uri, rpc_method,
+ if(!(req = byvalue_request(timing, genType, rpc_mode, qid, *seqnr,
*timeout, rpc_module, rpc_uri, rpc_method,
*updCall, *arity, *itercnt, ws, fun_vid, fun_iter,
fun_item, fun_kind, int_values, dbl_values,
dec_values, str_values))){
U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46.4.4
retrieving revision 1.46.4.5
diff -u -d -r1.46.4.4 -r1.46.4.5
--- serialize_dflt.mx 5 Jun 2008 21:17:53 -0000 1.46.4.4
+++ serialize_dflt.mx 6 Jun 2008 14:39:52 -0000 1.46.4.5
@@ -994,7 +994,7 @@
xrpc_startSerialize(XqueryCtx *ctx)
{
int len, ret = 0;
- lng time_exec = GDKusec() - ctx->xrpc_start;
+ lng time_exec = (GDKusec() - ctx->xrpc_start) / 1000;
if (ctx->xrpc_qid[0] && time_exec > ctx->xrpc_timeout) {
GDKerror("xrpc_startSerialize: xrpc-query timed out\n");
@@ -1041,7 +1041,7 @@
if (ret < 0) return PROBLEM;
}
ret = stream_printf(ctx->out, XRPC_RES_HEADER, ctx->xrpc_module,
ctx->xrpc_method);
- return ret?PROBLEM:SUCCESS;
+ return (ret < 0)?PROBLEM:SUCCESS;
}
/**
U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.8
retrieving revision 1.416.2.1.2.9
diff -u -d -r1.416.2.1.2.8 -r1.416.2.1.2.9
--- pathfinder.mx 5 Jun 2008 21:17:50 -0000 1.416.2.1.2.8
+++ pathfinder.mx 6 Jun 2008 14:39:51 -0000 1.416.2.1.2.9
@@ -511,11 +511,64 @@
var xrpc_timeouts := bat("xrpc_timeouts"); # bat[void,lng] query timeout in
usec
var xrpc_wsids := bat("xrpc_wsids"); # bat[void,lng] ID of the
workingset associated with this query
var xrpc_locks := bat("xrpc_locks"); # bat[void,lock] lock for the
workingset of this query
-var xrpc_statuses := bat("xrpc_statuses"); # bat[void,str] 2PC status of this
query: "exec", "prepare", "commit" or "abort"
+var xrpc_statuses := bat("xrpc_statuses"); # bat[void,str] 2PC status of this
query: "exec", "wait", "prepare", "commit", "abort" or "timeout"
var xrpc_lock := pflock_get(6); # master XRPC lock, to protect above
xrpc_* BATs
var xrpc_hostport := my_hostname() + ":" + str(get_xrpc_port());
var xrpc_querynr := 9999999LL;
+# Abort a 2PC transaction but keep its bookkeeping record: slot 'idx' of the
+# xrpc_* BATs gets status 'status' (e.g. "abort" or "timeout"), its wsid and
+# lock slots are cleared and the per-transaction lock is destroyed.
+# The ws itself must already have been destroyed by the caller.
+# Callers (_ws_xrpc_end, _ws_xrpc_prune) run while holding xrpc_lock.
+PROC _ws_xrpc_abort(oid idx, str status) : void
+{
+    var wslock := xrpc_locks.find(idx);
+    xrpc_statuses.inplace(idx, status);
+    xrpc_wsids.inplace(idx, lng_nil);
+    # the lock BAT is xrpc_locks (there is no xrpc_wslocks)
+    xrpc_locks.inplace(idx, lock_nil);
+    lock_destroy(wslock);
+}
+
+# End one request of the 2PC transaction identified by the query variable
+# xrpc_qid. On success (errmsg is nil) the transaction is marked "wait" and its
+# ws lock is released, so that a later request can re-use the ws; on failure
+# the transaction is aborted. The caller (ws_end) holds xrpc_lock.
+# NOTE(review): 'wsid' is currently unused; the slot is found via xrpc_qid.
+PROC _ws_xrpc_end(lng wsid, str errmsg) : void
+{
+    # xrpc_qids is bat[void,str] (oid slot -> query id): look up by tail value
+    var idx := xrpc_qids.reverse().find(xrpc_qid);
+    if (isnil(errmsg)) {
+        xrpc_statuses.inplace(idx, "wait");
+        lock_unset(xrpc_locks.find(idx));
+    } else {
+        _ws_xrpc_abort(idx, "abort");
+    }
+}
+
+# Background maintenance of the 2PC bookkeeping BATs; the caller holds xrpc_lock.
+# 1. Transactions still in "wait" whose deadline has passed, and "abort"ed
+#    transactions that still have a wsid, get their ws destroyed and their
+#    record aborted (status "timeout" resp. "abort").
+# 2. Records whose deadline passed more than one hour ago are dropped.
+PROC _ws_xrpc_prune() : void
+{
+    var lim := usec();
+    # "wait" transactions whose absolute deadline (usec, see _ws_new) has passed
+    var timeout := mirror(xrpc_statuses.ord_select("wait")).leftfetchjoin(xrpc_timeouts).ord_uselect(0LL,lim).project("timeout");
+    # "abort" transactions whose wsid slot has not been cleared yet
+    var aborted := mirror(xrpc_wsids.ord_uselect(lng_nil,lng_nil)).leftfetchjoin(xrpc_statuses).ord_select("abort");
+
+    kunion(aborted,timeout)@batloop() {
+        # a ws is registered under the name str(wsid)
+        var ws := bat(str(xrpc_wsids.find($h)));
+        ws_destroy(ws);
+        _ws_xrpc_abort($h,$t);
+    }
+
+    # keep only records whose deadline lies less than one hour (3600000000 usec) in the past;
+    # note: xrpc_timeouts is the per-query BAT, not the scalar xrpc_timeout (msec)
+    var relevant := xrpc_timeouts.ord_uselect(lim - 3600000000LL,lng_nil).hmark(0@0);
+    if (count(relevant) < count(xrpc_timeouts)) {
+        # pruned xrpc requests should all have destroyed wslock and ws (we let them leak here)
+        var relevant_qids := relevant.leftfetchjoin(xrpc_qids);
+        xrpc_qids.delete().append(relevant_qids);
+        var relevant_timeouts := relevant.leftfetchjoin(xrpc_timeouts);
+        xrpc_timeouts.delete().append(relevant_timeouts);
+        var relevant_wsids := relevant.leftfetchjoin(xrpc_wsids);
+        xrpc_wsids.delete().append(relevant_wsids);
+        var relevant_locks := relevant.leftfetchjoin(xrpc_locks);
+        xrpc_locks.delete().append(relevant_locks);
+        var relevant_statuses := relevant.leftfetchjoin(xrpc_statuses);
+        xrpc_statuses.delete().append(relevant_statuses);
+    }
+}
+
+# Debugging aid: print the 2PC bookkeeping BATs side by side, under xrpc_lock.
+# Any error raised while printing is re-raised after the lock has been released.
+PROC xrpc_status() : void {
+    lock_set(xrpc_lock);
+    var err := CATCH(print(xrpc_qids,xrpc_statuses,xrpc_timeouts,xrpc_locks,xrpc_wsids));
+    lock_unset(xrpc_lock);
+    # MIL raises errors with ERROR(); "%s" keeps the message from being used as a format
+    if (not(isnil(err))) ERROR("%s", err);
+}
+
+
# transaction debugging / performance profiling
var ws_log_lock := lock_create();
var ws_log := bat(lng,str,1000000).rename("ws_log");
@@ -732,21 +785,7 @@
# remove working sets of non-busy XRPC requests that timed out or
aborted
lock_set(xrpc_lock);
- var timedout := project(xrpc_timeouts.ord_uselect(0LL,usec()),
"timedout");
- var aborted :=
xrpc_wsids.ord_uselect(lng_nil,lng_nil).mirror().leftfetchjoin(xrpc_statuses).ord_select("abort");
- kunion(aborted,timedout)@batloop() {
- var wslock := xrpc_locks.find($h);
- if (lock_try(ws_lock) = 0) {
- # we just leave the list to grow for now :(
- var ws := ws_id(xrpc_wsids.find($h));
- xrpc_statuses.inplace($h, $t);
- xrpc_timeout.inplace($h, lng_nil);
- xrpc_wsids.inplace($h, lng_nil);
- xrpc_wslocks.inplace($h, lock_nil);
- lock_destroy(wslock);
- ws_destroy(ws);
- }
- }
+ CATCH(_ws_xrpc_prune());
lock_unset(xrpc_lock);
}
}
@@ -848,6 +887,7 @@
# of those, all master rid_* bats, and their (remapped) pre_*
views
ws_mem := ws_mem.slice(NID_RID, INT_MAX).rename("ws_mem");
+
# scans for empty collections, and produces a list of kill-bats (fast - done
inside the short lock)
PROC _collection_cleanup() : BAT[str,str]
{
@@ -1433,34 +1473,54 @@
return xrpc;
}
-# create a new ws if necessary (res <= 0), returns wsid
-PROC _ws_new(int xrpc, oid idx, int update) : lng
+# Return the ws for the current query.
+# xrpc > 0: re-use the ws of the existing 2PC transaction in slot idx of the
+#           xrpc_* BATs and mark it "exec" again.
+# xrpc <= 0: create a fresh ws, registered under the name str(wsid); if xrpc < 0
+#           also append a new 2PC record whose deadline is now + xrpc_timeout.
+# ws_create calls this while holding xrpc_lock.
+PROC _ws_new(int xrpc, oid idx, int update) : BAT[void,bat]
 {
- var wsid;
+ var ws, wsid;
 if (xrpc > 0) {
- wsid := xrpc_wsids.find(idx);
+ ws := bat(str(xrpc_wsids.find(idx))); # ws BATs are named str(wsid)
+ xrpc_statuses.inplace(idx,"exec");
 } else {
 var id := and(lng(newoid(1)), 2147483647LL);
- var ws := [ws_new](mirror(ws_tpe), ws_tpe, ws_htp, ws_ttp, ws_seq);
+ ws := [ws_new](mirror(ws_tpe), ws_tpe, ws_htp, ws_ttp, ws_seq);
 wsid := <<(id, 32) + (lng(update) << 30) + lng(ws);
 ws.access(BAT_READ).rename(str(wsid));
 if (xrpc < 0) {
 var wslock := lock_create();
 xrpc_qids.append(xrpc_qid);
- xrpc_timeouts.append(usec() + xrpc_timeout);
+ xrpc_timeouts.append(usec() + (1000LL * xrpc_timeout)); # xrpc_timeout is in msec, deadline in usec
 xrpc_wsids.append(wsid);
 xrpc_locks.append(wslock);
 xrpc_statuses.append("exec");
 }
 }
- return wsid;
+ return ws;
+}
+
+
+# NOTE(review): these module-level statements execute when the module is loaded:
+# they call _ws_new() without holding xrpc_lock and create a ws outside ws_create().
+# They look like leftover scaffolding from this work-in-progress check-in -- confirm.
+# NOTE(review): xrpc_seqnr starts as "" although the XRPC client now declares seqnr
+# as lng -- verify the intended type of this pre-query variable.
+var update := 0;
+var xrpc_qid := "";
+var xrpc_seqnr := "";
+var xrpc_mode := "";
+var ws, wsid, xrpc, idx; ws := _ws_new(xrpc := _ws_xrpcget(), idx :=
oid(abs(xrpc)), update); wsid := ws_id(ws);
+# Finish the ws of the current query.
+# It logs 'err' when it is not nil and destroys the ws. For a 2PC XRPC query
+# (xrpc_qid != ""), it also records the outcome in the xrpc_* BATs under xrpc_lock.
+# Returns 1 for a normal query and 2 for a 2PC query; 2PC transactions are never auto-restarted.
+PROC ws_end(BAT[void,bat] ws, str err) : int
+{
+    # take the id of *this* ws (not the module-level wsid) before ws_destroy()
+    var end_wsid := ws_id(ws);
+    ws_log_wsid := end_wsid;
+    if (not(isnil(err))) ws_log(ws, err);
+    ws_destroy(ws);
+    if (xrpc_qid = "") return 1;
+    lock_set(xrpc_lock);
+    CATCH(_ws_xrpc_end(end_wsid, err));
+    lock_unset(xrpc_lock);
+    return 2; # do not auto-restart 2PC transactions ever
 }
PROC ws_create(int update) : BAT[void,bat]
{
# NOTE: use pre-query MIL variables xrpc_qid/xrpc_timeout to possibly
re-use an existing ws
lock_set(xrpc_lock);
- var ws, xrpc, idx, err := CATCH(wsid := _ws_new(xrpc := _ws_xrpcget(), idx
:= oid(abs(xrpc)), update), ws := bat(str(wsid)));
+ var ws, wsid, xrpc, idx, err := CATCH(ws := _ws_new(xrpc := _ws_xrpcget(),
idx := oid(abs(xrpc)), update), wsid := ws_id(ws));
lock_unset(xrpc_lock);
if (not(isnil(err))) ERROR(err);
@@ -4801,7 +4861,7 @@
int mt = xquery_types[t].monet_tpe;
int k = xquery_types[t].kind;
int vallen = sizeof(oid)+sizeof(oid);
- size_t m = strlen(mil);
+ size_t m = mil?strlen(mil):0;
oid buf[2], id;
void *val = (void*) buf;
@@ -4812,7 +4872,10 @@
} else if (ATOMfromstr(mt, &val, &vallen, v) <= 0) {
return "xquery_parse_val: illegal value.\n";
}
- if (mt == TYPE_int) {
+ if (mt == TYPE_bit) {
+ id = (*(bit*) val == TRUE);
+ @:bunappend(item, &id, (size_t) *(oid*), SZFMT, "@0")@
+ } else if (mt == TYPE_lng) {
@:bunappend(int_values, val, *(lng*), LLFMT, "LL")@
BATiter ivi = bat_iterator(int_values);
id = *(oid*) BUNhead(ivi, BUNfnd(BATmirror(int_values), val));
@@ -5915,7 +5978,7 @@
}
}
@= setvar
- if (strcmp(*(ctx->@1), @2)) {
+ if (@2 && strcmp(*(ctx->@1), @2)) {
char* newval = GDKstrdup(@2);
if (newval == NULL) return "setvar(@1): malloc failure";
GDKfree(*(ctx->@1));
U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.6
retrieving revision 1.68.4.7
diff -u -d -r1.68.4.6 -r1.68.4.7
--- xrpc_server.mx 5 Jun 2008 21:17:55 -0000 1.68.4.6
+++ xrpc_server.mx 6 Jun 2008 14:39:54 -0000 1.68.4.7
@@ -134,9 +134,6 @@
\"monet_environment\"",
"xrpc_server");
-PROC print_xrpc() : void {
- print(xrpc_qids,xrpc_statuses,xrpc_timeouts,xrpc_locks,xrpc_wsids);
-}
@h
#ifndef XRPC_SERVER_H
#define XRPC_SERVER_H
@@ -225,7 +222,7 @@
" xrpc:method=\"%s\""\
" xrpc:arity=\"%lld\""\
" xrpc:iter-count=\"%lld\""\
- " xrpc:caller=\"%s\""\
+ " xrpc:caller=\"%s:%d:"LLFMT"\""\
" xrpc:mode=\"%s\""\
" xrpc:updCall=\"%s\">"
@@ -593,7 +590,7 @@
bit isAdmin)
{
XRPCreq_t *req = NULL, *res = NULL;
- char* msg = participants?XRPC_REQUEST:XRPC_RESPONSE;
+ char* msg = participants?XRPC_RESPONSE:XRPC_REQUEST;
char *module = NULL, *method = NULL, *location = NULL;
char *qid = NULL, *host = NULL, *caller = "query,0";
char *querynr_str = NULL, *timeout_str = NULL, *mode_str = NULL;
@@ -997,38 +994,6 @@
return xrpc_parse_message(out, shredBAT, NULL, isAdmin);
}
-/* return: 1: timedout; 0: not timedout; -1: not found; -2: other errors */
-int
-check_timeout_by_qid(char *qid, lng timeout)
-{
- BATiter qidsi = bat_iterator(xrpc_qids);
- oid qid_idx = oid_nil;
- BUN bun_qid = BUN_NONE;
- int ret = 1;
-
- if(timeout > GDKusec())
- return 0; /* not timed out */
-
- MT_set_lock(PF_XRPC_LOCK, "check_timeout_by_qid");
- if((bun_qid = BUNfnd(BATmirror(xrpc_qids), qid)) == BUN_NONE) {
- GDKerror("check_timeout_by_qid: "
- "could not find QID \"%s\" in the meta-BAT "
- "\"xrpc_qids\"\n", qid);
- ret = -1;
- }
- qid_idx = *(oid*)BUNtail(qidsi, bun_qid);
- if(!BUNreplace(xrpc_statuses, &qid_idx, "timedout", FALSE)){
- MT_unset_lock(PF_XRPC_LOCK, "check_timeout_by_qid");
- GDKerror("check_timeout_by_qid: "
- "failed to change the status of query \"%s\" into "
- "\"timedout\"\n", qid);
- ret = -2;
- }
- MT_unset_lock(PF_XRPC_LOCK, "check_timeout_by_qid");
- return ret; /* timed out */
-}
-
-
/**
* @return GDK_SUCCEED, or
* GDK_FAIL if an error has occurred.
@@ -1075,32 +1040,15 @@
ret = GDK_FAIL;
/* if error was caused by other problems, we haven't checked
* query expiration time */
- int tdo = check_timeout_by_qid(req->qid, req->start + req->timeout);
snprintf(errbuf_all, GDKMAXERRLEN*2,
- "Error occurred during execution:\n%s\n%s\n%s",
- tdo > 0 ? "xrpc query timed out" : "",
+ "Error occurred during execution:s\n%s\n%s",
*errbuf ? errbuf : "",
err == ((char*)-1) ? "no further error message" : err);
- send_err(mc->c->fdout, tdo > 0 ? ERR504 : ERR404,
- tdo > 0 ? "env:Receiver" : "env:Sender",
- errbuf_all);
+ send_err(mc->c->fdout, ERR404, "env:Sender", errbuf_all);
} else if (req->updCall) {
- /* updating functions donot involve the serializer, hence,
- * time-out not checked yet */
- ret = check_timeout_by_qid(req->qid, req->start + req->timeout);
- if(ret){
- snprintf(errbuf_all, GDKMAXERRLEN*2,
- "Error occurred when checking query expiration
time:\n%s\n%s",
- ret > 0 ? "xrpc query timed out" : "",
- *errbuf ? errbuf : "no further error message.");
- send_err(mc->c->fdout, ret > 0 ? ERR504 : ERR404,
- ret > 0 ? "env:Receiver" : "env:Sender",
- errbuf_all);
- ret = GDK_FAIL;
- } else { /* send empty HTTP OK header for updating request */
- stream_write(mc->c->fdout, "HTTP/1.1 200 OK\r\n"
+ /* send empty HTTP OK header for updating request */
+ stream_write(mc->c->fdout, "HTTP/1.1 200 OK\r\n"
"Content-type: text/xml; charset=\"utf-8\"\r\n\r\n", 1,
60);
- }
}
stream_flush(mc->c->fdout);
-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins