Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv13386/runtime
Modified Files:
Tag: xrpcdemo
pathfinder.mx serialize.mx serialize_dflt.mx xrpc_client.mx
xrpc_server.mx
Log Message:
OK, some things are starting to work now regarding:
- read-only repeatable read
- XRPC message tracing
Not fully tested yet, though.
U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.4.8
retrieving revision 1.45.4.9
diff -u -d -r1.45.4.8 -r1.45.4.9
--- xrpc_client.mx 7 Jun 2008 00:38:28 -0000 1.45.4.8
+++ xrpc_client.mx 8 Jun 2008 01:02:38 -0000 1.45.4.9
@@ -920,14 +920,17 @@
for (my_iter = 0; my_iter < iterc; my_iter++) {
if (arity == 0) {
- assert(stream_write(bs, "<xrpc:call/>", 1, 12) == 12);
+ ret = stream_write(bs, "<xrpc:call/>", 1, 12);
+ if (ret != 12) return clean_up(bs, argcnt, iterc);
continue;
}
- assert(stream_write(bs, "<xrpc:call>", 1, 11) == 11); /* start an
iteration */
+ ret = stream_write(bs, "<xrpc:call>", 1, 11); /* start an iteration */
+ if (ret != 11) return clean_up(bs, argcnt, iterc);
for (my_argc = 0; my_argc < arity; my_argc++) {
if (argcnt[my_iter][my_argc] == 0) {
- assert(stream_write(bs, "<xrpc:sequence/>", 1, 16) == 16);
+ ret = stream_write(bs, "<xrpc:sequence/>", 1, 16);
+ if (ret != 16) return clean_up(bs, argcnt, iterc);
continue;
}
arg_offset = 0;
@@ -943,7 +946,8 @@
/* now 'arg_offset' contains the start position of the
* values of my_iter and my_argc in the fun_* BATs */
- assert(stream_write(bs, "<xrpc:sequence>", 1, 15) == 15); /* start
a parameter */
+ ret = stream_write(bs, "<xrpc:sequence>", 1, 15); /* start a
parameter */
+ if (ret != 15) return clean_up(bs, argcnt, iterc);
int k;
chr elem_kind = 0;
oid item = 0;
@@ -961,32 +965,32 @@
ret = stream_printf(bs,
"<xrpc:atomic-value
xsi:type=\"xs:boolean\">%s</xrpc:atomic-value>",
intVals[item] == TRUE ? "true" : "false");
- assert(ret > 0);
+ if (ret <= 0) return clean_up(bs, argcnt, iterc);
break;
case INT:
ret = stream_printf(bs,
"<xrpc:atomic-value
xsi:type=\"xs:integer\">%lld</xrpc:atomic-value>",
intVals[item]);
- assert(ret > 0);
+ if (ret <= 0) return clean_up(bs, argcnt, iterc);
break;
case DEC:
ret = stream_printf(bs,
"<xrpc:atomic-value
xsi:type=\"xs:decimal\">%f</xrpc:atomic-value>",
dblVals[item]);
- assert(ret > 0);
+ if (ret <= 0) return clean_up(bs, argcnt, iterc);
break;
case DBL:
ret = stream_printf(bs,
"<xrpc:atomic-value
xsi:type=\"xs:double\">%f</xrpc:atomic-value>",
dblVals[item]);
- assert(ret > 0);
+ if (ret <= 0) return clean_up(bs, argcnt, iterc);
break;
case STR:
case U_A:
ret = stream_printf(bs,
"<xrpc:atomic-value
xsi:type=\"xs:string\">%s</xrpc:atomic-value>",
(char*)BUNtail(strValsi, item));
- assert(ret > 0);
+ if (ret <= 0) return clean_up(bs, argcnt, iterc);
break;
case ATTR:
{
@@ -1015,13 +1019,14 @@
a = *(oid*)BUNtail(attr_propi, i);
char *attr_val = (char*)BUNtail(prop_vali, a);
- assert(stream_write(bs, "<xrpc:attribute ", 1, 16) ==
16);
+ ret = stream_write(bs, "<xrpc:attribute ", 1, 16);
+ if (ret != 16) return clean_up(bs, argcnt, iterc);
if (prefix && *prefix){
ret = stream_printf(bs, "%s:", prefix);
- assert(ret > 0);
+ if (ret <= 0) return clean_up(bs, argcnt, iterc);
}
ret = stream_printf(bs, "%s=\"%s\"/>", loc, attr_val);
- assert(ret > 0);
+ if (ret <= 0) return clean_up(bs, argcnt, iterc);
break;
}
case ELEM:
@@ -1032,15 +1037,20 @@
return clean_up(bs, argcnt, iterc);
}
switch(elem_kind) {
- case 0: assert(stream_write(bs, "<xrpc:element>",
1, 14) == 14);
+ case 0: ret = stream_write(bs, "<xrpc:element>",
1, 14);
+ if (ret != 14) return clean_up(bs, argcnt,
iterc);
break;
- case 1: assert(stream_write(bs, "<xrpc:text>", 1,
11) == 11);
+ case 1: ret = stream_write(bs, "<xrpc:text>", 1,
11);
+ if (ret != 11) return clean_up(bs, argcnt,
iterc);
break;
- case 2: assert(stream_write(bs, "<xrpc:comment>",
1, 14) == 14);
+ case 2: ret = stream_write(bs, "<xrpc:comment>",
1, 14);
+ if (ret != 14) return clean_up(bs, argcnt,
iterc);
break;
- case 3: assert(stream_write(bs,
"<xrpc:processing-instruction>", 1, 29) == 29);
+ case 3: ret = stream_write(bs,
"<xrpc:processing-instruction>", 1, 29);
+ if (ret != 29) return clean_up(bs, argcnt,
iterc);
break;
- case 4: assert(stream_write(bs, "<xrpc:document>",
1, 15) == 15);
+ case 4: ret = stream_write(bs, "<xrpc:document>",
1, 15);
+ if (ret != 15) return clean_up(bs, argcnt,
iterc);
break;
case 5: /* COLLECTION: nothing to be done, here */
break;
default:
@@ -1076,15 +1086,20 @@
}
switch(elem_kind) {
- case 0: assert(stream_write(bs, "</xrpc:element>",
1, 15) == 15);
+ case 0: ret = stream_write(bs, "</xrpc:element>",
1, 15);
+ if (ret != 15) return clean_up(bs, argcnt,
iterc);
break;
- case 1: assert(stream_write(bs, "</xrpc:text>", 1,
12) == 12);
+ case 1: ret = stream_write(bs, "</xrpc:text>", 1,
12);
+ if (ret != 12) return clean_up(bs, argcnt,
iterc);
break;
- case 2: assert(stream_write(bs, "</xrpc:comment>",
1, 15) == 15);
+ case 2: ret = stream_write(bs, "</xrpc:comment>",
1, 15);
+ if (ret != 15) return clean_up(bs, argcnt,
iterc);
break;
- case 3: assert(stream_write(bs,
"</xrpc:processing-instruction>", 1, 30) == 30);
+ case 3: ret = stream_write(bs,
"</xrpc:processing-instruction>", 1, 30);
+ if (ret != 30) return clean_up(bs, argcnt,
iterc);
break;
- case 4: assert(stream_write(bs,
"</xrpc:document>", 1, 16) == 16);
+ case 4: ret = stream_write(bs, "</xrpc:document>",
1, 16);
+ if (ret != 16) return clean_up(bs, argcnt,
iterc);
break;
case 5: /* COLLECTION: nothing to be done, here */
break;
default:
@@ -1098,11 +1113,14 @@
return clean_up(bs, argcnt, iterc);
}
}
- assert(stream_write(bs, "</xrpc:sequence>", 1, 16) == 16);
+ ret = stream_write(bs, "</xrpc:sequence>", 1, 16);
+ if (ret != 16) return clean_up(bs, argcnt, iterc);
}
- assert(stream_write(bs, "</xrpc:call>", 1, 12) == 12);
+ ret = stream_write(bs, "</xrpc:call>", 1, 12);
+ if (ret != 12) return clean_up(bs, argcnt, iterc);
}
- assert(stream_write(bs, "</xrpc:request></env:Body></env:Envelope>\n", 1,
42) == 42);
+ ret = stream_write(bs, "</xrpc:request></env:Body></env:Envelope>\n", 1,
42);
+ if (ret != 42) return clean_up(bs, argcnt, iterc);
/* Stop timing Client Serialisation */
time_xrpcClntSeria = GDKusec() - time_xrpcClntSeria;
if (timing) {
U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46.4.6
retrieving revision 1.46.4.7
diff -u -d -r1.46.4.6 -r1.46.4.7
--- serialize_dflt.mx 7 Jun 2008 00:38:27 -0000 1.46.4.6
+++ serialize_dflt.mx 8 Jun 2008 01:02:38 -0000 1.46.4.7
@@ -1001,7 +1001,7 @@
return PROBLEM;
}
len = strlen(HTTP_200_OK);
- ret = stream_write(ctx->out, HTTP_200_OK, 1, len);
+ ret = stream_write(GDKout, HTTP_200_OK, 1, len);
if(ret != len) return PROBLEM;
len = strlen(SOAP_ENVELOPE);
@@ -1023,10 +1023,8 @@
/* XRPC participants lists for nested transactions - append an extra
item for this call */
if(ret >= 0)
ret = stream_printf(ctx->out,
- "<xrpc:participants>\n <xrpc:participant>%s,%s,"
- LLFMT ",%s:%d," LLFMT "</xrpc:participant>\n",
- ctx->xrpc_caller, xrpc_hostname, xrpc_port,
ctx->xrpc_seqnr,
- ctx->xrpc_method, time_exec);
+ "<xrpc:participants>\n
<xrpc:participant>%s,%s:%d:"LLFMT",%s,"LLFMT"</xrpc:participant>\n",
+ ctx->xrpc_caller, xrpc_hostname, xrpc_port, ctx->xrpc_seqnr,
ctx->xrpc_method, time_exec);
/* print all nested calls (obtained from response messages) */
bi = bat_iterator(b);
U serialize.mx
Index: serialize.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize.mx,v
retrieving revision 1.109.4.2
retrieving revision 1.109.4.3
diff -u -d -r1.109.4.2 -r1.109.4.3
--- serialize.mx 5 Jun 2008 12:55:20 -0000 1.109.4.2
+++ serialize.mx 8 Jun 2008 01:02:37 -0000 1.109.4.3
@@ -300,6 +300,8 @@
lng *xrpc_timeout,
lng *xrpc_start);
+stream* xquery_print_trace(str msg, lng seqnr);
+
#endif /* SERIALIZE_H */
@c
@@ -1971,6 +1973,25 @@
return GDK_SUCCEED;
}
+stream*
+xquery_print_trace(str msg, lng xrpc_seqnr) {
+ char logdir[1024], logfile[1024], *docroot = GDKgetenv("datadir");
+ stream* fp;
+ if (docroot) {
+ snprintf(logdir, 1024, "%s%cMonetDB%cxrpc%clogs%c", docroot, DIR_SEP,
DIR_SEP, DIR_SEP, DIR_SEP);
+ snprintf(logfile, 1024, "%s%s_" LLFMT ".xml", logdir, msg, xrpc_seqnr);
+ GDKcreatedir(logdir);
+ fp = open_wastream(logfile);
+ }
+ if (!fp || stream_errnr(fp)) {
+ GDKerror("print_result: could not open logfile %s for writing\n",
logfile);
+ if (fp) stream_destroy(fp);
+ return NULL;
+ }
+ return fp;
+}
+
+
int
xquery_print_result_DRIVER (
str mode,
@@ -1996,20 +2017,15 @@
lng *xrpc_start)
{
stream *fp = GDKout;
+ int ret, trace = xrpc_method && *xrpc_method && xrpc_mode &&
strstr(xrpc_mode, "trace");
(void) driverFun;
(void) driverArg;
- if (xrpc_mode && strstr(xrpc_mode, "trace")) {
- char logfile[1024], *docroot = GDKgetenv("xrpc_http_docroot");
- snprintf(logfile, 1024, "%s%clogs%cres_" LLFMT ".xml", docroot,
DIR_SEP, DIR_SEP, *xrpc_seqnr);
- stream *logstream = open_wstream(logfile);
- fp = open_teestream(fp, logstream);
- if (!logstream || !fp) {
- GDKerror("print_result: could not open logfile %s for writing\n",
logfile);
- if (logstream) stream_destroy(logstream);
- return GDK_FAIL;
- }
+ if (trace) {
+ stream* logstream = xquery_print_trace("res",*xrpc_seqnr);
+ fp = logstream?open_teestream(fp, logstream):NULL;
+ if (fp == NULL) return GDK_FAIL;
}
- return xquery_print_result_driver (
+ ret = xquery_print_result_driver (
fp,
mode,
driverFun, /* set of printing callback function */
@@ -2031,6 +2047,12 @@
xrpc_seqnr,
xrpc_timeout,
xrpc_start);
+
+ if (trace) {
+ stream_close(fp);
+ stream_destroy(fp);
+ }
+ return ret;
}
#define FNDEEPEQTRACE 0
U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.8
retrieving revision 1.68.4.9
diff -u -d -r1.68.4.8 -r1.68.4.9
--- xrpc_server.mx 7 Jun 2008 00:38:29 -0000 1.68.4.8
+++ xrpc_server.mx 8 Jun 2008 01:02:39 -0000 1.68.4.9
@@ -257,6 +257,7 @@
typedef struct {
char *qid;
char *caller;
+ lng seqnr;
lng start;
lng timeout;
char *mode;
@@ -291,7 +292,7 @@
xrpc_server_export BAT *xrpc_timeouts;
xrpc_server_export BAT *xrpc_statuses;
xrpc_server_export BAT *xrpc_locks;
-xrpc_server_export BAT *xrpc_wsids;
+xrpc_server_export BAT *xrpc_wsbats;
xrpc_server_export BAT *xrpc_trusted;
xrpc_server_export BAT *xrpc_admin;
@@ -361,6 +362,7 @@
}
}
+lng xrpc_reqnr = 1000000;
XRPCreq_t *
XRPCreq_new(
@@ -380,6 +382,7 @@
XRPCreq_t *req = (XRPCreq_t*) GDKmalloc(sizeof(XRPCreq_t));
if(!req) return NULL;
+ req->seqnr = 0;
req->qid = qid;
req->caller = caller;
req->timeout = timeout;
@@ -534,13 +537,8 @@
len_x += strlen(prefix) + 4;
}
err = alloca(len + len_x);
- if (err) {
- len += len_x;
- } else {
- len_x = 0;
- err = alloca(len);
- }
assert(err);
+ len += len_x;
pos += snprintf(err+pos, len-pos, "%s", msg);
if (!len_x) {
pos += snprintf(err+pos, len-pos, " '%s'.", location);
@@ -1025,7 +1023,7 @@
*/
*errbuf = 0;
GDKsetbuf(errbuf);
- char *err = xquery_method(mc, flags, req->mode, req->module,
+ char *err = xquery_method(mc, flags, &(req->seqnr), req->mode,
req->module,
req->location, req->method, req->qid, req->caller, req->timeout,
req->argc, req->iterc, req->argcnt, req->argtpe, req->argval,
req->hasNodeParam?shredBAT:NULL);
@@ -1069,23 +1067,35 @@
BAT *shredBAT;
lng time_xrpcServDeSeria;
int flags = timing|debug;
+ char *msg;
time_xrpcServDeSeria = GDKusec();
- if(!(shredBAT = request2bat(mc->c->fdout, shttpd_get_msg(arg))))
+ if(!(shredBAT = request2bat(mc->c->fdout, msg=shttpd_get_msg(arg))))
return GDK_FAIL;
if(!(req = parse_request(mc->c->fdout, shredBAT, 0))) {
BBPreclaim(shredBAT);
return GDK_FAIL;
}
+
time_xrpcServDeSeria = GDKusec() - time_xrpcServDeSeria;
-
+
/* Execute the query and send XRPC response */
if(execQuery(mc, flags, req, shredBAT) == GDK_FAIL) {
BBPreclaim(shredBAT);
XRPCreq_free(req);
return GDK_FAIL;
}
+ if (req->mode && strstr(req->mode,"trace")) {
+ stream *logstream = xquery_print_trace("req", req->seqnr);
+ if (logstream == NULL) return GDK_FAIL;
+ if (stream_errnr(logstream) || stream_write(logstream, msg, 1,
strlen(msg)) <= 0) {
+ stream_destroy(logstream);
+ return GDK_FAIL;
+ }
+ stream_close(logstream);
+ stream_destroy(logstream);
+ }
if (timing) {
fprintf(stdout,
@@ -1368,7 +1378,7 @@
BATseqbase([EMAIL PROTECTED], 1);
BBPrename([EMAIL PROTECTED]>batCacheid, "[EMAIL PROTECTED]");
@c
-BAT *xrpc_qids = NULL, *xrpc_statuses = NULL, *xrpc_timeouts = NULL,
*xrpc_locks = NULL, *xrpc_wsids = NULL;
+BAT *xrpc_qids = NULL, *xrpc_statuses = NULL, *xrpc_timeouts = NULL,
*xrpc_locks = NULL, *xrpc_wsbats = NULL;
BAT *xrpc_trusted = NULL, *xrpc_admin = NULL, *xrpc_user = NULL;
bat* xrpc_prelude(void) {
@@ -1376,7 +1386,7 @@
@:xrpc_bat(statuses,str)@
@:xrpc_bat(timeouts,lng)@
@:xrpc_bat(locks,lock)@
- @:xrpc_bat(wsids,lng)@
+ @:xrpc_bat(wsbats,bat)@
@:xrpc_bat(trusted,str)@
@:xrpc_bat(admin,str)@
@:xrpc_bat(user,str)@
U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.10
retrieving revision 1.416.2.1.2.11
diff -u -d -r1.416.2.1.2.10 -r1.416.2.1.2.11
--- pathfinder.mx 7 Jun 2008 00:38:26 -0000 1.416.2.1.2.10
+++ pathfinder.mx 8 Jun 2008 01:02:34 -0000 1.416.2.1.2.11
@@ -509,27 +509,27 @@
# meta-data BATs for multi-request XRPC transactions
var xrpc_qids := bat("xrpc_qids"); # bat[void,str] query-id
(host|timestamp)
var xrpc_timeouts := bat("xrpc_timeouts"); # bat[void,lng] query timeout in
usec
-var xrpc_wsids := bat("xrpc_wsids"); # bat[void,lng] ID of the
workingset associated with this query
+var xrpc_wsbats := bat("xrpc_wsbats"); # bat[void,bat] the
workingset BAT associated with this query
var xrpc_locks := bat("xrpc_locks"); # bat[void,lock] lock for the
workingset of this query
var xrpc_statuses := bat("xrpc_statuses"); # bat[void,str] 2PC status of this
query: "exec", "wait", "prepare", "commit", "abort" or "timeout"
var xrpc_lock := pflock_get(6); # master XRPC lock, to protect above
xrpc_* BATs
var xrpc_hostport := my_hostname() + ":" + str(get_xrpc_port());
-var xrpc_querynr := 9999999LL;
+var xrpc_querynr := 999999LL;
# abort a 2PC transaction, keeping the record (note ws should have been
destroyed already)
PROC _ws_xrpc_abort(oid idx, str status) : void
{
var wslock := xrpc_locks.find(idx);
- var ws := bat(str(xrpc_wsids.find(idx)));
+ var ws := xrpc_wsbats.find(idx);
ws_destroy(ws);
xrpc_statuses.inplace(idx, status);
- xrpc_wsids.inplace(idx, lng_nil);
+ xrpc_wsbats.inplace(idx, bat("xrpc_statuses"));
xrpc_locks.inplace(idx, lock_nil);
lock_destroy(wslock);
}
# end a 2PC request. On success, unlock ws and keep waiting for more. On
failure, abort it.
-PROC _ws_xrpc_end(lng wsid, str errmsg) : void
+PROC _ws_xrpc_end(str xrpc_qid, str errmsg) : void
{
var idx := reverse(xrpc_qids).find(xrpc_qid);
if (isnil(errmsg)) {
@@ -545,7 +545,7 @@
{
var lim := usec();
var timeout :=
mirror(xrpc_statuses.ord_select("wait")).leftfetchjoin(xrpc_timeouts).ord_uselect(0LL,lim).project("timeout");
- var aborted :=
mirror(xrpc_wsids.ord_uselect(lng_nil,lng_nil)).leftfetchjoin(xrpc_statuses).ord_select("abort");
+ var aborted :=
mirror(xrpc_locks.ord_uselect(lock_nil,lock_nil)).leftfetchjoin(xrpc_statuses).ord_select("abort");
kunion(aborted,timeout)@batloop() {
_ws_xrpc_abort($h,$t);
@@ -555,7 +555,7 @@
# pruned xrpc requests should all have destroyed wslock and ws (we let
them leak here)
var relevant_qids := relevant.leftfetchjoin(xrpc_qids);
xrpc_qids.delete().append(relevant_qids);
var relevant_timeouts := relevant.leftfetchjoin(xrpc_qtimeouts);
xrpc_timeouts.delete().append(relevant_timeouts);
- var relevant_wsids := relevant.leftfetchjoin(xrpc_wsids);
xrpc_wsids.delete().append(relevant_wsids);
+ var relevant_wsbats := relevant.leftfetchjoin(xrpc_wsbats);
xrpc_wsbats.delete().append(relevant_wsbats);
var relevant_locks := relevant.leftfetchjoin(xrpc_locks);
xrpc_locks.delete().append(relevant_locks);
var relevant_statuses := relevant.leftfetchjoin(xrpc_statuses);
xrpc_statuses.delete().append(relevant_statuses);
}
@@ -563,7 +563,7 @@
PROC xrpc_status() : void {
lock_set(xrpc_lock);
- var err :=
CATCH(print(xrpc_qids,xrpc_statuses,xrpc_timeouts,xrpc_locks,xrpc_wsids));
+ var err :=
CATCH(print(xrpc_qids,xrpc_statuses,xrpc_timeouts,xrpc_locks,xrpc_wsbats));
lock_unset(xrpc_lock);
if (not(isnil(err))) GDKerror(err);
}
@@ -1120,7 +1120,7 @@
bat[void,bat]. ws-IDs are globally and temporally unique, i.e.
we can talk about a ws-ID even after the transaction has finished.
-The collection pins record which transactions (wsids) are using
+The collection pins record which transactions (identified by ws-ID) are using
which bats. This information is deleted only at the end of
the transaction.
@@ -1461,6 +1461,7 @@
{
var xrpc := 0;
xrpc_seqnr := (xrpc_querynr :+= 1LL);
+ xrpc_coord := false;
if (xrpc_mode.search("repeatable") >= 0) {
xrpc := -(int(count(xrpc_qids))+1); # default: create a new ws
if (xrpc_qid = "") {
@@ -1478,7 +1479,7 @@
{
var ws;
if (xrpc > 0) {
- ws := bat(str(xrpc_wsids.find(idx)));
+ ws := xrpc_wsbats.find(idx);
xrpc_statuses.inplace(idx,"exec");
} else {
var id := and(lng(newoid(1)), 2147483647LL);
@@ -1489,7 +1490,7 @@
var wslock := lock_create();
xrpc_qids.append(xrpc_qid);
xrpc_timeouts.append(usec() + *(1000LL, xrpc_timeout));
- xrpc_wsids.append(wsid);
+ xrpc_wsbats.append(ws);
xrpc_locks.append(wslock);
xrpc_statuses.append("exec");
}
@@ -1503,7 +1504,12 @@
var xrpc_seqnr := 1LL;
var xrpc_timeout := 30000LL;
var xrpc_mode := "repeatable";
-var ws, wsid, xrpc, idx; ws := _ws_new(xrpc := _ws_xrpcget(), idx :=
oid(abs(xrpc)), update); wsid := ws_id(ws);
+var xrpc_coord := false;
+var ws, wsid, xrpc, idx;
+xrpc := _ws_xrpcget();
+idx := oid(abs(xrpc));
+ws := _ws_new(xrpc, idx, update);
+wsid := ws_id(ws);
PROC ws_end(BAT[void,bat] ws, str err) : int
{
@@ -1514,7 +1520,7 @@
return 1;
}
lock_set(xrpc_lock);
- CATCH(_ws_xrpc_end(ws_log_wsid, err));
+ CATCH(_ws_xrpc_end(xrpc_qid, err));
lock_unset(xrpc_lock);
return 2; # do not auto-restart 2PC transactions ever
}
@@ -4187,7 +4193,7 @@
/* exports for XRPC */
pathfinder_export void xquery_client_engine(mapi_client*);
-pathfinder_export char* xquery_method(mapi_client*, int, char*, char*, char*,
char*, char*, char*, lng, lng, lng, lng**, int*, str*, BAT*);
+pathfinder_export char* xquery_method(mapi_client*, int, lng*, char*, char*,
char*, char*, char*, char*, lng, lng, lng, lng**, int*, str*, BAT*);
pathfinder_export void xquery_client_end(mapi_client *, char *);
pathfinder_export char* xquery_parse_val(int, char*, BAT*, BAT*, BAT* , BAT* ,
BAT* , char*, oid);
@@ -4402,6 +4408,7 @@
char **xrpc_method;
char **xrpc_mode;
lng *xrpc_timeout;
+ lng *xrpc_seqnr;
char **xrpc_qid;
char **xrpc_caller;
@@ -4696,8 +4703,8 @@
* - resolve a method call in the current xquery context (return NULL if not
resolved)
*
* char*
- * xquery_function_call(xquery_client *ctx, lng usec, char *ns, char *module,
char *method,
- * char* qid, char* caller, lng timeout, char* mode,
+ * xquery_function_call(xquery_client *ctx, lng usec, char *ns,
char *module, char *method,
+ * lng *seqnr, char* qid, char* caller, lng timeout, char*
mode,
* int argc, int itercnt, int** argcnt, int* argtpe,
char** argval, BAT *shredBAT)
* - call a function ns:method(). try to use the function cache (ie re-use a
cached MIL tree).
* otherwise generate MIL yourself, interpret it (and cache it). Returns
error string (NULL if ok).
@@ -4912,6 +4919,7 @@
char *ns,
char *module,
char *method,
+ lng *seqnr,
char *qid,
char *caller,
lng timeout,
@@ -5064,6 +5072,7 @@
/* Done preparing the query. Time to (re-)execute the MIL tree */
if (xquery_tree_exec(ctx, prepfun->lt, 1)) {
+ *seqnr = (*ctx->xrpc_seqnr);
return NULL;
}
return xquery_function_error;
@@ -5329,17 +5338,20 @@
return "xquery_client_alloc: failed to execute init script.\n";
@:find_var(genType,str,sval)@
+
@:find_var(xrpc_shredBAT,int,ival)@
- @:find_var(xrpc_method,str,sval)@
@:find_var(xrpc_module,str,sval)@
+ @:find_var(xrpc_method,str,sval)@
+ @:find_var(xrpc_caller,str,sval)@
+ @:find_var(xrpc_qid,str,sval)@
@:find_var(xrpc_mode,str,sval)@
+ @:find_var(xrpc_seqnr,lng,lval)@
@:find_var(xrpc_timeout,lng,lval)@
- @:find_var(xrpc_qid,str,sval)@
- @:find_var(xrpc_caller,str,sval)@
- @:find_var(time_compile,lng,lval)@
+
@:find_var(time_exec,lng,lval)@
@:find_var(time_print,lng,lval)@
@:find_var(time_shred,lng,lval)@
+ @:find_var(time_compile,lng,lval)@
@:find_bat(proc_vid)@
@:find_bat(var_usage)@
@@ -5526,7 +5538,12 @@
BATclear(ctx->str_values);
BUNappend(ctx->str_values, "", FALSE);
*ctx->xrpc_shredBAT = int_nil;
-
+ (*ctx->xrpc_qid)[0] = 0;
+ (*ctx->xrpc_caller)[0] = 0;
+ (*ctx->xrpc_module)[0] = 0;
+ (*ctx->xrpc_method)[0] = 0;
+ (*ctx->xrpc_mode)[0] = 0;
+ *ctx->xrpc_timeout = 30000LL;
MT_set_lock(pf_cache_lock, "xquery_client_reset");
/* only deactivate the loaded modules */
@@ -5934,7 +5951,7 @@
lng* cnt_ptr = cnt;
char nsbak = *nsend, locbak = *locend;
*nsend = 0; *locend = 0;
- err = xquery_function_call(ctx, usec, ns, url1, loc, "",
"", 0LL, "", argc, 1, &cnt_ptr, tpe, param, NULL);
+ err = xquery_function_call(ctx, usec, ns, url1, loc, 0LL,
"", "", 0LL, "", argc, 1, &cnt_ptr, tpe, param, NULL);
*nsend = nsbak; *locend = locbak;
}
}
@@ -6285,6 +6302,7 @@
char*
xquery_method(mapi_client *mc,
int flags,
+ lng *seqnr,
char* mode,
char* module,
char* location,
@@ -6325,7 +6343,7 @@
}
if (err == NULL) {
- err = xquery_function_call(ctx, usec, ns, module, method, qid, caller,
timeout, mode, argc, itercnt, argcnt, argtpe, argval, shredBAT);
+ err = xquery_function_call(ctx, usec, ns, module, method, seqnr, qid,
caller, timeout, mode, argc, itercnt, argcnt, argtpe, argval, shredBAT);
if (err == (char*) -1) err = "xquery_method: function could not be
resolved.\n";
else if (err == xquery_function_error) err =
xquery_nondescriptive_error;
}
-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins