Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv15778/runtime
Modified Files:
Tag: xrpcdemo
pathfinder.mx serialize_dflt.mx xrpc_client.mx xrpc_server.mx
Log Message:
- more bug fixes. repeatable read does not work yet
U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.4.7
retrieving revision 1.45.4.8
diff -u -d -r1.45.4.7 -r1.45.4.8
--- xrpc_client.mx 6 Jun 2008 14:39:53 -0000 1.45.4.7
+++ xrpc_client.mx 7 Jun 2008 00:38:28 -0000 1.45.4.8
@@ -102,7 +102,7 @@
var rpcerr_lock := lock_create(); # guards rpc_errors
var rpc_errors := ""; # holds errors from each destination
var rpcres_lock := lock_create(); # guards rpc_results and rpc_iter
- var rpc_results := bat(str,oid);
+ var rpc_results := bat(str,bat);
var rpc_iter := bat(void,bat).seqbase([EMAIL PROTECTED]);
var off := count(ws.fetch(CONT_NAME));
@@ -110,7 +110,7 @@
var wslock;
if (xrpc_qid != "") {
lock_set(xrpc_lock);
- var err := CATCH(wslock :=
xrpc_locks.fetch(reverse(xrpc_wsids).find(xrpc_id)));
+ var err := CATCH(wslock :=
xrpc_locks.fetch(reverse(xrpc_qids).find(xrpc_qid)));
lock_unset(xrpc_lock);
if (not(isnil(err))) ERROR(err);
lock_unset(wslock);
@@ -145,7 +145,7 @@
}
var local_name := "rpc_res_00" + str(int($h)+off);
- var rpc_res, timeout := max(0LL,xrpc_timeout - ((usec() - time_start)
/ 1000));
+ var rpc_res, timeout := max(0LL,xrpc_timeout - ((usec() -
time_start)/1000));
var rpc_err := CATCH(rpc_res := http_post(genType, xrpc_mode,
xrpc_qid, xrpc_seqnr, timeout, $t, modname, location, method,
updCall, arity, itercnt, ws, fun_vid, fun_iter, fun_item,
fun_kind, int_values, dbl_values, dec_values, str_values));
@@ -202,7 +202,7 @@
# retrieve results for this destination, and map the results back to
# the original iteration number
var proc_res := [get_rpc_res](
- [ws_opencoll](const ws, rpc_results.tmark([EMAIL PROTECTED]),
local_name, TEMP_DOC),
+ [ws_opencoll](const ws, rpc_results.tmark([EMAIL PROTECTED]),
rpc_results.hmark([EMAIL PROTECTED]), TEMP_DOC),
const ws, const int_values, const dbl_values, const str_values);
res_iter := proc_res.[fetch](0).[leftfetchjoin](rpc_iter);
@@ -228,7 +228,6 @@
printf("XRPC_Client_DeSerialisation (get_rpc_res): %lld microsec\n",
time_xrpcClntDeSeria);
}
-
return res_bats;
}
ADDHELP("doLoopLiftedRPC", "zhang", "April 2006",
@@ -257,7 +256,7 @@
var wslock;
if (xrpc_qid != "") {
lock_set(xrpc_lock);
- var err := CATCH(wslock :=
xrpc_locks.fetch(reverse(xrpc_wsids).find(xrpc_id)));
+ var err := CATCH(wslock :=
xrpc_locks.fetch(reverse(xrpc_qids).find(xrpc_qid)));
lock_unset(xrpc_lock);
if (not(isnil(err))) ERROR(err);
}
@@ -369,11 +368,11 @@
BAT[void, str] str_values) : BAT[void,bat]
{
if (search(xrpc_mode,"iterative") >= 0)
- doIterativeRPC(modname, location, method, updCall, arity, niters,
- ws, dsts, fun_vid, fun_iter, fun_item, fun_kind,
int_values, dbl_values, dec_values, str_values);
+ return doIterativeRPC(modname, location, method, updCall, arity,
niters,
+ ws, dsts, fun_vid, fun_iter, fun_item, fun_kind,
int_values, dbl_values, dec_values, str_values);
else
- doLoopLiftedRPC(modname, location, method, updCall, arity, niters,
- ws, dsts, fun_vid, fun_iter, fun_item, fun_kind,
int_values, dbl_values, dec_values, str_values);
+ return doLoopLiftedRPC(modname, location, method, updCall, arity,
niters,
+ ws, dsts, fun_vid, fun_iter, fun_item,
fun_kind, int_values, dbl_values, dec_values, str_values);
}
@@ -414,9 +413,14 @@
if (shredBAT) {
XRPCreq_t *req;
/* create a list of all main document bats (as parse_message likes
it that way) */
- for(i=0; i < ATTR_OWN; i++)
- BUNappend(shredBAT, BUNtail(wi, *cont), FALSE);
-
+ for(i=0; i <= ATTR_PROP; i++) {
+ BAT *b = BATdescriptor(*(bat*) BUNtail(wi, i));
+ if (b) {
+ BATiter bi = bat_iterator(b);
+ BUNappend(shredBAT, BUNtail(bi, *cont), FALSE);
+ BBPunfix(b->batCacheid);
+ }
+ }
req = xrpc_parse_message(NULL, shredBAT, participants, FALSE);
if (req) {
BAT *res_iter = BATnew(TYPE_void, TYPE_oid, req->nr_args);
@@ -905,7 +909,7 @@
return clean_up(bs, argcnt, iterc);
if(qid && *qid) { /* an XRPC query that requires 2PC */
- ret = stream_printf(bs, "<env:Header" XRPC_WS_QID "</env:Header" ,
qid, timeout);
+ ret = stream_printf(bs, "<env:Header>" XRPC_WS_QID "</env:Header>" ,
qid, timeout);
if (ret < 0)
return clean_up(bs, argcnt, iterc);
}
U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46.4.5
retrieving revision 1.46.4.6
diff -u -d -r1.46.4.5 -r1.46.4.6
--- serialize_dflt.mx 6 Jun 2008 14:39:52 -0000 1.46.4.5
+++ serialize_dflt.mx 7 Jun 2008 00:38:27 -0000 1.46.4.6
@@ -1017,11 +1017,11 @@
if (b == NULL) return PROBLEM;
ret = stream_printf(ctx->out, "<env:Header>");
- if (ret == 0 && ctx->xrpc_qid[0])
+ if (ret >= 0 && ctx->xrpc_qid[0])
ret = stream_printf(ctx->out, XRPC_WS_QID, ctx->xrpc_qid,
ctx->xrpc_timeout);
/* XRPC participants lists for nested transactions - append an extra
item for this call */
- if(ret == 0)
+ if(ret >= 0)
ret = stream_printf(ctx->out,
"<xrpc:participants>\n <xrpc:participant>%s,%s,"
LLFMT ",%s:%d," LLFMT "</xrpc:participant>\n",
@@ -1034,7 +1034,7 @@
if (ret < 0) break;
ret = stream_printf(ctx->out, "
<xrpc:participant>%s</xrpc:participant>\n", BUNtail(bi,p));
}
- if(ret == 0)
+ if (ret >= 0)
ret = stream_printf(ctx->out,
"</xrpc:participants>\n</env:Header>\n");
BBPunfix(bid);
U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.9
retrieving revision 1.416.2.1.2.10
diff -u -d -r1.416.2.1.2.9 -r1.416.2.1.2.10
--- pathfinder.mx 6 Jun 2008 14:39:51 -0000 1.416.2.1.2.9
+++ pathfinder.mx 7 Jun 2008 00:38:26 -0000 1.416.2.1.2.10
@@ -520,19 +520,21 @@
PROC _ws_xrpc_abort(oid idx, str status) : void
{
var wslock := xrpc_locks.find(idx);
+ var ws := bat(str(xrpc_wsids.find(idx)));
+ ws_destroy(ws);
xrpc_statuses.inplace(idx, status);
xrpc_wsids.inplace(idx, lng_nil);
- xrpc_wslocks.inplace(idx, lock_nil);
+ xrpc_locks.inplace(idx, lock_nil);
lock_destroy(wslock);
}
# end a 2PC request. On success, unlock ws and keep waiting for more. On
failure, abort it.
PROC _ws_xrpc_end(lng wsid, str errmsg) : void
{
- var idx := xrpc_qids.find(xrpc_qid);
+ var idx := reverse(xrpc_qids).find(xrpc_qid);
if (isnil(errmsg)) {
xrpc_statuses.inplace(idx, "wait");
- lock_unset(xrpc_wslocks.find(idx));
+ lock_unset(xrpc_locks.find(idx));
} else {
_ws_xrpc_abort(idx, "abort");
}
@@ -546,12 +548,10 @@
var aborted :=
mirror(xrpc_wsids.ord_uselect(lng_nil,lng_nil)).leftfetchjoin(xrpc_statuses).ord_select("abort");
kunion(aborted,timeout)@batloop() {
- var ws := bat(str(xrpc_wsids.find($h)));
- ws_destroy(ws);
_ws_xrpc_abort($h,$t);
}
- var relevant := xrpc_timeout.ord_uselect(lim -
3600000000LL,lng_nil).hmark([EMAIL PROTECTED]);
- if (count(relevant) < count(xrpc_timeout)) {
+ var relevant := xrpc_timeouts.ord_uselect(lim -
3600000000LL,lng_nil).hmark([EMAIL PROTECTED]);
+ if (count(relevant) < count(xrpc_timeouts)) {
# pruned xrpc requests should all have destroyed wslock and ws (we let
them leak here)
var relevant_qids := relevant.leftfetchjoin(xrpc_qids);
xrpc_qids.delete().append(relevant_qids);
var relevant_timeouts := relevant.leftfetchjoin(xrpc_qtimeouts);
xrpc_timeouts.delete().append(relevant_timeouts);
@@ -1465,9 +1465,9 @@
xrpc := -(int(count(xrpc_qids))+1); # default: create a new ws
if (xrpc_qid = "") {
xrpc_qid := xrpc_hostport + ":" + str(xrpc_seqnr); #init
- xpc_coord := true;
+ xrpc_coord := true;
} else if (xrpc_qids.texist(xrpc_qid)) {
- xrpc := int(reverse(xrpc_qids).find(qid)); # reuse an old ws
+ xrpc := int(reverse(xrpc_qids).find(xrpc_qid)); # reuse an old ws
}
}
return xrpc;
@@ -1478,7 +1478,7 @@
{
var ws;
if (xrpc > 0) {
- ws := bat(xrpc_wsids.find(idx));
+ ws := bat(str(xrpc_wsids.find(idx)));
xrpc_statuses.inplace(idx,"exec");
} else {
var id := and(lng(newoid(1)), 2147483647LL);
@@ -1488,7 +1488,7 @@
if (xrpc < 0) {
var wslock := lock_create();
xrpc_qids.append(xrpc_qid);
- xrpc_timeouts.append(usec() + *(1000LL * xrpc_timeout));
+ xrpc_timeouts.append(usec() + *(1000LL, xrpc_timeout));
xrpc_wsids.append(wsid);
xrpc_locks.append(wslock);
xrpc_statuses.append("exec");
@@ -1499,19 +1499,22 @@
var update := 0;
-var xrpc_qid := "";
-var xrpc_seqnr := "";
-var xrpc_mode := "";
+var xrpc_qid := "1";
+var xrpc_seqnr := 1LL;
+var xrpc_timeout := 30000LL;
+var xrpc_mode := "repeatable";
var ws, wsid, xrpc, idx; ws := _ws_new(xrpc := _ws_xrpcget(), idx :=
oid(abs(xrpc)), update); wsid := ws_id(ws);
PROC ws_end(BAT[void,bat] ws, str err) : int
{
ws_log_wsid := ws_id(ws);
if (not(isnil(err))) ws_log(ws, err);
- ws_destroy(ws);
- if (xrpc_qid = "") return 1;
+ if (xrpc_qid = "") {
+ ws_destroy(ws);
+ return 1;
+ }
lock_set(xrpc_lock);
- CATCH(_ws_xrpc_end(wsid, err));
+ CATCH(_ws_xrpc_end(ws_log_wsid, err));
lock_unset(xrpc_lock);
return 2; # do not auto-restart 2PC transactions ever
}
U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.7
retrieving revision 1.68.4.8
diff -u -d -r1.68.4.7 -r1.68.4.8
--- xrpc_server.mx 6 Jun 2008 14:39:54 -0000 1.68.4.7
+++ xrpc_server.mx 7 Jun 2008 00:38:29 -0000 1.68.4.8
@@ -211,7 +211,7 @@
"<wscoor:CoordinationContext xmlns:wscoor=\""WSCOOR_NS"\""\
"env:mustUnderstand=\"true\">"\
"<wscoor:Identifier>%s</wscoor:Identifier>"\
- "<wscoor:Expires>%s</wscoor:Expires>"\
+ "<wscoor:Expires>"LLFMT"</wscoor:Expires>"\
"<wscoor:CoordinationType>" WSAT_NS "</wscoor:CoordinationType>"\
"</wscoor:CoordinationContext>"
@@ -764,9 +764,9 @@
if (iterc == -1) {
/* if no iterc was found in the request (and always for XRPC
responses), just count sequence elements */
BAT *qn_seq = BATselect(qn_uri_loc, XRPC_NS"|sequence",
XRPC_NS"|sequence");
- elt_qn = get_elt_qn(pre_prop, qn_seq);
+ elt_qn = get_elt_qn(pre_kind, pre_prop);
if (elt_qn && qn_seq) {
- BAT* elt_seq = BATmirror(BATsemijoin(BATmirror(elt_qn), elt_seq));
+ BAT* elt_seq = BATmirror(BATsemijoin(BATmirror(elt_qn), qn_seq));
if (elt_seq) {
iterc = BATcount(elt_seq);
BBPreclaim(elt_seq);
@@ -778,7 +778,7 @@
/* for XRPC response messages we add all participants to the
participant bat */
BAT *qn_part = BATselect(qn_uri_loc, XRPC_NS"|participant",
XRPC_NS"|participant");
if (qn_part) {
- BAT *elt_part = BATmirror(BATsemijoin(BATmirror(elt_qn),
qn_part));
+ BAT *elt_part = BATmirror(BATsemijoin(BATmirror(elt_qn),qn_part));
if (elt_part) {
BATiter pi = bat_iterator(elt_part);
BUN p,q;
@@ -1005,7 +1005,7 @@
BAT *shredBAT)
{
int ret = GDK_SUCCEED;
- char errbuf[GDKMAXERRLEN], errbuf_all[GDKMAXERRLEN*2], *errbuf_bak =
GDKerrbuf;
+ char errbuf[GDKMAXERRLEN], errbuf_all[GDKMAXERRLEN*2], *errbuf_bak =
GDKerrbuf;
/* Possible values of flags:
* 0: xml-noheader-xrpc
@@ -1023,8 +1023,8 @@
* Get generated MIL code in "/tmp/xrpc.mil", hence, this option
* is not portable.
*/
- *errbuf = 0;
- GDKsetbuf(errbuf);
+ *errbuf = 0;
+ GDKsetbuf(errbuf);
char *err = xquery_method(mc, flags, req->mode, req->module,
req->location, req->method, req->qid, req->caller, req->timeout,
req->argc, req->iterc, req->argcnt, req->argtpe, req->argval,
-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins