Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv2219/runtime
Modified Files:
Tag: xrpcdemo
pathfinder.mx serialize.mx xrpc_server.mx
Log Message:
- some more work on XRPC, 2pc and nested read-only calls are a bit
tested and seem to work
U serialize.mx
Index: serialize.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize.mx,v
retrieving revision 1.109.4.3
retrieving revision 1.109.4.4
diff -u -d -r1.109.4.3 -r1.109.4.4
--- serialize.mx 8 Jun 2008 01:02:37 -0000 1.109.4.3
+++ serialize.mx 9 Jun 2008 05:27:18 -0000 1.109.4.4
@@ -2022,7 +2022,7 @@
(void) driverArg;
if (trace) {
stream* logstream = xquery_print_trace("res",*xrpc_seqnr);
- fp = logstream?open_teestream(fp, logstream):NULL;
+ fp = logstream?attach_teestream(fp, logstream):NULL;
if (fp == NULL) return GDK_FAIL;
}
ret = xquery_print_result_driver (
@@ -2048,10 +2048,7 @@
xrpc_timeout,
xrpc_start);
- if (trace) {
- stream_close(fp);
- stream_destroy(fp);
- }
+ if (trace) detach_teestream(fp);
return ret;
}
U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.10
retrieving revision 1.68.4.11
diff -u -d -r1.68.4.10 -r1.68.4.11
--- xrpc_server.mx 8 Jun 2008 09:01:22 -0000 1.68.4.10
+++ xrpc_server.mx 9 Jun 2008 05:27:18 -0000 1.68.4.11
@@ -208,7 +208,7 @@
" xsi:schemaLocation=\"" XRPC_NS " " XRPC_LOC "\">"
#define XRPC_WS_QID\
- "<wscoor:CoordinationContext xmlns:wscoor=\""WSCOOR_NS"\""\
+ "<wscoor:CoordinationContext xmlns:wscoor=\""WSCOOR_NS"\" "\
"env:mustUnderstand=\"true\">"\
"<wscoor:Identifier>%s</wscoor:Identifier>"\
"<wscoor:Expires>"LLFMT"</wscoor:Expires>"\
@@ -533,7 +533,7 @@
BUN p,q;
BATiter xrpc_trustedi = bat_iterator(xrpc_trusted);
BATloop(xrpc_trusted, p, q) {
- char* prefix = BUNhead(xrpc_trustedi, p);
+ char* prefix = BUNtail(xrpc_trustedi, p);
len_x += strlen(prefix) + 4;
}
err = alloca(len + len_x);
@@ -545,7 +545,7 @@
} else {
pos += snprintf(err+pos, len-pos, "\n'%s' not in", location);
BATloop(xrpc_trusted, p, q) {
- char* prefix = BUNhead(xrpc_trustedi, p);
+ char* prefix = BUNtail(xrpc_trustedi, p);
pos += snprintf(err+pos, len-pos, "\n'%s',", prefix);
}
err[pos-1] = '.';
@@ -571,15 +571,6 @@
return -1;
}
-static BAT*
-get_elt_qn(BAT* pre_kind, BAT* pre_prop) {
- int elt = ELEMENT;
- BAT *elt_qn, *tmp = BATselect(pre_kind, &elt, &elt);
- if (!tmp) return NULL;
- elt_qn = BATsemijoin(pre_prop, tmp);
- BBPreclaim(tmp);
- return elt_qn;
-}
XRPCreq_t *
xrpc_parse_message(stream *out,
@@ -590,20 +581,19 @@
XRPCreq_t *req = NULL, *res = NULL;
char* msg = participants?XRPC_RESPONSE:XRPC_REQUEST;
char *module = NULL, *method = NULL, *location = NULL;
- char *qid = NULL, *host = NULL, *caller = "query,0";
- char *querynr_str = NULL, *timeout_str = NULL, *mode_str = NULL;
+ char *mode = NULL, *qid = NULL, *caller = "query,0";
char *arity_str = NULL, *itercnt_str = NULL;
char *pul = NULL, *val = NULL;
int updCall = FALSE;
- lng timeout = 300000, argc = GDK_lng_min, iterc = -1, i = 0, j = 0, k = 0;
+ lng timeout = 30000, argc = GDK_lng_min, iterc = -1, i = 0, j = 0, k = 0;
char errstr[1024];
- BATiter shredBATi, prop_vali, qn_uli;
+ BATiter shredBATi, prop_vali, qn_uli, qn_histi;
BAT *pre_size = NULL, *pre_level = NULL, *pre_kind = NULL, *pre_prop =
NULL;
- BAT *qn_loc = NULL, *qn_uri_loc = NULL;
+ BAT *qn_loc = NULL, *qn_uri_loc = NULL, *qn_histo = NULL;
BAT *prop_val = NULL, *prop_text = NULL;
BAT *attr_own = NULL, *attr_qn = NULL, *attr_prop = NULL;
- BAT *frag_root = NULL, *elt_qn = NULL;
+ BAT *frag_root = NULL;
int *pre_sizeT = NULL; /* Arrays holding the Tail values of some of the
BATs above. */
char *pre_levelT = NULL, *pre_kindT = NULL;
char *text_base = NULL; /* text base of prop_text tail values */
@@ -611,7 +601,7 @@
oid *pre_propT = NULL;
var_t *prop_textT = NULL;
- oid msg_node_pre = 0, qid_node_pre = 0, val_node_pre = 0;
+ oid msg_node_pre = 0, hdr_node_pre = 0, val_node_pre = 0;
oid call_node_pre = 0, seq_node_pre = 0, tpe_node_pre = 0;
oid next_call_node_pre = 0, next_seq_node_pre = 0, next_tpe_node_pre = 0;
oid ao_ptr = 0; /* cursor in the attr_own bat */
@@ -640,6 +630,7 @@
@:getbat(prop_text,PROP_TEXT)@
@:getbat(prop_val,PROP_VAL)@
@:getbat(qn_uri_loc,QN_URI_LOC)@
+ @:getbat(qn_histo,QN_HISTOGRAM)@
@:getbat(qn_loc,QN_LOC)@
@:getbat(frag_root,FRAG_ROOT)@
@:getbat(attr_own,ATTR_OWN)@
@@ -648,6 +639,7 @@
prop_vali = bat_iterator(prop_val);
qn_uli = bat_iterator(qn_uri_loc);
+ qn_histi = bat_iterator(qn_histo);
pre_sizeT = (int*) Tloc(pre_size, BUNfirst(pre_size));
pre_levelT = (char*) Tloc(pre_level, BUNfirst(pre_level));
@@ -663,6 +655,58 @@
frag_root = BATsetaccess(frag_root, BAT_APPEND);
nattrs = BATcount(attr_prop);
+ if (participants) {
+ /* parse a SOAP header for participants (XRPC response case) */
+ if ((hdr_node_pre = get_pre_by_qname(XRPC_NS"|participants",
+ msg_node_pre, msg_node_pre + pre_sizeT[msg_node_pre] +
1,
+ pre_propT, pre_kindT, qn_uri_loc)))
+ {
+ oid hdr_node_end = hdr_node_pre + pre_sizeT[hdr_node_pre];
+ while(++hdr_node_pre <= hdr_node_end) {
+ if (pre_kindT[hdr_node_pre] == ELEMENT) {
+ char *s = (char*) BUNtail(qn_uli, pre_propT[hdr_node_pre]);
+ if (strcmp(s, XRPC_NS"|participant") == 0) {
+ oid end_pre = hdr_node_pre + pre_sizeT[hdr_node_pre];
+ while(++hdr_node_pre <= end_pre) {
+ if (pre_kindT[hdr_node_pre] == TEXT) {
+ s = text_base +
prop_textT[pre_propT[hdr_node_pre]];
+ BUNappend(participants, s, FALSE);
+ }
+ }
+ }
+ }
+ }
+ }
+ } else {
+ /* parse a SOAP header for qid and timeout (XRPC request case)*/
+ if ((hdr_node_pre = get_pre_by_qname(WSCOOR_NS"|CoordinationContext",
+ msg_node_pre, msg_node_pre + pre_sizeT[msg_node_pre] +
1,
+ pre_propT, pre_kindT, qn_uri_loc)))
+ {
+ oid hdr_node_end = hdr_node_pre + pre_sizeT[hdr_node_pre];
+ while(++hdr_node_pre <= hdr_node_end) {
+ if (pre_kindT[hdr_node_pre] == ELEMENT) {
+ char *s = (char*) BUNtail(qn_uli, pre_propT[hdr_node_pre]);
+ int isqid = (strcmp(s, WSCOOR_NS"|Identifier") == 0);
+ if (isqid || strcmp(s, WSCOOR_NS"|Expires") == 0) {
+ oid end_pre = hdr_node_pre + pre_sizeT[hdr_node_pre];
+ while(++hdr_node_pre <= end_pre) {
+ if (pre_kindT[hdr_node_pre] == TEXT) {
+ s = text_base +
prop_textT[pre_propT[hdr_node_pre]];
+ if (isqid) {
+ qid = GDKstrdup(s);
+ } else {
+ timeout = my_strtoll(out, FALSE, s,
"timeout");
+ if (timeout == GDK_lng_min) goto cleanup;
+ }
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
if (!(msg_node_pre = get_pre_by_qname(msg, 2, BATcount(pre_size),
pre_propT, pre_kindT, qn_uri_loc))) {
send_err(out, ERR404, "env:Sender", msg);
@@ -670,8 +714,11 @@
}
call_node_pre = msg_node_pre;
- if (participants == NULL) {
- /* XRPC request message parsing */
+ /* analyze the XRPC request information */
+ if (participants) {
+ argc = 1; /* responses have only a out param (argc=1) ie a result */
+ } else {
+ /* more XRPC request message parsing: the XRPC Request in the body
contains module/method etc info */
while(ao_ptr < nattrs && attr_ownT[ao_ptr] < msg_node_pre) ao_ptr++;
while(ao_ptr < nattrs && attr_ownT[ao_ptr] == msg_node_pre) {
pul = (char*) BUNtail(qn_uli, BUNfirst(qn_uri_loc) +
attr_qnT[ao_ptr]);
@@ -687,7 +734,7 @@
} else if(strcmp(pul, XRPC_NS"|iter-count") == 0) {
itercnt_str = val;
} else if(strcmp(pul, XRPC_NS"|mode") == 0) {
- mode_str = val;
+ mode = val;
} else if(strcmp(pul, XRPC_NS"|caller") == 0) {
caller = val;
} else if(strcmp(pul,XRPC_NS"|updCall") == 0) {
@@ -710,93 +757,21 @@
if(iterc == GDK_lng_min) goto cleanup;
}
- /* Does this request require any isolation support? */
- if ((qid_node_pre = get_pre_by_qname(XRPC_NS"|queryID",
- msg_node_pre, msg_node_pre + pre_sizeT[msg_node_pre] +
1,
- pre_propT, pre_kindT, qn_uri_loc))) {
- while(ao_ptr < nattrs && attr_ownT[ao_ptr] < qid_node_pre)
ao_ptr++;
- while(ao_ptr < nattrs && attr_ownT[ao_ptr] == qid_node_pre) {
- pul = (char*) BUNtail(qn_uli, BUNfirst(qn_uri_loc) +
attr_qnT[ao_ptr]);
- val = (char*) BUNtail(prop_vali, BUNfirst(prop_val) +
attr_propT[ao_ptr]);
- if(strcmp(pul, XRPC_NS"|host") == 0) {
- host = val;
- } else if(strcmp(pul, XRPC_NS"|querynr") == 0) {
- querynr_str = val;
- } else if(strcmp(pul, XRPC_NS"|timeout") == 0) {
- timeout_str = val;
- timeout = my_strtoll(out, FALSE, timeout_str, "timeout");
- if (timeout == GDK_lng_min) goto cleanup;
- } else {
- snprintf(errstr, 1024,
- "Invalid attribute for the \""XRPC_NS":queryID\"
element: %s\n", pul);
- send_err(out, ERR404, "env:Sender", errstr);
- goto cleanup;
- }
- ao_ptr++;
- }
- if(!(host && querynr_str && timeout_str)) {
- send_err(out, ERR404, "env:Sender", "A \""XRPC_NS":queryID\"
element "
- "must contain all three attributes: host, querynr,
timeout\n");
- goto cleanup;
- }
-
- k = strlen(host) + strlen(querynr_str) + 2;
- if(!(qid = GDKmalloc(k))) {
- send_err(out, ERR500, "env:Receiver", OUT_OF_MEM);
- goto cleanup;
- }
-
- val = qid;
- pul = qid + k;
- while(*host && val < pul) *val++ = *host++;
- *val++ = '|';
- while(*querynr_str && val < pul) *val++ = *querynr_str++;
- *val = '\0';
- }
call_node_pre = get_pre_by_qname(XRPC_NS"|call",
MAX(call_node_pre,msg_node_pre),
msg_node_pre + pre_sizeT[msg_node_pre] + 1,
pre_propT, pre_kindT, qn_uri_loc);
argc = my_strtoll(out, FALSE, arity_str, "arity");
+ if (argc == GDK_lng_min) goto cleanup;
}
if (iterc == -1) {
/* if no iterc was found in the request (and always for XRPC
responses), just count sequence elements */
- BAT *qn_seq = BATselect(qn_uri_loc, XRPC_NS"|sequence",
XRPC_NS"|sequence");
- elt_qn = get_elt_qn(pre_kind, pre_prop);
- if (elt_qn && qn_seq) {
- BAT* elt_seq = BATmirror(BATsemijoin(BATmirror(elt_qn), qn_seq));
- if (elt_seq) {
- iterc = BATcount(elt_seq);
- BBPreclaim(elt_seq);
- }
- }
- if (iterc == -1) goto cleanup;
+ BUN p = BUNfnd(BATmirror(qn_uri_loc), XRPC_NS"|sequence");
+ iterc = (p == BUN_NONE)?1:(*(lng*) BUNtail(qn_histi, p) / argc);
}
- if (participants) {
- /* for XRPC response messages we add all participants to the
participant bat */
- BAT *qn_part = BATselect(qn_uri_loc, XRPC_NS"|participant",
XRPC_NS"|participant");
- if (qn_part) {
- BAT *elt_part = BATmirror(BATsemijoin(BATmirror(elt_qn),qn_part));
- if (elt_part) {
- BATiter pi = bat_iterator(elt_part);
- BUN p,q;
- BATloop(elt_part,p,q) {
- oid pre = 1 + *(oid*) BUNhead(pi,p);
- if (pre_kindT[pre] == TEXT) {
- str part = text_base + pre_propT[pre];
- BUNappend(participants, part, FALSE);
- }
- }
- BBPreclaim(elt_part);
- argc = 1;
- }
- BBPreclaim(qn_part);
- }
- }
- if (argc == GDK_lng_min) goto cleanup;
/* the req struct contains all parsed data (we use it also for response
messages) */
- if(!(req = XRPCreq_new(qid, caller, timeout, mode_str, module, method,
location, updCall, iterc, argc))) {
+ if(!(req = XRPCreq_new(qid, caller, timeout, mode, module, method,
location, updCall, iterc, argc))) {
send_err(out, ERR500, "env:Receiver", OUT_OF_MEM);
goto cleanup;
}
@@ -925,7 +900,6 @@
@c
cleanup:
if (req) XRPCreq_free(req);
- if (elt_qn) BBPreclaim(elt_qn);
@:delbat(pre_level)@
@:delbat(pre_prop)@
@:delbat(pre_kind)@
@@ -933,6 +907,7 @@
@:delbat(prop_val)@
@:delbat(qn_uri_loc)@
@:delbat(qn_loc)@
+ @:delbat(qn_histo)@
@:delbat(frag_root)@
@:delbat(attr_own)@
@:delbat(attr_qn)@
@@ -1047,10 +1022,9 @@
/* send empty HTTP OK header for updating request */
stream_write(mc->c->fdout, "HTTP/1.1 200 OK\r\n"
"Content-type: text/xml; charset=\"utf-8\"\r\n\r\n", 1,
60);
+ stream_flush(mc->c->fdout);
}
-
- stream_flush(mc->c->fdout);
- GDKsetbuf(errbuf_bak);
+ GDKsetbuf(errbuf_bak);
return ret;
}
@@ -1319,9 +1293,6 @@
return -1;
}
-/* the thread that checks every 5 seconds for expired queries */
-MT_Id checker;
-
int
CMDrpcd_start(int *port, bit *open, str options)
{
U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.11
retrieving revision 1.416.2.1.2.12
diff -u -d -r1.416.2.1.2.11 -r1.416.2.1.2.12
--- pathfinder.mx 8 Jun 2008 01:02:34 -0000 1.416.2.1.2.11
+++ pathfinder.mx 9 Jun 2008 05:27:17 -0000 1.416.2.1.2.12
@@ -514,7 +514,7 @@
var xrpc_statuses := bat("xrpc_statuses"); # bat[void,str] 2PC status of this
query: "exec", "wait", "prepare", "commit", "abort" or "timeout"
var xrpc_lock := pflock_get(6); # master XRPC lock, to protect above
xrpc_* BATs
var xrpc_hostport := my_hostname() + ":" + str(get_xrpc_port());
-var xrpc_querynr := 999999LL;
+var xrpc_querynr := (lng(get_xrpc_port()) * 100000LL) - 1LL;
# abort a 2PC transaction, keeping the record (note ws should have been
destroyed already)
PROC _ws_xrpc_abort(oid idx, str status) : void
@@ -523,7 +523,7 @@
var ws := xrpc_wsbats.find(idx);
ws_destroy(ws);
xrpc_statuses.inplace(idx, status);
- xrpc_wsbats.inplace(idx, bat("xrpc_statuses"));
+ xrpc_wsbats.inplace(idx, xrpc_statuses); # use xrpc_statuses as a dummy
bat, replacing the ws (that gets freed)
xrpc_locks.inplace(idx, lock_nil);
lock_destroy(wslock);
}
@@ -1498,19 +1498,6 @@
return ws;
}
-
-var update := 0;
-var xrpc_qid := "1";
-var xrpc_seqnr := 1LL;
-var xrpc_timeout := 30000LL;
-var xrpc_mode := "repeatable";
-var xrpc_coord := false;
-var ws, wsid, xrpc, idx;
-xrpc := _ws_xrpcget();
-idx := oid(abs(xrpc));
-ws := _ws_new(xrpc, idx, update);
-wsid := ws_id(ws);
-
PROC ws_end(BAT[void,bat] ws, str err) : int
{
ws_log_wsid := ws_id(ws);
-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins