Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv2219/runtime

Modified Files:
      Tag: xrpcdemo
        pathfinder.mx serialize.mx xrpc_server.mx 
Log Message:
- some more work on XRPC, 2pc and nested read-only calls are a bit
  tested and seem to work 



U serialize.mx
Index: serialize.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize.mx,v
retrieving revision 1.109.4.3
retrieving revision 1.109.4.4
diff -u -d -r1.109.4.3 -r1.109.4.4
--- serialize.mx        8 Jun 2008 01:02:37 -0000       1.109.4.3
+++ serialize.mx        9 Jun 2008 05:27:18 -0000       1.109.4.4
@@ -2022,7 +2022,7 @@
     (void) driverArg;
     if (trace) {
         stream* logstream = xquery_print_trace("res",*xrpc_seqnr);
-        fp = logstream?open_teestream(fp, logstream):NULL;
+        fp = logstream?attach_teestream(fp, logstream):NULL;
         if (fp == NULL) return GDK_FAIL;
     }
     ret = xquery_print_result_driver (
@@ -2048,10 +2048,7 @@
                xrpc_timeout, 
                xrpc_start);
   
-     if (trace) {
-         stream_close(fp);
-         stream_destroy(fp);
-     }
+     if (trace) detach_teestream(fp);
      return ret; 
 }
 

U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.10
retrieving revision 1.68.4.11
diff -u -d -r1.68.4.10 -r1.68.4.11
--- xrpc_server.mx      8 Jun 2008 09:01:22 -0000       1.68.4.10
+++ xrpc_server.mx      9 Jun 2008 05:27:18 -0000       1.68.4.11
@@ -208,7 +208,7 @@
                  " xsi:schemaLocation=\"" XRPC_NS " " XRPC_LOC "\">"
 
 #define XRPC_WS_QID\
-      "<wscoor:CoordinationContext xmlns:wscoor=\""WSCOOR_NS"\""\
+      "<wscoor:CoordinationContext xmlns:wscoor=\""WSCOOR_NS"\" "\
                  "env:mustUnderstand=\"true\">"\
         "<wscoor:Identifier>%s</wscoor:Identifier>"\
         "<wscoor:Expires>"LLFMT"</wscoor:Expires>"\
@@ -533,7 +533,7 @@
         BUN p,q;
         BATiter xrpc_trustedi = bat_iterator(xrpc_trusted);
         BATloop(xrpc_trusted, p, q) {
-            char* prefix = BUNhead(xrpc_trustedi, p);
+            char* prefix = BUNtail(xrpc_trustedi, p);
             len_x += strlen(prefix) + 4;
         }
         err = alloca(len + len_x);
@@ -545,7 +545,7 @@
         } else {
             pos += snprintf(err+pos, len-pos, "\n'%s' not in", location);
             BATloop(xrpc_trusted, p, q) {
-                char* prefix = BUNhead(xrpc_trustedi, p);
+                char* prefix = BUNtail(xrpc_trustedi, p);
                 pos += snprintf(err+pos, len-pos, "\n'%s',", prefix);
             }
             err[pos-1] = '.';
@@ -571,15 +571,6 @@
     return -1;
 } 
 
-static BAT* 
-get_elt_qn(BAT* pre_kind, BAT* pre_prop) {
-    int elt = ELEMENT;
-    BAT *elt_qn, *tmp = BATselect(pre_kind, &elt, &elt);
-    if (!tmp) return NULL;
-    elt_qn = BATsemijoin(pre_prop, tmp);
-    BBPreclaim(tmp);
-    return elt_qn;
-}
 
 XRPCreq_t *
 xrpc_parse_message(stream *out,
@@ -590,20 +581,19 @@
     XRPCreq_t *req = NULL, *res = NULL;
     char* msg = participants?XRPC_RESPONSE:XRPC_REQUEST;
     char *module = NULL, *method = NULL, *location = NULL;
-    char *qid = NULL, *host = NULL, *caller = "query,0";
-    char *querynr_str = NULL, *timeout_str = NULL, *mode_str = NULL;
+    char *mode = NULL, *qid = NULL, *caller = "query,0";
     char *arity_str = NULL, *itercnt_str = NULL;
     char *pul = NULL, *val = NULL;
     int updCall = FALSE;
-    lng timeout = 300000, argc = GDK_lng_min, iterc = -1, i = 0, j = 0, k = 0;
+    lng timeout = 30000, argc = GDK_lng_min, iterc = -1, i = 0, j = 0, k = 0;
     char errstr[1024];
 
-    BATiter shredBATi, prop_vali, qn_uli;
+    BATiter shredBATi, prop_vali, qn_uli, qn_histi;
     BAT *pre_size = NULL, *pre_level = NULL, *pre_kind = NULL,  *pre_prop = 
NULL;
-    BAT *qn_loc   = NULL, *qn_uri_loc = NULL;
+    BAT *qn_loc   = NULL, *qn_uri_loc = NULL, *qn_histo = NULL;
     BAT *prop_val = NULL,  *prop_text = NULL;
     BAT *attr_own = NULL,  *attr_qn = NULL, *attr_prop = NULL;
-    BAT *frag_root = NULL, *elt_qn = NULL;
+    BAT *frag_root = NULL;
     int   *pre_sizeT = NULL; /* Arrays holding the Tail values of some of the 
BATs above. */
     char  *pre_levelT = NULL, *pre_kindT = NULL;
     char  *text_base = NULL; /* text base of prop_text tail values */
@@ -611,7 +601,7 @@
     oid   *pre_propT = NULL;
     var_t *prop_textT = NULL;
 
-    oid msg_node_pre = 0, qid_node_pre = 0, val_node_pre = 0;
+    oid msg_node_pre = 0, hdr_node_pre = 0, val_node_pre = 0;
     oid call_node_pre = 0,  seq_node_pre = 0,  tpe_node_pre = 0;
     oid next_call_node_pre = 0, next_seq_node_pre = 0, next_tpe_node_pre = 0;
     oid ao_ptr = 0; /* cursor in the attr_own bat */
@@ -640,6 +630,7 @@
     @:getbat(prop_text,PROP_TEXT)@
     @:getbat(prop_val,PROP_VAL)@
     @:getbat(qn_uri_loc,QN_URI_LOC)@
+    @:getbat(qn_histo,QN_HISTOGRAM)@
     @:getbat(qn_loc,QN_LOC)@
     @:getbat(frag_root,FRAG_ROOT)@
     @:getbat(attr_own,ATTR_OWN)@
@@ -648,6 +639,7 @@
 
     prop_vali = bat_iterator(prop_val);
     qn_uli = bat_iterator(qn_uri_loc);
+    qn_histi = bat_iterator(qn_histo);
 
     pre_sizeT  = (int*)  Tloc(pre_size, BUNfirst(pre_size));
     pre_levelT = (char*) Tloc(pre_level, BUNfirst(pre_level));
@@ -663,6 +655,58 @@
     frag_root  = BATsetaccess(frag_root, BAT_APPEND);
     nattrs     = BATcount(attr_prop);
 
+    if (participants) {
+        /* parse a SOAP header for participants (XRPC response case) */
+        if ((hdr_node_pre = get_pre_by_qname(XRPC_NS"|participants",
+                        msg_node_pre, msg_node_pre + pre_sizeT[msg_node_pre] + 
1,
+                        pre_propT, pre_kindT, qn_uri_loc)))
+        {
+            oid hdr_node_end = hdr_node_pre + pre_sizeT[hdr_node_pre];
+            while(++hdr_node_pre <= hdr_node_end) {
+                if (pre_kindT[hdr_node_pre] == ELEMENT) {
+                    char *s = (char*) BUNtail(qn_uli, pre_propT[hdr_node_pre]);
+                    if (strcmp(s, XRPC_NS"|participant") == 0) {
+                        oid end_pre = hdr_node_pre + pre_sizeT[hdr_node_pre];
+                        while(++hdr_node_pre <= end_pre) {
+                            if (pre_kindT[hdr_node_pre] == TEXT) {
+                                s = text_base + 
prop_textT[pre_propT[hdr_node_pre]];
+                                BUNappend(participants, s, FALSE);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    } else {
+        /* parse a SOAP header for qid and timeout  (XRPC request case)*/
+        if ((hdr_node_pre = get_pre_by_qname(WSCOOR_NS"|CoordinationContext",
+                        msg_node_pre, msg_node_pre + pre_sizeT[msg_node_pre] + 
1,
+                        pre_propT, pre_kindT, qn_uri_loc)))
+        {
+            oid hdr_node_end = hdr_node_pre + pre_sizeT[hdr_node_pre];
+            while(++hdr_node_pre <= hdr_node_end) {
+                if (pre_kindT[hdr_node_pre] == ELEMENT) {
+                    char *s = (char*) BUNtail(qn_uli, pre_propT[hdr_node_pre]);
+                    int isqid = (strcmp(s, WSCOOR_NS"|Identifier") == 0);
+                    if (isqid || strcmp(s, WSCOOR_NS"|Expires") == 0) {
+                        oid end_pre = hdr_node_pre + pre_sizeT[hdr_node_pre];
+                        while(++hdr_node_pre <= end_pre) {
+                            if (pre_kindT[hdr_node_pre] == TEXT) {
+                                s = text_base + 
prop_textT[pre_propT[hdr_node_pre]];
+                                if (isqid) {
+                                    qid = GDKstrdup(s);
+                                } else {
+                                    timeout = my_strtoll(out, FALSE, s, 
"timeout");
+                                    if (timeout == GDK_lng_min) goto cleanup;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
 
     if (!(msg_node_pre = get_pre_by_qname(msg, 2, BATcount(pre_size), 
pre_propT, pre_kindT, qn_uri_loc))) {
         send_err(out, ERR404, "env:Sender", msg);
@@ -670,8 +714,11 @@
     }
     call_node_pre = msg_node_pre;
 
-    if (participants == NULL) {
-        /* XRPC request message parsing */
+    /* analyze the XRPC request information */
+    if (participants) {
+        argc = 1; /* responses have only a out param (argc=1) ie a result */
+    } else {
+        /* more XRPC request message parsing: the XRPC Request in the body 
contains module/method etc info */
         while(ao_ptr < nattrs && attr_ownT[ao_ptr] < msg_node_pre) ao_ptr++;
         while(ao_ptr < nattrs && attr_ownT[ao_ptr] == msg_node_pre) {
             pul = (char*) BUNtail(qn_uli, BUNfirst(qn_uri_loc) + 
attr_qnT[ao_ptr]);
@@ -687,7 +734,7 @@
             } else if(strcmp(pul, XRPC_NS"|iter-count") == 0) {
                 itercnt_str = val;
             } else if(strcmp(pul, XRPC_NS"|mode") == 0) {
-                mode_str = val;
+                mode = val;
             } else if(strcmp(pul, XRPC_NS"|caller") == 0) {
                 caller = val;
             } else if(strcmp(pul,XRPC_NS"|updCall") == 0) {
@@ -710,93 +757,21 @@
             if(iterc == GDK_lng_min) goto cleanup;
         }
 
-        /* Does this request require any isolation support? */
-        if ((qid_node_pre = get_pre_by_qname(XRPC_NS"|queryID",
-                        msg_node_pre, msg_node_pre + pre_sizeT[msg_node_pre] + 
1,
-                        pre_propT, pre_kindT, qn_uri_loc))) {
-            while(ao_ptr < nattrs && attr_ownT[ao_ptr] < qid_node_pre) 
ao_ptr++;
-            while(ao_ptr < nattrs && attr_ownT[ao_ptr] == qid_node_pre) {
-                pul = (char*) BUNtail(qn_uli, BUNfirst(qn_uri_loc) + 
attr_qnT[ao_ptr]);
-                val = (char*) BUNtail(prop_vali, BUNfirst(prop_val) + 
attr_propT[ao_ptr]);
-                if(strcmp(pul, XRPC_NS"|host") == 0) {
-                    host = val;
-                } else if(strcmp(pul, XRPC_NS"|querynr") == 0) {
-                    querynr_str = val;
-                } else if(strcmp(pul, XRPC_NS"|timeout") == 0) {
-                    timeout_str = val;
-                    timeout = my_strtoll(out, FALSE, timeout_str, "timeout");
-                    if (timeout == GDK_lng_min) goto cleanup;
-                } else {
-                    snprintf(errstr, 1024, 
-                        "Invalid attribute for the \""XRPC_NS":queryID\" 
element: %s\n", pul);
-                    send_err(out, ERR404, "env:Sender", errstr);
-                    goto cleanup;
-                }
-                ao_ptr++;
-            }
-            if(!(host && querynr_str && timeout_str)) {
-                send_err(out, ERR404, "env:Sender", "A \""XRPC_NS":queryID\" 
element "
-                    "must contain all three attributes: host, querynr, 
timeout\n");
-                goto cleanup;
-            }
-
-            k = strlen(host) + strlen(querynr_str) + 2;
-            if(!(qid = GDKmalloc(k))) {
-                send_err(out, ERR500, "env:Receiver", OUT_OF_MEM);
-                goto cleanup;
-            }
-
-            val = qid;
-            pul = qid + k;
-            while(*host && val < pul) *val++ = *host++;
-            *val++ = '|';
-            while(*querynr_str && val < pul) *val++ = *querynr_str++;
-            *val = '\0';
-        }
         call_node_pre = get_pre_by_qname(XRPC_NS"|call", 
MAX(call_node_pre,msg_node_pre),
             msg_node_pre + pre_sizeT[msg_node_pre] + 1,
             pre_propT, pre_kindT, qn_uri_loc);
 
         argc = my_strtoll(out, FALSE, arity_str, "arity");
+        if (argc == GDK_lng_min) goto cleanup;
     }
     if (iterc == -1) {
         /* if no iterc was found in the request (and always for XRPC 
responses), just count sequence elements */
-        BAT *qn_seq = BATselect(qn_uri_loc, XRPC_NS"|sequence", 
XRPC_NS"|sequence");
-        elt_qn = get_elt_qn(pre_kind, pre_prop);
-        if (elt_qn && qn_seq) {
-            BAT* elt_seq = BATmirror(BATsemijoin(BATmirror(elt_qn), qn_seq));
-            if (elt_seq) {
-                iterc = BATcount(elt_seq);
-                BBPreclaim(elt_seq);
-            }
-        }
-        if (iterc == -1) goto cleanup;
+        BUN p = BUNfnd(BATmirror(qn_uri_loc), XRPC_NS"|sequence");
+        iterc = (p == BUN_NONE)?1:(*(lng*) BUNtail(qn_histi, p) / argc);
     }
-    if (participants) {
-        /* for XRPC response messages we add all participants to the 
participant bat */
-        BAT *qn_part = BATselect(qn_uri_loc, XRPC_NS"|participant", 
XRPC_NS"|participant");
-        if (qn_part) {
-            BAT *elt_part = BATmirror(BATsemijoin(BATmirror(elt_qn),qn_part));
-            if (elt_part) {
-                BATiter pi = bat_iterator(elt_part);
-                BUN p,q;
-                BATloop(elt_part,p,q) {
-                    oid pre = 1 + *(oid*) BUNhead(pi,p);
-                    if (pre_kindT[pre] == TEXT) {
-                        str part = text_base + pre_propT[pre];
-                        BUNappend(participants, part, FALSE);
-                    }
-                }
-                BBPreclaim(elt_part);
-                argc = 1;
-            }
-            BBPreclaim(qn_part);
-        }
-    } 
-    if (argc == GDK_lng_min) goto cleanup;
 
     /* the req struct contains all parsed data (we use it also for response 
messages) */ 
-    if(!(req = XRPCreq_new(qid, caller, timeout, mode_str, module, method, 
location, updCall, iterc, argc))) {
+    if(!(req = XRPCreq_new(qid, caller, timeout, mode, module, method, 
location, updCall, iterc, argc))) {
         send_err(out, ERR500, "env:Receiver", OUT_OF_MEM);
         goto cleanup;
     }
@@ -925,7 +900,6 @@
 @c
 cleanup:
     if (req) XRPCreq_free(req);
-    if (elt_qn) BBPreclaim(elt_qn);
     @:delbat(pre_level)@
     @:delbat(pre_prop)@
     @:delbat(pre_kind)@
@@ -933,6 +907,7 @@
     @:delbat(prop_val)@
     @:delbat(qn_uri_loc)@
     @:delbat(qn_loc)@
+    @:delbat(qn_histo)@
     @:delbat(frag_root)@
     @:delbat(attr_own)@
     @:delbat(attr_qn)@
@@ -1047,10 +1022,9 @@
         /* send empty HTTP OK header for updating request */
         stream_write(mc->c->fdout, "HTTP/1.1 200 OK\r\n"
                     "Content-type: text/xml; charset=\"utf-8\"\r\n\r\n", 1, 
60);
+        stream_flush(mc->c->fdout);
     }
-
-    stream_flush(mc->c->fdout);
-       GDKsetbuf(errbuf_bak);
+    GDKsetbuf(errbuf_bak);
     return ret;
 }
 
@@ -1319,9 +1293,6 @@
     return -1;
 }
 
-/* the thread that checks every 5 seconds for expired queries */
-MT_Id checker;
-
 int
 CMDrpcd_start(int *port, bit *open, str options)
 {

U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.11
retrieving revision 1.416.2.1.2.12
diff -u -d -r1.416.2.1.2.11 -r1.416.2.1.2.12
--- pathfinder.mx       8 Jun 2008 01:02:34 -0000       1.416.2.1.2.11
+++ pathfinder.mx       9 Jun 2008 05:27:17 -0000       1.416.2.1.2.12
@@ -514,7 +514,7 @@
 var xrpc_statuses := bat("xrpc_statuses");  # bat[void,str] 2PC status of this 
query: "exec", "wait", "prepare", "commit", "abort" or "timeout"
 var xrpc_lock     := pflock_get(6); # master XRPC lock, to protect above 
xrpc_* BATs
 var xrpc_hostport := my_hostname() + ":" + str(get_xrpc_port());
-var xrpc_querynr  := 999999LL;
+var xrpc_querynr  := (lng(get_xrpc_port()) * 100000LL) - 1LL;
 
 # abort a 2PC transaction, keeping the record (note ws should have been 
destroyed already)
 PROC _ws_xrpc_abort(oid idx, str status) : void
@@ -523,7 +523,7 @@
     var ws := xrpc_wsbats.find(idx);
     ws_destroy(ws);
     xrpc_statuses.inplace(idx, status);
-    xrpc_wsbats.inplace(idx, bat("xrpc_statuses"));
+    xrpc_wsbats.inplace(idx, xrpc_statuses); # use xrpc_statuses as a dummy 
bat, replacing the ws (that gets freed)
     xrpc_locks.inplace(idx, lock_nil);
     lock_destroy(wslock);
 }
@@ -1498,19 +1498,6 @@
     return ws;
 }
 
-   
-var update := 0;
-var xrpc_qid := "1";
-var xrpc_seqnr := 1LL;
-var xrpc_timeout := 30000LL;
-var xrpc_mode := "repeatable";
-var xrpc_coord := false;
-var ws, wsid, xrpc, idx; 
-xrpc := _ws_xrpcget();
-idx := oid(abs(xrpc));
-ws := _ws_new(xrpc, idx, update);
-wsid := ws_id(ws);
-
 PROC ws_end(BAT[void,bat] ws, str err) : int 
 {
     ws_log_wsid := ws_id(ws);


-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to