Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs7.sourceforge.net:/tmp/cvs-serv16161
Modified Files:
pathfinder.mx xrpc_client.mx xrpc_server.mx
Log Message:
Use shred_stream instead of shred_str to avoid one intermediate buffering step.
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- xrpc_client.mx 28 Feb 2007 14:45:43 -0000 1.14
+++ xrpc_client.mx 2 Mar 2007 18:23:59 -0000 1.15
@@ -139,7 +139,18 @@
var seq_prop;
var err := CATCH ({ seq_prop :=
qn_loc.ord_select("sequence").reverse().fetch(0); });
if (not(isnil(err))) {
- ERROR("get_rpc_res expects at least *one* \"<sequence>\" tag.\n");
+    # No "<sequence>" tag was found: either the response is malformed, or
+    # it carries a SOAP Fault whose code and reason we report instead.
+    if(count(qn_loc.ord_select("Fault")) = 0) {
+        ERROR("get_rpc_res expects at least *one* \"<sequence>\" tag.\n");
+    } else {
+        # Look up the last "Value" entry in qn_loc, map it through pre_prop
+        # and fetch the text stored at the following position.
+        # NOTE(review): presumably pre+1 is the text-node child of the
+        # element -- confirm against the shredded document layout.
+        var fault_value_pre := qn_loc.ord_select("Value").reverse().fetch(0);
+        fault_value_pre := pre_prop.ord_select(fault_value_pre).reverse().fetch(0);
+        var fault_value := prop_text.fetch(fault_value_pre+1);
+
+        # Same lookup for the "Text" element holding the fault reason.
+        var fault_text_pre := qn_loc.ord_select("Text").reverse().fetch(0);
+        fault_text_pre := pre_prop.ord_select(fault_text_pre).reverse().fetch(0);
+        var fault_text := prop_text.fetch(fault_text_pre+1);
+        # Both values are strings, so both are formatted with %s.
+        ERROR("SOAP Fault Code : %s\nSOAP Fault Reason: %s\n", fault_value, fault_text);
+    }
}
# Fetch the position of the first "pre_prop" value of the "sequence"
@@ -385,9 +396,8 @@
} else {
# We do not want to discard results from other destinations
# where executions might have succeeded, so we only print a
- # WARNING by error, iso. terminate the execution with 'ERROR'
- printf("!WARNING: doLoopLiftedRPC: ");
- printf("error occurred during RPC call to \"%s\"\n%s\n",
+ # message by error, iso. terminate the execution with 'ERROR'
+ printf("!ERROR: during RPC call to \"%s\"\n%s\n",
$t, rpc_err);
}
}
@@ -499,7 +509,7 @@
b->pos += len; \
}
-#define clean_up(sock, s1, s2, s3, b, argcnt, iterc) { \
+#define clean_up(sock, s1, s2, b, argcnt, iterc) { \
lng i = 0; \
if (sock > 0) close(sock); \
if(s1){ \
@@ -508,9 +518,6 @@
if(s2){ \
stream_close(s2); stream_destroy(s2); \
} \
- if(s3){ \
- stream_close(s3); stream_destroy(s3); \
- } \
if (b) { \
if (b->len > 0) buffer_destroy(b); \
else free(b); \
@@ -730,6 +737,136 @@
return ret;
}
+/**
+ * Receive the HTTP response to an XRPC request from `sock' and, when the
+ * call succeeded, shred the response body into a newly created BAT.
+ *
+ * @param sock    socket to read the response from; it is wrapped in a read
+ *                stream that is closed and destroyed before returning
+ * @param dst     name of the destination, used in error messages only
+ * @param b       scratch buffer; its contents and b->pos are overwritten.
+ *                NOTE(review): lines of up to 1024 bytes are read into it,
+ *                so b->len is assumed to be larger than that -- confirm
+ *                against MAX_BUF_SIZE in the caller.
+ * @param updCall non-zero for an updating call: the body is not shredded
+ *                and the newly created, unfilled BAT is returned
+ * @param time_xrpcClntDeSeria
+ *                out: on success, the time spent between creating the BAT
+ *                and finishing the shredding, as measured with GDKusec()
+ *
+ * @return the BAT, or NULL on error.  A status line that does not carry
+ *         the code "200" is an error: the status and, when they can be
+ *         found, the SOAP Fault code and reason are reported through
+ *         GDKerror().
+ */
+static BAT *
+response2bat(int sock,
+             char *dst,
+             buffer *b,
+             int updCall,
+             lng *time_xrpcClntDeSeria)
+{
+    char *strptr = NULL, *ptr = NULL;
+    int ret;
+    stream *in;
+    BAT *shredBAT;
+
+    if (!(in = socket_rastream(sock, "http_receive"))) {
+        GDKerror("response2bat: failed to create socket_rastream\n");
+        return NULL;
+    }
+
+    /* The first line is the status line, "HTTP/1.x NNN ...": the 3-digit
+     * return code starts at offset 9. */
+    b->pos = 0;
+    errno = 0;
+    ret = stream_readline(in, b->buf, 1024);
+    if (ret < 0)
+        goto receive_failed;
+    b->buf[ret] = '\0';
+
+    /* A line too short to hold a return code is treated as an error too,
+     * instead of comparing whatever was left in the buffer before. */
+    if (ret < 12 || strncmp(b->buf + 9, "200", 3) != 0) {
+        GDKerror("HTTP Error Code : %s\n", ret > 9 ? b->buf + 9 : b->buf);
+
+        /* Receive the rest of the message (HTTP header and SOAP Fault)
+         * at the start of the buffer, overwriting the status line.  One
+         * byte is kept free for the terminating '\0', and b->pos is only
+         * advanced by the number of bytes that were really read. */
+        do {
+            errno = 0;
+            ret = stream_read(in, b->buf + b->pos, 1, b->len - b->pos - 1);
+            if (ret > 0)
+                b->pos += ret;
+        } while (ret > 0 && b->pos + 1 < b->len);
+        if (ret < 0)
+            goto receive_failed;
+        b->buf[b->pos] = '\0';
+
+        /* Fault code: the text between "<env:Value ...>" and
+         * "</env:Value>" */
+        strptr = strstr(b->buf, "<env:Value");
+        if (!strptr) {
+            GDKerror("response2bat: SOAP Fault message not well-formed: "
+                     "could not find \"<env:Value\".\n");
+            goto fail;
+        }
+        /* Check strchr()'s result *before* stepping over the '>' */
+        strptr = strchr(strptr, '>');
+        ptr = strptr ? strstr(++strptr, "</env:Value>") : NULL;
+        if (!strptr || !ptr) {
+            GDKerror("response2bat: SOAP Fault message not well-formed: "
+                     "could not find \"</env:Value>\".\n");
+            goto fail;
+        }
+        *ptr = '\0';
+        GDKerror("SOAP Fault Code : %s\n", strptr);
+
+        /* Fault reason: the text between "<env:Text ...>" and
+         * "</env:Text>", searched for after the end tag found above */
+        strptr = strstr(ptr + strlen("</env:Value>"), "<env:Text");
+        if (!strptr) {
+            GDKerror("response2bat: SOAP Fault message not well-formed: "
+                     "could not find \"<env:Text\".\n");
+            goto fail;
+        }
+        strptr = strchr(strptr, '>');
+        ptr = strptr ? strstr(++strptr, "</env:Text>") : NULL;
+        if (!strptr || !ptr) {
+            GDKerror("response2bat: SOAP Fault message not well-formed: "
+                     "could not find \"</env:Text>\".\n");
+            goto fail;
+        }
+        *ptr = '\0';
+        GDKerror("SOAP Fault Reason: %s\n", strptr);
+        goto fail;
+    }
+
+    do { /* read the HTTP header and throw it away */
+        errno = 0;
+        ret = stream_readline(in, b->buf, 1024);
+        if (ret >= 0) /* never index the buffer with an error code */
+            b->buf[ret] = '\0';
+        if (ret == 1 && b->buf[0] == '\r')
+            ret = 0; /* end-of-HTTP-header found */
+    } while (ret > 0);
+    if (ret < 0)
+        goto receive_failed;
+
+    /* Start timing Client DeSerialisation */
+    *time_xrpcClntDeSeria = GDKusec();
+    if (!(shredBAT = BATnew(TYPE_str, TYPE_bat, 32))) {
+        GDKerror("response2bat: failed to malloc shredBAT\n");
+        goto fail;
+    }
+
+    /* The response to an updating call is not shredded. */
+    if (!updCall &&
+        shred(shredBAT, NULL, NULL, in, 0, NULL, NULL, NULL) == GDK_FAIL) {
+        GDKerror("most probably, the XRPC response contains some MIL "
+                 "error message\n");
+        if (BBPreclaim(shredBAT) == -1) {
+            GDKerror("response2bat: failed to destroy \"shredBAT\"!");
+        }
+        goto fail;
+    }
+    /* Stop timing Client DeSerialisation */
+    *time_xrpcClntDeSeria = GDKusec() - *time_xrpcClntDeSeria;
+
+    stream_close(in); stream_destroy(in);
+    return shredBAT;
+
+receive_failed:
+    GDKerror("response2bat: failed to receive response from %s: %s\n",
+             dst, errno?strerror(errno):".");
+fail:
+    /* Single exit for all error paths: release the read stream. */
+    stream_close(in); stream_destroy(in);
+    return NULL;
+}
+
int
CMDhttp_post(BAT **res, str options, str dst, str rpc_module,
str rpc_uri, str rpc_method, int *updCall, int *arity,
@@ -737,11 +874,10 @@
BAT *fun_item, BAT *fun_kind, BAT *int_values, BAT *dbl_values,
BAT *dec_values, BAT *str_values)
{
- lng percentage = 0;
- int ret, sock = -1;
+ int sock = -1;
buffer *b = NULL;
- str ptr = NULL, strptr=NULL, str_val = NULL;
- stream *in = NULL, *out = NULL, *bs = NULL;
+ str str_val = NULL;
+ stream *out = NULL, *bs = NULL;
/* Hold the temp BATs after the RPC response has been shredded: */
BAT *shredBAT = NULL;
/* BATs hold item|kind of a node parameter */
@@ -773,11 +909,6 @@
close(sock);
return GDK_FAIL;
}
- if (!(in = socket_rastream(sock, "http_receive"))) {
- GDKerror("CMDhttp_post: failed to create socket_rastream\n");
- stream_close(out); stream_destroy(out);
- return GDK_FAIL;
- }
/* Start timing Client Serialisation */
time_xrpcClntSeria = GDKusec();
@@ -786,7 +917,7 @@
b = buffer_create(MAX_BUF_SIZE);
if (!b || b->len == 0) {
GDKerror("CMDhttp_post: failed to create outgoing buffer\n");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
@@ -796,7 +927,7 @@
if (!(argcnt = GDKmalloc(iterc * sizeof(lng*)))) {
GDKerror("CMDhttp_post: failed to malloc argcnt\n");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
for (i = 0; i < iterc; i++) {
@@ -804,7 +935,7 @@
* we have a place to note that a function has zero parameter */
if (!(argcnt[i] = GDKmalloc((argc>0?argc:1) * sizeof(lng)))) {
GDKerror("CMDhttp_post: failed to malloc argcnt[" SZFMT "]\n", i);
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
argcnt[i][0] = 0;
@@ -834,7 +965,7 @@
nr_args++;
if (nr_args == max_args) {
GDKerror("CMDhttp_post: too many parameters.\n");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
}
@@ -852,7 +983,7 @@
char *bptr = realloc(b->buf, b->len);
if (!bptr) {
GDKerror("CMDhttp_post: failed to allocate larger message
buffer.\n");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
b->buf = bptr;
@@ -943,7 +1074,7 @@
qn_loc = getWsBAT(ws, contID, QN_LOC);
if( (!attr_qn) || (!attr_prop) || (!prop_val) ||
(!qn_prefix) || (!qn_loc) ){
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
@@ -970,7 +1101,7 @@
contID = XTRACT_CONT(cmbn_cont_kind);
elem_kind = getELEMkind(ws, contID, item);
if (elem_kind == GDK_chr_min) {
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
switch(elem_kind) {
@@ -983,7 +1114,7 @@
default:
GDKerror("CMDhttp_post:
call%d/sequence%d/item%d has "
"invalid type %d\n", my_iter, my_argc,
i);
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
/* print body of the node */
@@ -996,7 +1127,7 @@
if (!(bs = buffer_wastream(b, "shred_element"))) {
GDKerror("CMDhttp_post: failed to create
buffer_wastream to "
"shred element parameter\n");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
xquery_print_result_driver(bs,
@@ -1010,7 +1141,7 @@
"BAT \"node_item\", because it is in "
"use by other process!");
GDKerror("THIS SHOULD NEVER HAPPEN!!!");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
@@ -1019,7 +1150,7 @@
"BAT \"node_item\", because it is in "
"use by other process!");
GDKerror("THIS SHOULD NEVER HAPPEN!!!");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
@@ -1032,14 +1163,14 @@
case 5: /* COLLECTION: nothing to be done, here */
break;
default:
GDKerror("CMDhttp_post: should have never
reached here!");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
} break;
default:
GDKerror("CMDhttp_post: call%d/sequence%d/item%d has
invalid type %d\n",
my_iter, my_argc, i);
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
}
@@ -1065,122 +1196,32 @@
XRPCD_CALLBACK, dst, b->pos, b->buf);
if (bytes_sent < b->pos) {
GDKerror("CMDhttp_post: failed to send XRPC request.");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
/* Stop timing Network Send Client2Server */
time_xrpcClnt2Serv= GDKusec() - time_xrpcClnt2Serv;
- /*** Receive all response data ***/
- b->pos = b->buf[0] = 0;
- do{
- errno = 0;
- ret = stream_read(in, (b->buf + b->pos), 1, (b->len - b->pos));
- b->pos += ret;
- if (b->pos > (b->len * 0.8)) {
- b->len *= 2;
- b->buf = realloc(b->buf, b->len);
- }
- } while (ret > 0);
- b->buf[b->pos] = 0;
- bytes_received = b->pos;
-
- if ((ret < 0) || (b->pos == 0)) {
- GDKerror("CMDhttp_post: failed to receive response from %s: %s\n",
- dst, errno?strerror(errno):".");
- return GDK_FAIL;
- }
-
- /* Start timing Client DeSerialisation */
- time_xrpcClntDeSeria = GDKusec();
- /* the HTTP header starts with "HTTP/1.x xxx", after the first
- * white space character, it is the 3-digits return code */
- strptr = strchr(b->buf, (int)' ') + 1;
- if (!strptr ||
- (strstr(strptr, "200") != strptr) ||
- strstr(strptr, "<env:Fault") ) {
-
- /* get the error code, error string out of the message */
- if(strptr){
- ptr = strchr(strptr, '\r');
- if (ptr){
- ptr[0] = '\0';
- GDKerror("HTTP Error Code : %s\n", b->buf);
- strptr = strstr(ptr+1, "<env:Fault");
- if(strptr){
- strptr = strstr(strptr, "<env:Value>") + 11;
- ptr = strchr(strptr, '<');
- ptr[0] = '\0';
- GDKerror("SOAP Fault Code : %s\n", strptr);
- strptr = strstr(ptr+1, "<env:Text") + 10;
- strptr = strchr(strptr, '>') + 1;
- ptr = strchr(strptr, '<');
- ptr[0] = '\0';
- GDKerror("SOAP Fault Reason: %s\n", strptr);
- }
- }
- } else {
- GDKerror("CMDhttp_post: remote execution failed, "
- "no further information can be provided\n");
- }
- clean_up(sock, in, out, bs, b, argcnt, iterc);
- return GDK_FAIL;
- }
-
- if (!(shredBAT = BATnew(TYPE_str, TYPE_bat, 32))){
- GDKerror("CMDhttp_post: failed to malloc shredBAT\n");
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ shredBAT = response2bat(sock, dst, b, *updCall, &time_xrpcClntDeSeria);
+ if(!shredBAT) {
+ clean_up(sock, out, bs, b, argcnt, iterc);
return GDK_FAIL;
}
- if (!(*updCall)){
- bit verbose = FALSE;
- strptr = strstr(b->buf, "<env:Envelope");
- ptr = strrchr(b->buf, '>'); /* Find end of "</env:Envelope>" */
- ptr[1] = '\0';
- if (!strptr || CMDshred_str(shredBAT, strptr, &percentage, NULL,
&verbose) == GDK_FAIL) {
- GDKerror("CMDhttp_post: invalid XRPC response received\n");
- GDKerror("%s\n", strptr?strptr:b->buf);
- clean_up(sock, in, out, bs, b, argcnt, iterc);
- if(BBPreclaim(shredBAT) == -1){
- GDKerror("CMDhttp_post: failed to destroy BAT "
- "\"shredBAT\", because it is in use by other "
- "process!");
- GDKerror("THIS SHOULD NEVER HAPPEN!!!");
- }
- return GDK_FAIL;
- }
- } else {
- ptr = b->buf;
- }
- time_xrpcClntDeSeria = GDKusec() - time_xrpcClntDeSeria;
if (options && strstr(options, "timing")) {
stream_printf(GDKout,
"XRPC_Client_Serialisation: %lld microsec\n"
"XRPC_Network_Client_2_Server: %lld microsec\n"
"XRPC_Data_Sent/Received: " SZFMT " / "
- SZFMT " bytes\n\n",
+ SZFMT " bytes\n\n"
+ "XRPC_Client_DeSerialisation: %lld microsec\n",
time_xrpcClntSeria,
time_xrpcClnt2Serv,
- bytes_sent, bytes_received);
- }
-
- ptr = strstr(ptr, "XRPC_Server_Timing");
- if (ptr) {
- ptr += 18; /* Skip "XRPC_Server_Timing" */
- /* Find the start of the real timing info */
- ptr = strstr(ptr, "XRPC");
- stream_printf(GDKout, strstr(ptr, "XRPC_"));
- stream_printf(GDKout, "\n");
- }
-
- if (options && strstr(options, "timing")) {
- stream_printf(GDKout,
- "XRPC_Client_DeSerialisation: %lld microsec\n",
+ bytes_sent, bytes_received,
time_xrpcClntDeSeria);
}
- clean_up(sock, in, out, bs, b, argcnt, iterc);
+ clean_up(sock, out, bs, b, argcnt, iterc);
*res = shredBAT;
return GDK_SUCCEED;
}
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.300
retrieving revision 1.301
diff -u -d -r1.300 -r1.301
--- pathfinder.mx 27 Feb 2007 09:59:10 -0000 1.300
+++ pathfinder.mx 2 Mar 2007 18:23:59 -0000 1.301
@@ -5390,7 +5390,7 @@
if (flags&1){
/* print timing ourselves */
- stream_printf(GDKout, "\nXRPC_Server_Timing:\n"
+ fprintf(stdout, "\n"
"XRPC_Server_Application: %lld microsec\n"
"XRPC_Network_Server_2_Client: %lld microsec\n",
(*ctx->time_shred + *ctx->time_compile + *ctx->time_exec),
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- xrpc_server.mx 2 Mar 2007 12:35:43 -0000 1.17
+++ xrpc_server.mx 2 Mar 2007 18:23:59 -0000 1.18
@@ -1016,7 +1016,7 @@
}
if (timing) {
- stream_printf(GDKout,
+ fprintf(stdout,
"XRPC_Server_DeSerialisation: %lld microsec\n",
time_xrpcServDeSeria);
}
@@ -1188,10 +1188,6 @@
snprintf(datadir, 1024, "%s%cMonetDB%cxrpc", s, DIR_SEP, DIR_SEP);
}
shttpd_setopt("document_root", datadir);
-/*
- shttpd_setopt("error_log", "c:\\tmp\\err.txt");
- shttpd_setopt("debug", "1");
- */
shttpd_init(NULL); /* Initialize httpsd thread */
-------------------------------------------------------------------------
Take Surveys. Earn Cash. Influence the Future of IT
Join SourceForge.net's Techsay panel and you'll get the chance to share your
opinions on IT & business topics through brief surveys-and earn cash
http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins