Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv31646
Modified Files:
Tag: xquery-decomposition
rt_projection.mx xrpc_client.mx xrpc_common.mx xrpc_server.mx
Log Message:
Try to reduce the time spent serializing XRPC request messages.
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.41.2.13
retrieving revision 1.41.2.14
diff -u -d -r1.41.2.13 -r1.41.2.14
--- xrpc_client.mx 12 Mar 2008 02:28:41 -0000 1.41.2.13
+++ xrpc_client.mx 13 Mar 2008 03:12:36 -0000 1.41.2.14
@@ -171,7 +171,7 @@
var pre_prop := ws.fetch(PRE_PROP).fetch(cont).access(BAT_WRITE);
var pre_kind := ws.fetch(PRE_KIND).fetch(cont).access(BAT_WRITE);
- var qn_prefix := ws.fetch(QN_PREFIX).fetch(cont);
+ var qn_prefix_uri_loc := ws.fetch(QN_PREFIX_URI_LOC).fetch(cont);
var qn_uri := ws.fetch(QN_URI).fetch(cont);
var qn_loc := ws.fetch(QN_LOC).fetch(cont);
var prop_text := ws.fetch(PROP_TEXT).fetch(cont);
@@ -181,9 +181,13 @@
var attr_prop := ws.fetch(ATTR_PROP).fetch(cont);
var prop_val := ws.fetch(PROP_VAL).fetch(cont);
+ var xrpc_ns := "http://monetdb.cwi.nl/XQuery";
+ var xsi_ns := "http://www.w3.org/2001/XMLSchema-instance";
+
# Get "pre_prop" value of an "sequence" node.
var seq_prop;
- var err := CATCH ({ seq_prop :=
qn_loc.ord_uselect("sequence").reverse().fetch(0); });
+ var seqQname := "xrpc|" + xrpc_ns + "|sequence";
+ var err := CATCH ({ seq_prop :=
qn_prefix_uri_loc.ord_uselect(seqQname).reverse().fetch(0); });
if (not(isnil(err))) {
if(count(qn_loc.uselect("Fault")) = 0) {
ERROR("get_rpc_res expects at least *one* \"<sequence>\" tag.\n");
@@ -215,8 +219,6 @@
var itercnt := 0;
var tpe := "";
var subtpe := "";
- var xrpc_ns := "http://monetdb.cwi.nl/XQuery";
- var xsi_ns := "http://www.w3.org/2001/XMLSchema-instance";
# To reduce #iter: everything before the first "sequence" is
# not used, so do not iterate over them.
@@ -231,9 +233,8 @@
var prop := pre_prop.fetch(pre);
if ( and(level = seq_node_level, kind='\000') ){
- var loc := qn_loc.fetch(prop);
- var uri := qn_uri.fetch(prop);
- if (and((loc = "sequence"), (uri = xrpc_ns))) {
+ var pul := qn_prefix_uri_loc.fetch(prop);
+ if (pul = seqQname) {
itercnt :+= 1; # A new iteration starts
}
} else if ( and(level = tpe_node_level, kind='\000') ){
@@ -354,11 +355,6 @@
} # END 'if (not(isnil(pre_size.fetch(i))))'
i := i + inc;
} # END 'while(i < size)'
-
- pre_level := ws.fetch(PRE_LEVEL).fetch(cont);
- pre_prop := ws.fetch(PRE_PROP).fetch(cont);
- pre_kind := ws.fetch(PRE_KIND).fetch(cont);
-
return
bat(void,bat,4).append(res_iter).append(res_item).append(res_kind).access(BAT_READ);
}
ADDHELP("get_rpc_res", "zhang", "April 2006",
@@ -605,30 +601,16 @@
"CMDhttp_post: 'used_item' and 'used_kind' must have same
count.\n"); \
}
-#define str2buf(b, str) { \
- unsigned int len = strlen(str) + 1; \
- if (len > (b->len - b->pos)) { \
- GDKerror("CMDhttp_post: buffer overflow\n"); \
- return GDK_FAIL; \
- } \
- len = snprintf((b->buf + b->pos), len, str); \
- b->pos += len; \
-}
-
-#define clean_up(sock, out, bs, b, argcnt, iterc, \
- pm) { \
+#define clean_up(sock, out, bs, argcnt, iterc, pm) { \
lng i = 0; \
if (sock > 0) close(sock); \
if(out){ \
stream_close(out); stream_destroy(out); \
} \
if(bs){ \
+ buffer_destroy(stream_get_buffer(bs)); \
stream_close(bs); stream_destroy(bs); \
} \
- if (b) { \
- if (b->len > 0) buffer_destroy(b); \
- else free(b); \
- } \
if(argcnt) { \
for(i = 0; i < iterc; i++) GDKfree(argcnt[i]); \
GDKfree(argcnt); \
@@ -738,6 +720,51 @@
}
/**
+ * @return bat batID from container contID of given ws, or
+ * NULL by error
+ */
+static INLINE BAT *
+getBatFromContainer(BAT *ws, int batID, int contID)
+{
+ BAT *b = NULL, *batbat = NULL;
+ BUN bun = BUN_NONE, bbun = BUN_NONE;
+ oid CONTid = (oid) contID;
+ oid BATid = (oid) batID;
+ BATiter wsi, batbati;
+
+ assert(ws && contID >=0 && batID >= 0);
+
+ wsi = bat_iterator(ws);
+ BUNfndOID(bun, wsi, (ptr)&BATid);
+ if(bun == BUN_NONE) {
+ GDKerror("getBatFromContainer: NOT FOUND WS[%d]\n",batID);
+ return NULL;
+ }
+ if( !(batbat = BATdescriptor(*(bat*)Tloc(ws,bun))) ) {
+ GDKerror("getBatFromContainer: FETCH BATBAT for WS[%d] FAILED\n",
batID);
+ return NULL;
+ }
+
+ batbati = bat_iterator(batbat);
+ BUNfndOID(bbun, batbati, (ptr)&CONTid);
+ if(bbun == BUN_NONE) {
+ BBPunfix(BBPcacheid(batbat));
+ GDKerror("getBatFromContainer:NOT FOUND WS[%d][%d]\n",
+ batID, contID);
+ return NULL;
+ }
+ if( !(b = BATdescriptor(*(bat*)Tloc(batbat,bbun))) ) {
+ BBPunfix(BBPcacheid(batbat));
+ GDKerror("getBatFromContainer:FETCH WS[%d][%d] FAILED\n",
+ batID, contID);
+ return NULL;
+ }
+
+ BBPunfix(BBPcacheid(batbat));
+ return b;
+}
+
+/**
* Find a node's pre_kind value by finding in the bat
* ws[PRE_KIND][contID] the tail of 'item_index'-th BUN.
*
@@ -747,7 +774,7 @@
* @return the 'pre_kind' value, or
* GDK_chr_min by error
*/
-static chr
+static INLINE chr
getELEMkind(BAT *ws, int contID, oid item_index)
{
BATiter bi;
@@ -755,29 +782,18 @@
BUN bun = BUN_NONE;
chr ret;
- if(!ws) {
- GDKerror("getELEMkind: invalid working set ws.\n");
- return GDK_chr_min;
- }
+ assert(ws);
if(!(b = getBatFromContainer(ws, PRE_KIND, contID)))
return GDK_chr_min;
- if (!b->batCount){
- GDKerror("getELEMkind: ws[PRE_KIND][%d] (%s) is empty.\n",
- b?BBPname(b->batCacheid):"NULL");
- return GDK_chr_min;
- }
-
bi = bat_iterator(b);
BUNfndVOID(bun, bi, &item_index);
if(bun == BUN_NONE) {
- GDKerror("getELEMkind: ws[PRE_KIND][%d] (%s) does not contain "
- "a head value " OIDFMT ".\n",
- b?BBPname(b->batCacheid):"NULL", item_index);
+ GDKerror("getELEMkind: ws[PRE_KIND][%d] (%s) does not contain a
"OIDFMT" head.\n",
+ contID, item_index);
return GDK_chr_min;
}
-
ret = *(chr*)BUNtail(bi,bun);
BBPunfix(BBPcacheid(b));
return ret;
@@ -854,10 +870,9 @@
}
/* We only speak HTTP/1.1 */
- if( ((strptr = strstr(respStatus, "HTTP/1.1 ")) != respStatus) ||
+ if( ret < 13 || (strptr = strstr(respStatus, "HTTP/1.1 ")) != respStatus ||
(respStatus[8] != ' ') || (respStatus[12] != ' ') ||
- /* the '\n' is not returned by stream_readline() */
- (respStatus[ret -1] != '\r') ) {
+ (respStatus[ret -1] != '\r') ) { /* the '\n' is not returned by
stream_readline() */
GDKerror("response2bat: invalid response from %s:%d\n",
host, port);
/* read and print everything we can receive */
@@ -878,7 +893,7 @@
ret = 0; /* end-of-HTTP-header found */
} while (ret > 0);
if (ret < 0) {
- GDKerror("response2bat: failed to receive response from %s:%d",
+ GDKerror("response2bat: failed to receive HTTP response header from
%s:%d",
host, port);
if(errno) GDKerror("response2bat: %s", strerror(errno));
stream_close(in); stream_destroy(in);
@@ -897,7 +912,7 @@
if (ret > 0) b->pos += ret;
} while (ret > 0);
if (ret < 0) {
- GDKerror("response2bat: failed to receive response from %s:%d",
+ GDKerror("response2bat: failed to receive SOAP Fault message from
%s:%d",
host, port);
if(errno) GDKerror("response2bat: %s", strerror(errno));
return NULL;
@@ -947,7 +962,6 @@
static INLINE projection_mapping*
serialize_fragments(
str options,
- buffer *b,
stream *bs,
BAT *ws,
BAT *used_item,
@@ -965,7 +979,7 @@
BAT *uitems = NULL, *ukinds = NULL, *ritems = NULL, *rkinds = NULL;
BAT *pre_nid = NULL;
BUN uitem_base, ukind_base, ritem_base, rkind_base;
- int contIDnew = int_nil;
+ int contIDnew = int_nil, ret = -1;
char printmode[64] = "xml-noheader-root-xrpc:fragment\0";
assert(BATcount(used_item) == BATcount(returned_item)); /* must be aligned
*/
@@ -996,12 +1010,12 @@
ritems = BATdescriptor(*(bat*)BUNtail(ritemsi, ritem_base + i));
rkinds = BATdescriptor(*(bat*)BUNtail(rkindsi, rkind_base + i));
- str2buf(b, "<xrpc:fragments>");
+ ret = stream_write(bs, "<xrpc:fragments>", 1, 16); assert(ret == 16);
contIDnew = runtime_doc_projection2stream(options,
printmode, bs, FALSE, ws,
uitems, ukinds, ritems, rkinds,
int_values, dbl_values, dec_values, str_values);
- str2buf(b, "</xrpc:fragments>");
+ ret = stream_write(bs, "</xrpc:fragments>", 1, 17); assert(ret == 17);
BBPunfix(BBPcacheid(uitems));
BBPunfix(BBPcacheid(ukinds));
BBPunfix(BBPcacheid(ritems));
@@ -1053,9 +1067,8 @@
BAT *dec_values,
BAT *str_values)
{
- int sock = -1, port = -1;
+ int sock = -1, port = -1, ret = -1;
buffer *b = NULL;
- str str_val = NULL;
stream *out = NULL, *bs = NULL;
/* Hold the temp BATs after the RPC response has been shredded: */
BAT *shredBAT = NULL;
@@ -1073,9 +1086,9 @@
/* BATs needed for serializing attributes */
BAT *pre_nid = NULL, *attr_own = NULL, *attr_qn = NULL;
- BAT *qn_prefix = NULL, *qn_uri = NULL, *qn_loc = NULL;
+ BAT *qn_prefix_uri_loc = NULL;
oid *pre_nid_lst = NULL, *attr_own_lst = NULL, *attr_qn_lst = NULL;
- BATiter strValsi, qn_prefixi, qn_urii, qn_loci;
+ BATiter strValsi, qn_puli;
/* BATiter upathi, rpathi; */
/* BUN upath_base, rpath_base; */
@@ -1098,22 +1111,29 @@
/* Start timing Client Serialisation */
time_xrpcClntSeria = GDKusec();
+ fvid_lst = (oid*) Tloc(fun_vid, BUNfirst(fun_vid));
+ fiter_lst = (oid*) Tloc(fun_iter, BUNfirst(fun_iter));
+ fitem_lst = (oid*) Tloc(fun_item, BUNfirst(fun_item));
+ fkind_lst = (int*) Tloc(fun_kind, BUNfirst(fun_kind));
+ intVals = (lng*) Tloc(int_values, BUNfirst(int_values));
+ dblVals = (dbl*) Tloc(dbl_values, BUNfirst(dbl_values));
+ strValsi = bat_iterator(str_values);
+
/* Create buffer for the RPC request message */
b = buffer_create(MAX_BUF_SIZE);
if (!b || b->len == 0) {
GDKerror("CMDhttp_post: failed to create outgoing buffer\n");
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
return GDK_FAIL;
}
- if (!(bs = buffer_wastream(b, "shred_fragments"))) {
+ if (!(bs = buffer_wastream(b, "xrpcrequest"))) {
GDKerror("CMDhttp_post: failed to create buffer_wastream\n");
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ buffer_destroy(b);
return GDK_FAIL;
}
if (!(argcnt = GDKmalloc(iterc * sizeof(lng*)))) {
GDKerror("CMDhttp_post: failed to malloc argcnt\n");
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ clean_up(sock, out, bs, argcnt, iterc, pm);
return GDK_FAIL;
}
for (i = 0; i < iterc; i++) {
@@ -1121,7 +1141,7 @@
* we have a place to note that a function has zero parameter */
if (!(argcnt[i] = GDKmalloc((argc>0?argc:1) * sizeof(lng)))) {
GDKerror("CMDhttp_post: failed to malloc argcnt[" SZFMT "]\n", i);
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ clean_up(sock, out, bs, argcnt, iterc, pm);
return GDK_FAIL;
}
argcnt[i][0] = 0;
@@ -1132,8 +1152,8 @@
/* Calculate the number of values contained by every parameter of
* every iteration. */
- fvid_lst = (oid*) Tloc(fun_vid, BUNfirst(fun_vid));
- fiter_lst = (oid*) Tloc(fun_iter, BUNfirst(fun_iter));
+ /* TODO: can we use something like BATrangesplit() to make this
+ * faster? */
i = a = 0; /* i: iter counter; a: arg counter */
for (cnt = 0; cnt < BATcount(fun_vid); cnt++){
i = fiter_lst[cnt];
@@ -1141,89 +1161,34 @@
argcnt[i-1][a]++;
}
- fvid_lst = (oid*) Tloc(fun_vid, BUNfirst(fun_vid));
- fiter_lst = (oid*) Tloc(fun_iter, BUNfirst(fun_iter));
- fitem_lst = (oid*) Tloc(fun_item, BUNfirst(fun_item));
- fkind_lst = (int*) Tloc(fun_kind, BUNfirst(fun_kind));
- intVals = (lng*) Tloc(int_values, BUNfirst(int_values));
- dblVals = (dbl*) Tloc(dbl_values, BUNfirst(dbl_values));
- strValsi = bat_iterator(str_values);
-
/* soap env header and xrpc request header */
- b->pos = snprintf(b->buf, b->len, XRPC_HEADER,
- SOAP_NS, XRPC_NS, XDT_NS, XS_NS, XSI_NS, XRPC_NS, XRPC_LOC,
- rpc_module, rpc_uri, rpc_method, argc, iterc,
- updCall?"true":"false");
- assert((b->pos > 0) && (b->pos < b->len));
-
+ ret = stream_printf(bs, XRPC_HEADER, SOAP_NS, XRPC_NS, XDT_NS,
+ XS_NS, XSI_NS, XRPC_NS, XRPC_LOC, rpc_module, rpc_uri,
+ rpc_method, argc, iterc, updCall?"true":"false");
+ assert(ret > 0);
- bytes_prj = b->pos;
- if(!(pm = serialize_fragments(options, b, bs, ws,
- used_item, used_kind, returned_item, returned_kind,
- int_values, dbl_values, dec_values, str_values))) {
+ bytes_prj = (stream_get_buffer(bs))->pos;
+ if(!(pm = serialize_fragments(options, bs, ws, used_item, used_kind,
returned_item,
+ returned_kind, int_values, dbl_values, dec_values,
str_values))) {
GDKerror("CMDhttp_post: failed to serialize fragments.\n");
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ clean_up(sock, out, bs, argcnt, iterc, pm);
return GDK_FAIL;
}
- if(options && strstr(options, "timing")){
- stream_printf(GDKout,
- "XRPC_Fragments_Size: " SZFMT "
bytes\n\n",
- (b->pos - bytes_prj));
- }
-
-#if 0 /* Temporarily turn it off, as it is not handled */
- /* Serialize used/returned paths */
- i = BATcount(upath_item);
- a = BATcount(rpath_item);
- if(i > 0 || a > 0) {
- str2buf(b, "<xrpc:projection-paths>");
- if(i > 0) {
- upathi = bat_iterator(upath_item);
- upath_base = BUNfirst(upath_item);
- for(cnt = 0; cnt < (size_t) i; cnt++){
- b->pos += snprintf((b->buf + b->pos), (b->len - b->pos),
- "<xrpc:used-path>%s</xrpc:used-path>",
- BUNtail(strValsi, *(oid*)BUNtail(upathi, upath_base +
cnt)));
- }
- }
- if(a > 0) {
- rpathi = bat_iterator(rpath_item);
- rpath_base = BUNfirst(rpath_item);
- for(cnt = 0; cnt < (size_t) a; cnt++){
- b->pos += snprintf((b->buf + b->pos), (b->len - b->pos),
- "<xrpc:returned-path>%s</xrpc:returned-path>",
- BUNtail(strValsi, *(oid*)BUNtail(rpathi, rpath_base +
cnt)));
- }
- }
- str2buf(b, "</xrpc:projection-paths>");
- }
-#endif
+ bytes_prj = (stream_get_buffer(bs))->pos - bytes_prj;
for (my_iter = 0; my_iter < iterc; my_iter++) {
if (argc == 0) {
- str2buf(b, "<xrpc:call/>");
+ assert(stream_write(bs, "<xrpc:call/>", 1, 12) == 12);
continue;
}
- str2buf(b, "<xrpc:call>"); /* start an iteration */
+ assert(stream_write(bs, "<xrpc:call>", 1, 11) == 11); /* start an
iteration */
for (my_argc = 0; my_argc < argc; my_argc++) {
if (argcnt[my_iter][my_argc] == 0) {
- str2buf(b, "<xrpc:sequence/>");
+ assert(stream_write(bs, "<xrpc:sequence/>", 1, 16) == 16);
continue;
}
- /* Enlarge the buffer if it is filled more than 80%. */
- if (b->pos > (b->len * 0.8)) {
- char *bptr = realloc(b->buf, b->len * 2);
- if (!bptr) {
- GDKerror("CMDhttp_post: failed to enlarge request
buffer.\n");
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
- return GDK_FAIL;
- }
- b->buf = bptr;
- b->len *= 2;
- }
-
arg_offset = 0;
/* add the 'argcnt' of all arg-s before 'me': */
for (a = 0; a < my_argc; a++) {
@@ -1237,12 +1202,14 @@
/* now 'arg_offset' contains the start position of the
* values of my_iter and my_argc in the fun_* BATs */
- str2buf(b, "<xrpc:sequence>"); /* start a parameter */
+ assert(stream_write(bs, "<xrpc:sequence>", 1, 15) == 15); /* start
a parameter */
int k;
+ chr elem_kind = 0;
+ oid contID = 0, item = 0, owner = 0;
+ BUN bun;
+ BATiter bi;
+ int cmbn_cont_kind = 0, cur_kind = 0, pmoff = 0;
for (k= 0; k < argcnt[my_iter][my_argc]; k++) {
- int cmbn_cont_kind = 0, cur_kind = 0, contID = 0;
- chr elem_kind = 0;
- oid item, owner;
a = arg_offset + k; /* index into fun_item|fun_kind */
item = fitem_lst[a];
@@ -1252,169 +1219,177 @@
switch (cur_kind) {
case BOOL:
- b->pos += snprintf((b->buf + b->pos), (b->len -
b->pos),
+ ret = stream_printf(bs,
"<xrpc:atomic-value
xsi:type=\"xs:boolean\">%s</xrpc:atomic-value>",
intVals[item] == TRUE ? "true" : "false");
+ assert(ret > 0);
break;
case INT:
- b->pos += snprintf((b->buf + b->pos), (b->len -
b->pos),
+ ret = stream_printf(bs,
"<xrpc:atomic-value
xsi:type=\"xs:integer\">%lld</xrpc:atomic-value>",
intVals[item]);
+ assert(ret > 0);
break;
case DEC:
- b->pos += snprintf((b->buf + b->pos), (b->len -
b->pos),
+ ret = stream_printf(bs,
"<xrpc:atomic-value
xsi:type=\"xs:decimal\">%f</xrpc:atomic-value>",
dblVals[item]);
+ assert(ret > 0);
break;
case DBL:
- b->pos += snprintf((b->buf + b->pos), (b->len -
b->pos),
+ ret = stream_printf(bs,
"<xrpc:atomic-value
xsi:type=\"xs:double\">%f</xrpc:atomic-value>",
dblVals[item]);
+ assert(ret > 0);
break;
case STR:
case U_A:
- str_val = BUNtail(strValsi, item);
- b->pos += snprintf((b->buf + b->pos), (b->len -
b->pos),
- "<xrpc:atomic-value xsi:type=\"xs:string\">%s"
- "</xrpc:atomic-value>", str_val);
+ ret = stream_printf(bs,
+ "<xrpc:atomic-value
xsi:type=\"xs:string\">%s</xrpc:atomic-value>",
+ (char*)BUNtail(strValsi, item));
+ assert(ret > 0);
+ break;
case ATTR:
{
pre_nid = getBatFromContainer(ws, PRE_NID, contID);
attr_own = getBatFromContainer(ws, ATTR_OWN, contID);
attr_qn = getBatFromContainer(ws, ATTR_QN, contID);
- qn_prefix = getBatFromContainer(ws, QN_PREFIX, contID);
- qn_uri = getBatFromContainer(ws, QN_URI, contID);
- qn_loc = getBatFromContainer(ws, QN_LOC, contID);
- if( !pre_nid || !attr_own || !attr_qn || !qn_prefix ||
!qn_uri || !qn_loc ){
+ qn_prefix_uri_loc = getBatFromContainer(ws,
QN_PREFIX_URI_LOC, contID);
+ if( !pre_nid || !attr_own || !attr_qn ||
!qn_prefix_uri_loc){
if(pre_nid) BBPunfix(BBPcacheid(pre_nid));
if(attr_own) BBPunfix(BBPcacheid(attr_own));
if(attr_qn) BBPunfix(BBPcacheid(attr_qn));
- if(qn_prefix) BBPunfix(BBPcacheid(qn_prefix));
- if(qn_uri) BBPunfix(BBPcacheid(qn_uri));
- if(qn_loc) BBPunfix(BBPcacheid(qn_loc));
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ if(qn_prefix_uri_loc)
BBPunfix(BBPcacheid(qn_prefix_uri_loc));
+ clean_up(sock, out, bs, argcnt, iterc, pm);
return GDK_FAIL;
}
pre_nid_lst = (oid*) Tloc(pre_nid,
BUNfirst(pre_nid)); /* might be NULL!! */
attr_own_lst = (oid*) Tloc(attr_own,
BUNfirst(attr_own));
attr_qn_lst = (oid*) Tloc(attr_qn, BUNfirst(attr_qn));
- qn_prefixi = bat_iterator(qn_prefix);
- qn_urii = bat_iterator(qn_uri);
- qn_loci = bat_iterator(qn_loc);
+ qn_puli = bat_iterator(qn_prefix_uri_loc);
- owner = attr_own_lst[item]; /* nid of attr_own */
- if(pre_nid_lst) {
- for(cnt = 0; cnt < BATcount(pre_nid); cnt++){
- if(pre_nid_lst[cnt] == owner){
- owner = cnt; /* pre of attr_own */
- break;
- }
- }
- }
+ bi = bat_iterator(attr_own);
+ BUNfndOID(bun, bi, (ptr)&item);
+ assert(bun != BUN_NONE);
+ owner = *(oid*)BUNtail(bi, bun); /* nid of attr_own */
- char *prefix = (char*)BUNtail(qn_prefixi,
attr_qn_lst[item]);
- char *uri = (char*)BUNtail(qn_urii,
attr_qn_lst[item]);
- char *loc = (char*)BUNtail(qn_loci,
attr_qn_lst[item]);
+ bi = bat_iterator(BATmirror(pre_nid));
+ BUNfndOID(bun, bi, (ptr)&owner);
+ assert(bun != BUN_NONE);
+ owner = *(oid*)BUNtail(bi, bun); /* pre of attr_own */
+
+ bi = bat_iterator(attr_qn);
+ BUNfndOID(bun, bi, (ptr)&item);
+ assert(bun != BUN_NONE);
+ oid qn = *(oid*)BUNtail(bi, bun);
+
+ pmoff = -1;
for(i = 0; pm[i].contIDold != oid_nil; i++){
- /* in a projected doc, 'pre' becomes 'nid',
- * so need to find the new 'pre' of the old
- * 'pre' */
- for(a = 0; (size_t) a < BATcount(pm[i].pre_nid);
a++) {
- if(owner == pm[i].pre_nidT[a]) {
- owner = a;
- break;
- }
- }
- if(pm[i].contIDold == (unsigned int)contID){
- if(prefix && *prefix)
- b->pos += snprintf((b->buf + b->pos),
(b->len - b->pos),
- "<xrpc:attribute
xrpc:fragsid=\"%lld\""
- " xrpc:nodeid=\""OIDFMT"\" "
- " xrpc:attr-qname=\"%s:%s:%s\"/>",
- i+1, owner+1, prefix, uri, loc);
- else
- b->pos += snprintf((b->buf + b->pos),
(b->len - b->pos),
- "<xrpc:attribute
xrpc:fragsid=\"%lld\""
- " xrpc:nodeid=\""OIDFMT"\" "
- " xrpc:attr-qname=\"%s\"/>",
- i+1, owner+1, loc);
+ if(pm[i].contIDold == contID) {
+ pmoff = i;
break;
}
}
+ assert(pmoff > -1);
+ /* in a projected doc, 'pre' becomes 'nid', so
+ * need to find the new 'pre' of the old 'pre' */
+ bi = bat_iterator(BATmirror(pm[pmoff].pre_nid));
+ BUNfndOID(bun, bi, (ptr)&owner);
+ assert(bun != BUN_NONE);
+ owner = *(oid*)BUNtail(bi, bun); /* pre of attr_own */
+ /* find prefix|uri|loc info from the old container */
+ char *pul = (char*)BUNtail(qn_puli,
BUNfirst(qn_prefix_uri_loc)+qn);
+ assert(pul);
+ size_t len = strlen(pul);
+ for(i = 0; (size_t)i < len; i++){
+ if(pul[i] == '|') pul[i] = ':';
+ }
+ ret = stream_printf(bs,
+ "<xrpc:attribute xrpc:fragsid=\"%lld\""
+ " xrpc:nodeid=\""OIDFMT"\" "
+ " xrpc:attr-qname=\"%s\"/>",
+ pmoff+1, owner+1, pul);
+ assert(ret > 0);
+ for(i = 0; (size_t)i < len; i++){
+ if(pul[i] == ':') pul[i] = '|';
+ }
BBPunfix(BBPcacheid(pre_nid));
BBPunfix(BBPcacheid(attr_own));
BBPunfix(BBPcacheid(attr_qn));
- BBPunfix(BBPcacheid(qn_prefix));
- BBPunfix(BBPcacheid(qn_loc));
+ BBPunfix(BBPcacheid(qn_prefix_uri_loc));
break;
}
case ELEM:
{
- elem_kind = getELEMkind(ws, contID, item);
- if (elem_kind == GDK_chr_min) {
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
- return GDK_FAIL;
- }
+ pmoff = -1;
for(i = 0; pm[i].contIDold != oid_nil; i++){
- /* found the 'pm' entry */
- if(pm[i].contIDold == (oid) contID) break;
+ if(pm[i].contIDold == contID) {
+ pmoff = i;
+ break;
+ }
}
+ assert(pmoff > -1);
+
/* in a projected doc, 'pre' becomes 'nid', so
* need to find the new 'pre' of the old 'pre' */
- for(a = 0; (size_t) a < BATcount(pm[i].pre_nid); a++) {
- if(item == pm[i].pre_nidT[a]) {
- item = a;
- break;
- }
+ bi = bat_iterator(BATmirror(pm[pmoff].pre_nid));
+ BUNfndOID(bun, bi, (ptr)&item);
+ assert(bun != BUN_NONE);
+ item = *(oid*)BUNtail(bi, bun);
+
+ /* get ELEM kind from the projected container,
i.e.smaller */
+ elem_kind = getELEMkind(ws, pm[pmoff].contIDnew, item);
+ if (elem_kind == GDK_chr_min) {
+ clean_up(sock, out, bs, argcnt, iterc, pm);
+ return GDK_FAIL;
}
+
switch(elem_kind) {
case 0: /* ELEMENT */
- b->pos += snprintf((b->buf + b->pos), (b->len
- b->pos),
+ ret = stream_printf(bs,
"<xrpc:element xrpc:fragsid=\"%lld\""
- " xrpc:nodeid=\""OIDFMT"\"/>",
+ "
xrpc:nodeid=\""OIDFMT"\"/>",
i+1, item+1);
+ assert(ret > 0);
break;
case 1: /* TEXT */
- /*
- b->pos += snprintf((b->buf + b->pos), (b->len
- b->pos),
- "<xrpc:text xrpc:fragsid=\"%lld\""
- " xrpc:nodeid=\""OIDFMT"\"/>",
- i+1, item+1);
- */
GDKerror("CMDhttp_post:
call%d/sequence%d/item%d has "
"type TEXT, not supported yet!\n",
my_iter, my_argc);
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ clean_up(sock, out, bs, argcnt, iterc, pm);
return GDK_FAIL;
break;
case 2: /* COMMENT */
- b->pos += snprintf((b->buf + b->pos), (b->len
- b->pos),
+ ret = stream_printf(bs,
"<xrpc:comment xrpc:fragsid=\"%lld\""
- " xrpc:nodeid=\""OIDFMT"\"/>",
+ "
xrpc:nodeid=\""OIDFMT"\"/>",
i+1, item+1);
+ assert(ret > 0);
break;
case 3: /* PI */
- b->pos += snprintf((b->buf + b->pos), (b->len
- b->pos),
+ ret = stream_printf(bs,
"<xrpc:pi xrpc:fragsid=\"%lld\""
- " xrpc:nodeid=\""OIDFMT"\"/>",
+ " xrpc:nodeid=\""OIDFMT"\"/>",
i+1, item+1);
+ assert(ret > 0);
break;
case 4: /* DOCUMENT */
- b->pos += snprintf((b->buf + b->pos), (b->len
- b->pos),
+ ret = stream_printf(bs,
"<xrpc:document xrpc:fragsid=\"%lld\""
" xrpc:nodeid=\""OIDFMT"\"/>",
i+1, item+1);
+ assert(ret > 0);
break;
case 5: /* COLLECTION */
GDKerror("CMDhttp_post:
call%d/sequence%d/item%d has "
"type COLLECTION, not allowed!\n",
my_iter, my_argc);
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ clean_up(sock, out, bs, argcnt, iterc, pm);
return GDK_FAIL;
break;
default:
- GDKerror("CMDhttp_post:
call%d/sequence%d/item%d has "
- "invalid 'elem_kind' %d\n", my_iter,
my_argc, elem_kind);
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ GDKerror("CMDhttp_post:
call[%d]/sequence[%d]/item[%d] has "
+ "an invalid kind '%d'\n", my_iter,
my_argc, elem_kind);
+ clean_up(sock, out, bs, argcnt, iterc, pm);
return GDK_FAIL;
}
break;
@@ -1422,21 +1397,22 @@
default:
GDKerror("CMDhttp_post: call%d/sequence%d/item%d has
invalid type %d\n",
my_iter, my_argc, k, cur_kind);
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ clean_up(sock, out, bs, argcnt, iterc, pm);
return GDK_FAIL;
}
}
- str2buf(b, "</xrpc:sequence>");
+ assert(stream_write(bs, "</xrpc:sequence>", 1, 16) == 16);
}
- str2buf(b, "</xrpc:call>");
+ assert(stream_write(bs, "</xrpc:call>", 1, 12) == 12);
}
- str2buf(b, XRPC_FOOTER);
- b->buf[b->pos] = 0;
+ assert(stream_write(bs, "</xrpc:request></env:Body></env:Envelope>\n", 1,
42) == 42);
/* Stop timing Client Serialisation */
time_xrpcClntSeria = GDKusec() - time_xrpcClntSeria;
/* Start timing Network Send Client2Server */
time_xrpcClnt2Serv = GDKusec();
+ b = stream_get_buffer(bs);
+ b->buf[b->pos] = '\0';
bytes_sent = stream_printf(out,
"POST %s HTTP/1.1\r\n"
"Host: %s:%d\r\n"
@@ -1448,7 +1424,7 @@
XRPC_REQ_CALLBACK, dst, port, b->pos, b->buf);
if (bytes_sent < b->pos) {
GDKerror("CMDhttp_post: failed to send XRPC request.");
- clean_up(sock, out, bs, b, argcnt, iterc, pm);
+ clean_up(sock, out, bs, argcnt, iterc, pm);
return GDK_FAIL;
}
/* Stop timing Network Send Client2Server */
@@ -1460,17 +1436,18 @@
if (options && strstr(options, "timing")) {
stream_printf(GDKout,
+ "XRPC_Data_Sent: " SZFMT " bytes\n"
+ " XRPC_Fragments_Size: " SZFMT " bytes\n"
"XRPC_Client_Serialisation (create_req_msg): %10.2f msec\n"
"XRPC_Network_Client_2_Server: %10.2f msec\n"
- "XRPC_Data_Sent: " SZFMT "
bytes\n\n"
"XRPC_Remote_Execution_Total (response2bat): %10.2f msec\n",
+ bytes_sent, (b->pos - bytes_prj),
(time_xrpcClntSeria/1000.0),
(time_xrpcClnt2Serv/1000.0),
- bytes_sent,
(time_remoteExec/1000.0));
}
- clean_up(-1, out, bs, b, argcnt, iterc, pm);
+ clean_up(-1, out, bs, argcnt, iterc, pm);
if(!shredBAT) return GDK_FAIL;
*res = shredBAT;
Index: xrpc_common.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_common.mx,v
retrieving revision 1.1.2.2
retrieving revision 1.1.2.3
diff -u -d -r1.1.2.2 -r1.1.2.3
--- xrpc_common.mx 16 Feb 2008 01:02:21 -0000 1.1.2.2
+++ xrpc_common.mx 13 Mar 2008 03:12:36 -0000 1.1.2.3
@@ -99,9 +99,6 @@
"</xrpc:sequence>" \
"</xrpc:call>"
-#define XRPC_FOOTER "</xrpc:request>" \
- "</env:Body>" \
- "</env:Envelope>\n"
#endif /* XRPC_COMMON_H */
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.58.2.11
retrieving revision 1.58.2.12
diff -u -d -r1.58.2.11 -r1.58.2.12
--- xrpc_server.mx 12 Mar 2008 18:21:26 -0000 1.58.2.11
+++ xrpc_server.mx 13 Mar 2008 03:12:36 -0000 1.58.2.12
@@ -621,7 +621,7 @@
while(attr < nattrs && attr_ownT[attr] == tpe_node_pre){
str = (char*)BUNtail(qn_puli,
BUNfirst(qn_prefix_uri_loc)+attr_qnT[attr]);
if(strcmp(str, "xrpc|"XRPC_NS"|attr-qname") == 0) {
- attr_name = (char*)BUNtail(prop_vali,
BUNfirst(prop_val)+attr_qnT[attr]);
+ attr_name = (char*)BUNtail(prop_vali,
BUNfirst(prop_val)+attr_propT[attr]);
}
attr++;
}
@@ -716,7 +716,6 @@
oid call_node_pre = 0, seq_node_pre = 0, tpe_node_pre = 0;
oid next_call_node_pre = 0, next_seq_node_pre = 0;
oid ao_ptr = 0; /* cursor in the attr_own bat */
- lng tpe_node_size = 0;
oid *frags_pre = NULL;
size_t nrFrags = 0;
size_t nattrs = 0;
@@ -887,10 +886,12 @@
if(strcmp(argtpe[nr_args], "xrpc:atomic-value") == 0) {
GDKfree(argtpe[nr_args]); /* find sub-type of the
atomic-value */
char *tptr = NULL;
- while(ao_ptr < nattrs && attr_ownT[ao_ptr++] ==
tpe_node_pre) {
+ while(ao_ptr < nattrs && attr_ownT[ao_ptr] ==
tpe_node_pre) {
if(strcmp((char*)BUNtail(qn_puli,
BUNfirst(qn_prefix_uri_loc)+attr_qnT[ao_ptr]),
- "xsi|"XSI_NS"|type") == 0)
- tptr = (char*)BUNtail(prop_vali,
BUNfirst(prop_val)+attr_qnT[ao_ptr]);
+ "xsi|"XSI_NS"|type") == 0) {
+ tptr = (char*)BUNtail(prop_vali,
BUNfirst(prop_val)+attr_propT[ao_ptr]);
+ }
+ ao_ptr++;
}
if(!tptr){
snprintf(errstr, 1024, "XRPC request:
iter["LLFMT"]/param["LLFMT"]/value["LLFMT"]"
@@ -904,7 +905,7 @@
argtpe[nr_args] = GDKstrdup(tptr);
val_node_pre = tpe_node_pre + 1;
- if( (tpe_node_size != 1) || (pre_kindT[val_node_pre] !=
TEXT) ) {
+ if( (pre_sizeT[tpe_node_pre] != 1) ||
(pre_kindT[val_node_pre] != TEXT) ) {
snprintf(errstr, 1024, "XRPC request:
iter["LLFMT"]/param["LLFMT"]/value["LLFMT"]"
"of type \"%s\" is expected to have a simple
value",
i+1, j+1, k, argtpe[nr_args]);
@@ -1047,6 +1048,8 @@
* Again, print time info at the XRPC server side, but is usually
* not used.
*/
+ lng time_servExec = GDKusec();
+
stream_printf(mc->c->fdout, "HTTP/1.1 200 OK\r\n"
"Content-type: text/xml; "
"charset=\"utf-8\"\r\n\r\n");
@@ -1066,6 +1069,12 @@
stream_flush(mc->c->fdout);
GDKsetbuf(errbuf_bak);
+ time_servExec = GDKusec() - time_servExec;
+ if(timing) {
+ fprintf(stdout, "XRPC_Server_Execution: %10.2f msec\n",
+ (time_servExec/1000.0));
+ }
+
return GDK_SUCCEED;
}
@@ -1192,7 +1201,8 @@
xrpc_handle_file_request(mapi_client *mc, struct shttpd_callback_arg *arg)
{
char *method = NULL, *uri = shttpd_get_uri(arg);
- char location[1024];
+ //char location[1024];
+ char *location = "/net/andorea/export/scratch0/zhang/admin.xq\0";
int ret = GDK_FAIL;
lng **argcnt = NULL;
@@ -1207,7 +1217,7 @@
send_err(mc->c->fdout, TRUE, ERR403, "env:Sender", "Directory listing
denied");
return GDK_FAIL;
}
- snprintf(location, 1024, "http://127.0.0.1:%d/admin/admin.xq", xrpc_port);
+ //snprintf(location, 1024, "http://127.0.0.1:%d/xrpc/admin/admin.xq",
xrpc_port);
method = shttpd_get_method(arg);
argcnt = GDKmalloc(sizeof(lng*));
if (!argcnt){
Index: rt_projection.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/Attic/rt_projection.mx,v
retrieving revision 1.1.2.17
retrieving revision 1.1.2.18
diff -u -d -r1.1.2.17 -r1.1.2.18
--- rt_projection.mx 12 Mar 2008 18:21:25 -0000 1.1.2.17
+++ rt_projection.mx 13 Mar 2008 03:12:35 -0000 1.1.2.18
@@ -38,9 +38,6 @@
bte *knd;
} pcontext;
-BAT *
-getBatFromContainer(BAT *ws, int batID, int contID);
-
int runtime_doc_projection2stream(
str options,
str printmode,
@@ -316,24 +313,20 @@
* @return bat batID from container contID of given ws, or
* NULL by error
*/
-BAT *
+static INLINE BAT *
getBatFromContainer(BAT *ws, int batID, int contID)
{
BAT *b = NULL, *batbat = NULL;
BUN bun = BUN_NONE, bbun = BUN_NONE;
oid CONTid = (oid) contID;
oid BATid = (oid) batID;
+ BATiter wsi, batbati;
- if(!ws) {
- GDKerror("getBatFromContainer: invalid working set ws\n");
- return NULL;
- }
- if(contID < 0 || batID < 0){
- GDKerror("getBatFromContainer: contID and batID must NOT be
negative\n");
- return NULL;
- }
+ assert(ws && contID >=0 && batID >= 0);
- if( (bun = BUNfnd(ws, &BATid)) == BUN_NONE ) {
+ wsi = bat_iterator(ws);
+ BUNfndOID(bun, wsi, (ptr)&BATid);
+ if(bun == BUN_NONE) {
GDKerror("getBatFromContainer: NOT FOUND WS[%d]\n",batID);
return NULL;
}
@@ -342,7 +335,9 @@
return NULL;
}
- if( (bbun = BUNfnd(batbat, &CONTid)) == BUN_NONE ) {
+ batbati = bat_iterator(batbat);
+ BUNfndOID(bbun, batbati, (ptr)&CONTid);
+ if(bbun == BUN_NONE) {
BBPunfix(BBPcacheid(batbat));
GDKerror("getBatFromContainer:NOT FOUND WS[%d][%d]\n",
batID, contID);
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins