Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs7.sourceforge.net:/tmp/cvs-serv16161

Modified Files:
        pathfinder.mx xrpc_client.mx xrpc_server.mx 
Log Message:
use shred_stream iso. shred_str to avoid 1 intermediate buffering step.



Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- xrpc_client.mx      28 Feb 2007 14:45:43 -0000      1.14
+++ xrpc_client.mx      2 Mar 2007 18:23:59 -0000       1.15
@@ -139,7 +139,18 @@
     var seq_prop;
     var err := CATCH ({ seq_prop := 
qn_loc.ord_select("sequence").reverse().fetch(0); });
     if (not(isnil(err))) {
-        ERROR("get_rpc_res expects at least *one* \"<sequence>\" tag.\n");
+        if(count(qn_loc.ord_select("Fault")) = 0) {
+            ERROR("get_rpc_res expects at least *one* \"<sequence>\" tag.\n");
+        } else {
+            var fault_value_pre := 
qn_loc.ord_select("Value").reverse().fetch(0);
+                fault_valut_pre := 
pre_prop.ord_select(fault_valut_pre).reverse().fetch(0);
+            var fault_value     := prop_text.fetch(fault_value_pre+1);
+
+            var fault_text_pre := qn_loc.ord_select("Text").reverse().fetch(0);
+                fault_text_pre := 
pre_prop.ord_select(fault_text_pre).reverse().fetch(0);
+            var fault_text     := prop_text.fetch(fault_text_pre+1);
+            ERROR("SOAP Fault Code  : %s\nSOAP Fault Reason: %d\n", 
fault_value, fault_text);
+        }
     }
 
     # Fetch the position of the first "pre_prop" value of the "sequence"
@@ -385,9 +396,8 @@
         } else {
             # We do not want to discard results from other destinations
             # where executions might have succeeded, so we only print a
-            # WARNING by error, iso. terminate the execution with 'ERROR'
-            printf("!WARNING: doLoopLiftedRPC: ");
-            printf("error occurred during RPC call to \"%s\"\n%s\n",
+            # message by error, iso. terminate the execution with 'ERROR'
+            printf("!ERROR: during RPC call to \"%s\"\n%s\n",
                     $t, rpc_err);
         }
     }
@@ -499,7 +509,7 @@
     b->pos += len;                                      \
 }
 
-#define clean_up(sock, s1, s2, s3, b, argcnt, iterc) {  \
+#define clean_up(sock, s1, s2, b, argcnt, iterc) {      \
     lng i = 0;                                          \
     if (sock > 0) close(sock);                          \
     if(s1){                                             \
@@ -508,9 +518,6 @@
     if(s2){                                             \
         stream_close(s2); stream_destroy(s2);           \
     }                                                   \
-    if(s3){                                             \
-        stream_close(s3); stream_destroy(s3);           \
-    }                                                   \
     if (b) {                                            \
         if (b->len > 0) buffer_destroy(b);              \
         else            free(b);                        \
@@ -730,6 +737,136 @@
     return ret;
 }
 
+/**
+ * try to receive the response message and shred it into shredBAT
+ * Returns shredBAT, or
+ *         NULL on error
+ */
+static BAT *
+response2bat(int sock,
+             char *dst,
+             buffer *b,
+             int updCall,
+             lng *time_xrpcClntDeSeria)
+{
+    char *strptr = NULL, *ptr = NULL;
+    int ret;
+    stream *in;
+    BAT *shredBAT;
+    if (!(in = socket_rastream(sock, "http_receive"))) {
+        GDKerror("response2bat: failed to create socket_rastream\n");
+        return NULL;
+    }
+
+    b->pos = 0;
+    errno = 0;
+    ret = stream_readline(in, b->buf, 1024);
+    if (ret < 0) {
+        GDKerror("response2bat: failed to receive response from %s: %s\n",
+                dst, errno?strerror(errno):".");
+        stream_close(in); stream_destroy(in);
+        return NULL;
+    }
+    if (strncmp(b->buf+9, "200", 3) != 0) {
+        b->buf[ret] = '\0';
+        GDKerror("HTTP Error Code  : %s\n", b->buf + 9);
+        strptr = b->buf + b->pos + 1;
+        do{ /* receive the whole error message */
+            errno = 0;
+            ret = stream_read(in, (b->buf + b->pos), 1, (b->len - b->pos));
+            b->pos += ret;
+        } while (ret > 0);
+        if (ret < 0) {
+            GDKerror("response2bat: failed to receive response from %s: %s\n",
+                    dst, errno?strerror(errno):".");
+            stream_close(in); stream_destroy(in);
+            return NULL;
+        }
+        b->buf[b->pos] = 0;
+
+        strptr = strstr(strptr, "<env:Value");
+        if(!strptr) {
+            GDKerror("response2bat: SOAP Fault message not well-formed: "
+                     "could not find \"<env:Value\".\n");
+            stream_close(in); stream_destroy(in);
+            return NULL;
+        }
+        strptr = strchr(strptr, '>') + 1;
+        ptr = strstr(strptr, "</env:Value>");
+        if( (!strptr) || (!ptr) ) {
+            GDKerror("response2bat: SOAP Fault message not well-formed: "
+                     "could not find \"</env:Value>\".\n");
+            stream_close(in); stream_destroy(in);
+            return NULL;
+        }
+        ptr[0] = '\0';
+        GDKerror("SOAP Fault Code  : %s\n", strptr);
+
+        strptr = strstr(ptr+12, "<env:Text");
+        if(!strptr) {
+            GDKerror("response2bat: SOAP Fault message not well-formed: "
+                     "could not find \"<env:Text\".\n");
+            stream_close(in); stream_destroy(in);
+            return NULL;
+        }
+        strptr = strchr(strptr, '>') + 1;
+        ptr = strstr(strptr, "</env:Text>");
+        if( (!strptr) || (!ptr) ) {
+            GDKerror("response2bat: SOAP Fault message not well-formed: "
+                     "could not find \"</env:Text>\".\n");
+            stream_close(in); stream_destroy(in);
+            return NULL;
+        }
+        ptr[0] = '\0';
+        GDKerror("SOAP Fault Reason: %s\n", strptr);
+        stream_close(in); stream_destroy(in);
+        return NULL;
+    }
+
+    do{ /* read the HTTP header and throw it away */
+        errno = 0;
+        ret = stream_readline(in, b->buf, 1024);
+        b->buf[ret] = '\0';
+        if(ret == 1 && b->buf[0] == '\r')
+            ret = 0; /* end-of-HTTP-header found */
+    } while (ret > 0);
+    if (ret < 0) {
+        GDKerror("response2bat: failed to receive response from %s: %s\n",
+                dst, errno?strerror(errno):".");
+        stream_close(in); stream_destroy(in);
+        return NULL;
+    }
+
+    /* Start timing Client DeSerialisation */
+    *time_xrpcClntDeSeria = GDKusec();
+    if (!(shredBAT = BATnew(TYPE_str, TYPE_bat, 32))){
+        GDKerror("response2bat: failed to malloc shredBAT\n");
+        stream_close(in); stream_destroy(in);
+        return NULL;
+    }
+
+    if (updCall) {
+        *time_xrpcClntDeSeria = GDKusec() - *time_xrpcClntDeSeria;
+        stream_close(in); stream_destroy(in);
+        return shredBAT;
+    }
+
+    if(shred(shredBAT, NULL, NULL, in, 0, NULL, NULL, NULL) ==GDK_FAIL) {
+        GDKerror("most probably, the XRPC response contains some MIL "
+                "error message\n");
+        if(BBPreclaim(shredBAT) == -1){
+            GDKerror("response2bat: failed to destroy \"shredBAT\"!");
+        }
+        stream_close(in); stream_destroy(in);
+        return NULL;
+    }
+    /* Stop timing Client DeSerialisation */
+    *time_xrpcClntDeSeria = GDKusec() - *time_xrpcClntDeSeria;
+
+    stream_close(in); stream_destroy(in);
+    return shredBAT;
+}
+
 int
 CMDhttp_post(BAT **res, str options, str dst, str rpc_module,
         str rpc_uri, str rpc_method, int *updCall, int *arity,
@@ -737,11 +874,10 @@
         BAT *fun_item, BAT *fun_kind, BAT *int_values, BAT *dbl_values,
         BAT *dec_values, BAT *str_values)
 {
-    lng percentage = 0;
-    int ret, sock = -1;
+    int sock = -1;
     buffer *b = NULL;
-    str ptr = NULL, strptr=NULL, str_val = NULL;
-    stream *in = NULL, *out = NULL, *bs = NULL;
+    str str_val = NULL;
+    stream *out = NULL, *bs = NULL;
     /* Hold the temp BATs after the RPC response has been shredded: */
     BAT *shredBAT = NULL;
     /* BATs hold item|kind of a node parameter */
@@ -773,11 +909,6 @@
         close(sock);
         return GDK_FAIL;
     }
-    if (!(in = socket_rastream(sock, "http_receive"))) {
-        GDKerror("CMDhttp_post: failed to create socket_rastream\n");
-        stream_close(out); stream_destroy(out);
-        return GDK_FAIL;
-    }
 
     /* Start timing Client Serialisation */
     time_xrpcClntSeria = GDKusec();
@@ -786,7 +917,7 @@
     b = buffer_create(MAX_BUF_SIZE);
     if (!b || b->len == 0) {
         GDKerror("CMDhttp_post: failed to create outgoing buffer\n");
-        clean_up(sock, in, out, bs, b, argcnt, iterc);
+        clean_up(sock, out, bs, b, argcnt, iterc);
         return GDK_FAIL;
     }
 
@@ -796,7 +927,7 @@
 
     if (!(argcnt = GDKmalloc(iterc * sizeof(lng*)))) {
         GDKerror("CMDhttp_post: failed to malloc argcnt\n");
-        clean_up(sock, in, out, bs, b, argcnt, iterc);
+        clean_up(sock, out, bs, b, argcnt, iterc);
         return GDK_FAIL;
     }
     for (i = 0; i < iterc; i++) {
@@ -804,7 +935,7 @@
          * we have a place to note that a function has zero parameter */
         if (!(argcnt[i] = GDKmalloc((argc>0?argc:1) * sizeof(lng)))) {
             GDKerror("CMDhttp_post: failed to malloc argcnt[" SZFMT "]\n", i);
-            clean_up(sock, in, out, bs, b, argcnt, iterc);
+            clean_up(sock, out, bs, b, argcnt, iterc);
             return GDK_FAIL;
         }
         argcnt[i][0] = 0;
@@ -834,7 +965,7 @@
         nr_args++;
         if (nr_args == max_args) {
             GDKerror("CMDhttp_post: too many parameters.\n");
-            clean_up(sock, in, out, bs, b, argcnt, iterc);
+            clean_up(sock, out, bs, b, argcnt, iterc);
             return GDK_FAIL;
         }
     }
@@ -852,7 +983,7 @@
             char *bptr = realloc(b->buf, b->len);
             if (!bptr) {
                 GDKerror("CMDhttp_post: failed to allocate larger message 
buffer.\n");
-                clean_up(sock, in, out, bs, b, argcnt, iterc);
+                clean_up(sock, out, bs, b, argcnt, iterc);
                 return GDK_FAIL;
             }
             b->buf = bptr;
@@ -943,7 +1074,7 @@
                         qn_loc    = getWsBAT(ws, contID, QN_LOC);
                         if( (!attr_qn) || (!attr_prop) || (!prop_val) ||
                             (!qn_prefix) || (!qn_loc) ){
-                            clean_up(sock, in, out, bs, b, argcnt, iterc);
+                            clean_up(sock, out, bs, b, argcnt, iterc);
                             return GDK_FAIL;
                         }
                         
@@ -970,7 +1101,7 @@
                         contID = XTRACT_CONT(cmbn_cont_kind);
                         elem_kind = getELEMkind(ws, contID, item);
                         if (elem_kind == GDK_chr_min) {
-                            clean_up(sock, in, out, bs, b, argcnt, iterc);
+                            clean_up(sock, out, bs, b, argcnt, iterc);
                             return GDK_FAIL;
                         }
                         switch(elem_kind) {
@@ -983,7 +1114,7 @@
                             default:
                                 GDKerror("CMDhttp_post: 
call%d/sequence%d/item%d has "
                                         "invalid type %d\n", my_iter, my_argc, 
i);
-                                clean_up(sock, in, out, bs, b, argcnt, iterc);
+                                clean_up(sock, out, bs, b, argcnt, iterc);
                                 return GDK_FAIL;
                         }
                         /* print body of the node */
@@ -996,7 +1127,7 @@
                         if (!(bs = buffer_wastream(b, "shred_element"))) {
                             GDKerror("CMDhttp_post: failed to create 
buffer_wastream to "
                                     "shred element parameter\n");
-                            clean_up(sock, in, out, bs, b, argcnt, iterc);
+                            clean_up(sock, out, bs, b, argcnt, iterc);
                             return GDK_FAIL;
                         }
                         xquery_print_result_driver(bs,
@@ -1010,7 +1141,7 @@
                                     "BAT \"node_item\", because it is in "
                                     "use by other process!");
                             GDKerror("THIS SHOULD NEVER HAPPEN!!!");
-                            clean_up(sock, in, out, bs, b, argcnt, iterc);
+                            clean_up(sock, out, bs, b, argcnt, iterc);
                             return GDK_FAIL;
                         }
 
@@ -1019,7 +1150,7 @@
                                     "BAT \"node_item\", because it is in "
                                     "use by other process!");
                             GDKerror("THIS SHOULD NEVER HAPPEN!!!");
-                            clean_up(sock, in, out, bs, b, argcnt, iterc);
+                            clean_up(sock, out, bs, b, argcnt, iterc);
                             return GDK_FAIL;
                         }
 
@@ -1032,14 +1163,14 @@
                             case 5: /* COLLECTION: nothing to be done, here */ 
   break;
                             default:
                                 GDKerror("CMDhttp_post: should have never 
reached here!");
-                                clean_up(sock, in, out, bs, b, argcnt, iterc);
+                                clean_up(sock, out, bs, b, argcnt, iterc);
                                 return GDK_FAIL;
                         }
                     }   break;
                     default:
                         GDKerror("CMDhttp_post: call%d/sequence%d/item%d has 
invalid type %d\n",
                                 my_iter, my_argc, i);
-                        clean_up(sock, in, out, bs, b, argcnt, iterc);
+                        clean_up(sock, out, bs, b, argcnt, iterc);
                         return GDK_FAIL;
                 }
             }
@@ -1065,122 +1196,32 @@
             XRPCD_CALLBACK, dst, b->pos, b->buf);
     if (bytes_sent < b->pos) {
         GDKerror("CMDhttp_post: failed to send XRPC request.");
-        clean_up(sock, in, out, bs, b, argcnt, iterc);
+        clean_up(sock, out, bs, b, argcnt, iterc);
         return GDK_FAIL;
     }
     /* Stop timing Network Send Client2Server */
     time_xrpcClnt2Serv= GDKusec() - time_xrpcClnt2Serv;
 
-    /*** Receive all response data ***/
-    b->pos = b->buf[0] = 0;
-    do{
-        errno = 0;
-        ret = stream_read(in, (b->buf + b->pos), 1, (b->len - b->pos));
-        b->pos += ret;
-        if (b->pos > (b->len * 0.8)) {
-            b->len *= 2;
-            b->buf = realloc(b->buf, b->len);
-        }
-    } while (ret > 0);
-    b->buf[b->pos] = 0;
-    bytes_received = b->pos;
-
-    if ((ret < 0) || (b->pos == 0)) {
-        GDKerror("CMDhttp_post: failed to receive response from %s: %s\n",
-                dst, errno?strerror(errno):".");
-        return GDK_FAIL;
-    }
-
-    /* Start timing Client DeSerialisation */
-    time_xrpcClntDeSeria = GDKusec();
-    /* the HTTP header starts with "HTTP/1.x xxx", after the first
-     * white space character, it is the 3-digits return code */
-    strptr = strchr(b->buf, (int)' ') + 1;
-    if (!strptr ||
-        (strstr(strptr, "200") != strptr) ||
-        strstr(strptr, "<env:Fault") ) {
-
-        /* get the error code, error string out of the message */
-        if(strptr){
-            ptr = strchr(strptr, '\r');
-            if (ptr){
-                ptr[0] = '\0';
-                GDKerror("HTTP Error Code  : %s\n", b->buf);
-                strptr = strstr(ptr+1, "<env:Fault");
-                if(strptr){
-                    strptr = strstr(strptr, "<env:Value>") + 11;
-                    ptr = strchr(strptr, '<');
-                    ptr[0] = '\0';
-                    GDKerror("SOAP Fault Code  : %s\n", strptr);
-                    strptr = strstr(ptr+1, "<env:Text") + 10;
-                    strptr = strchr(strptr, '>') + 1;
-                    ptr = strchr(strptr, '<');
-                    ptr[0] = '\0';
-                    GDKerror("SOAP Fault Reason: %s\n", strptr);
-                } 
-            }
-        } else {
-            GDKerror("CMDhttp_post: remote execution failed, "
-                     "no further information can be provided\n");
-        }
-        clean_up(sock, in, out, bs, b, argcnt, iterc);
-        return GDK_FAIL;
-    }
-
-    if (!(shredBAT = BATnew(TYPE_str, TYPE_bat, 32))){
-        GDKerror("CMDhttp_post: failed to malloc shredBAT\n");
-        clean_up(sock, in, out, bs, b, argcnt, iterc);
+    shredBAT = response2bat(sock, dst, b, *updCall, &time_xrpcClntDeSeria);
+    if(!shredBAT) {
+        clean_up(sock, out, bs, b, argcnt, iterc);
         return GDK_FAIL;
     }
-    if (!(*updCall)){
-        bit verbose = FALSE;
-        strptr = strstr(b->buf, "<env:Envelope");
-        ptr = strrchr(b->buf, '>'); /* Find end of "</env:Envelope>" */
-        ptr[1] = '\0';
-        if (!strptr || CMDshred_str(shredBAT, strptr, &percentage, NULL, 
&verbose) == GDK_FAIL) {
-            GDKerror("CMDhttp_post: invalid XRPC response received\n");
-            GDKerror("%s\n", strptr?strptr:b->buf);
-            clean_up(sock, in, out, bs, b, argcnt, iterc);
-            if(BBPreclaim(shredBAT) == -1){
-                GDKerror("CMDhttp_post: failed to destroy BAT "
-                        "\"shredBAT\", because it is in use by other "
-                        "process!");
-                GDKerror("THIS SHOULD NEVER HAPPEN!!!");
-            }
-            return GDK_FAIL;
-        }
-    } else {
-        ptr = b->buf;
-    }
-    time_xrpcClntDeSeria = GDKusec() - time_xrpcClntDeSeria;
 
     if (options && strstr(options, "timing")) {
         stream_printf(GDKout,
                 "XRPC_Client_Serialisation:     %lld microsec\n"
                 "XRPC_Network_Client_2_Server:  %lld microsec\n"
                 "XRPC_Data_Sent/Received:       " SZFMT " / "
-                                                  SZFMT " bytes\n\n",
+                                                  SZFMT " bytes\n\n"
+                "XRPC_Client_DeSerialisation:   %lld microsec\n",
                 time_xrpcClntSeria,
                 time_xrpcClnt2Serv,
-                bytes_sent, bytes_received);
-    }
-
-    ptr = strstr(ptr, "XRPC_Server_Timing");
-    if (ptr) {
-        ptr += 18; /* Skip "XRPC_Server_Timing" */
-        /* Find the start of the real timing info */
-        ptr = strstr(ptr, "XRPC");
-        stream_printf(GDKout, strstr(ptr, "XRPC_"));
-        stream_printf(GDKout, "\n");
-    }
-
-    if (options && strstr(options, "timing")) {
-        stream_printf(GDKout,
-                "XRPC_Client_DeSerialisation:   %lld microsec\n",
+                bytes_sent, bytes_received,
                 time_xrpcClntDeSeria);
     }
 
-    clean_up(sock, in, out, bs, b, argcnt, iterc);
+    clean_up(sock, out, bs, b, argcnt, iterc);
     *res = shredBAT;
     return GDK_SUCCEED;
 }

Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.300
retrieving revision 1.301
diff -u -d -r1.300 -r1.301
--- pathfinder.mx       27 Feb 2007 09:59:10 -0000      1.300
+++ pathfinder.mx       2 Mar 2007 18:23:59 -0000       1.301
@@ -5390,7 +5390,7 @@
 
     if (flags&1){
         /* print timing ourselves */
-        stream_printf(GDKout, "\nXRPC_Server_Timing:\n"
+        fprintf(stdout, "\n"
                "XRPC_Server_Application:       %lld microsec\n"
                "XRPC_Network_Server_2_Client:  %lld microsec\n",
                (*ctx->time_shred + *ctx->time_compile + *ctx->time_exec),

Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- xrpc_server.mx      2 Mar 2007 12:35:43 -0000       1.17
+++ xrpc_server.mx      2 Mar 2007 18:23:59 -0000       1.18
@@ -1016,7 +1016,7 @@
     }
 
     if (timing) {
-        stream_printf(GDKout,
+        fprintf(stdout,
                 "XRPC_Server_DeSerialisation:   %lld microsec\n",
                 time_xrpcServDeSeria);
     }
@@ -1188,10 +1188,6 @@
         snprintf(datadir, 1024, "%s%cMonetDB%cxrpc", s, DIR_SEP, DIR_SEP);
     }
     shttpd_setopt("document_root", datadir);
-/*
-    shttpd_setopt("error_log", "c:\\tmp\\err.txt");
-    shttpd_setopt("debug", "1");
- */
 
     shttpd_init(NULL); /* Initialize httpsd thread */
 


-------------------------------------------------------------------------
Take Surveys. Earn Cash. Influence the Future of IT
Join SourceForge.net's Techsay panel and you'll get the chance to share your
opinions on IT & business topics through brief surveys-and earn cash
http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to