Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv18847/runtime

Modified Files:
      Tag: xrpcdemo
        pathfinder.mx pf_support.mx serialize.mx serialize_dflt.mx 
        xrpc_client.mx xrpc_server.mx 
Log Message:
Initial 2PC implementation

THIS CODE COMPILES, BUT IS NOT TESTED/DEBUGGED. IT DOES NOT WORK!!

JUST CHECKED IN TO PROVIDE JENNIE SOME WORK AND SO THAT I CAN GIVE BACK THE CST LAPTOP


pathfinder.mx:
==============
- add lng xquery_2pc_exec(mapi_client *mc, str qid, ptr hdl)
  a function that fires off the MIL proc for 2PC commit (when a Prepare request
  comes in)

 
pf_support.mx
=============
- collect_update_tape() now returns a bit telling whether the tape actually
  contains updates. This allows us to exclude from 2PC those peers that were
  only marked as updating peers because they executed ("execute at") an
  updating function.

- remove do_2phase_commit() : void; instead, call log_trans_precommit /
  xrpc_wait_for_commit directly in the commit protocol


xrpc_client.mx:
===============
- xrpc_2pc_send/xrpc_2pc_recv
  helper functions that send and receive a WSAT message

- CMDxrpc_2pc_commit(str qid, str mode, lng* xrpc_timeout, lng* time_start,
                     BAT* participants)

  a new MIL command (xrpc_commit) that performs 2PC commit coordination: it
  sends Prepare to all participants that performed updates, and then Commit or
  Abort to all of them (depending on whether every Prepare succeeded)

xrpc_server.mx & xrpc_client.mx:
================================
- the mode attribute is now part of the XRPC SOAP header, in a new element
  (xrpc:header). This is required because Prepare/Commit messages also need a
  mode, but they do not have an xrpc:request element, which is where mode was
  previously an attribute.


xrpc_server.mx:
===============
some whitespace changes in xrpc_parse_message. skip_text_nodes now skips not
only text nodes, but in fact all nodes that are not elements.

renames
- get_xrpc_logdir => xrpc_log_dir
- xquery_print_trace => xrpc_log_message

xrpc_handle_wsat_request: handles HTTP POST requests whose path starts with the
"/xrpc/wsat" prefix. WSAT message parsing is rudimentary.

two new MIL commands for 2PC:
.COMMAND xrpc_wait_for_commit(str qid, str caller, lng seqnr, ptr handle) : void = CMDxrpc_2pc_wait_for_commit;
  "report back Prepared, wait for Commit"
.COMMAND xrpc_confirm_commit(str qid, str caller, lng seqnr, ptr handle) : void = CMDxrpc_2pc_confirm_commit;
  "report back Committed"

A new xrpc_commit_t handle struct is used, when a Prepare comes in, to record
all details of the incoming HTTP request. The new xrpc_handle_wsat_request then
fires off a MIL execution to commit the accumulated pending update list. In the
middle of the ensuing MIL commit, xrpc_wait_for_commit() (see above) is called
with a pointer to the handle (passed in the query-global MIL variable xrpc_hdl).
This COMMAND then sends back the HTTP response on the still-open stream and
starts waiting on a semaphore. The semaphore is unlocked when a Commit message
with a matching qid comes in. The MIL commit then resumes until it calls
xrpc_confirm_commit, which again sends back an HTTP response.



serialize.mx:
============
add colors for Commit/Prepare calls in the dot print (draw_queryflow now uses
orange/red/green for xrpc_updating values 1/2/3, and the default color otherwise).


milprint_summer.c:
==================
- use the dirty bit returned by collect_update_tape to properly mark a
  participant as a real updater, i.e.
  updating = { bit(nil)=>rdonly, false=>nothing updated, true=>dirty }

- add a call to xrpc_commit() in the query epilogue if the query is the XRPC
  2PC coordinator



U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.15
retrieving revision 1.416.2.1.2.16
diff -u -d -r1.416.2.1.2.15 -r1.416.2.1.2.16
--- pathfinder.mx       12 Jun 2008 14:47:28 -0000      1.416.2.1.2.15
+++ pathfinder.mx       17 Jun 2008 16:32:20 -0000      1.416.2.1.2.16
@@ -1509,9 +1509,9 @@
         CATCH(_ws_xrpc_end(xrpc_qid, err));
         lock_unset(xrpc_lock);
     }
-    # do not auto-restart XRPC updates (deadlock danger) 
+    # do not auto-restart XRPC updates (deadlock danger)
     if (xrpc_method != "") return 2;
-    return 1; 
+    return 1;
 }
 
 PROC ws_create(int update) : BAT[void,bat]
@@ -4185,6 +4185,7 @@
 pathfinder_export char* xquery_method(mapi_client*, int, lng*, char*, char*, 
char*, char*, char*, char*, lng, lng, lng, lng**, int*, str*, BAT*);
 pathfinder_export void  xquery_client_end(mapi_client *, char *); 
 pathfinder_export char* xquery_parse_val(int, char*, BAT*, BAT*, BAT* , BAT* , 
BAT* , char*, oid);
+pathfinder_export lng   xquery_2pc_exec(mapi_client *mc, str qid, ptr hdl);
 
 /* the xquery builtin type hierarchy */
 typedef struct {
@@ -4400,6 +4401,7 @@
     lng   *xrpc_seqnr;
     char **xrpc_qid;
     char **xrpc_caller;
+    ptr   *xrpc_hdl;
 
     char **genType;
 
@@ -4528,6 +4530,20 @@
     return ret;
 }
 
+/*
+ * fire off a MIL runtime proc that executes transaction commit (2pc, the 
perpare request) 
+ */
+lng
+xquery_2pc_exec(mapi_client *mc, str qid, ptr hdl)
+{
+    xquery_client *ctx = (xquery_client*) mc->fc;
+    *(ctx->xrpc_hdl) = hdl;
+    *(ctx->xrpc_qid) = GDKstrdup(qid);
+    if (!xquery_mil_exec(ctx, "{ var ws; CATCH(execute_update_tape(ws := 
ws_create(0)); })"))
+        return 0;
+    return *ctx->xrpc_seqnr; /* return the seqnr we got assigned in MIL (used 
for message logging) */
+}
+
 static char* xquery_parse_ident(char* p); 
 static char* xquery_parse_space(char* p); 
 static char* xquery_parse_string(char* p, char *buf, int len); 
@@ -5339,6 +5355,7 @@
     @:find_var(xrpc_module,str,sval)@
     @:find_var(xrpc_method,str,sval)@
     @:find_var(xrpc_caller,str,sval)@
+    @:find_var(xrpc_hdl,ptr,pval)@
     @:find_var(xrpc_qid,str,sval)@
     @:find_var(xrpc_mode,str,sval)@
     @:find_var(xrpc_seqnr,lng,lval)@

U serialize.mx
Index: serialize.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize.mx,v
retrieving revision 1.109.4.11
retrieving revision 1.109.4.12
diff -u -d -r1.109.4.11 -r1.109.4.12
--- serialize.mx        12 Jun 2008 14:47:29 -0000      1.109.4.11
+++ serialize.mx        17 Jun 2008 16:32:23 -0000      1.109.4.12
@@ -303,8 +303,6 @@
     lng                *xrpc_timeout,
     lng                *time_exec);
 
-stream* xquery_print_trace(str msg, lng seqnr);
-
 #endif /* SERIALIZE_H */
 @c
 
@@ -316,6 +314,7 @@
 #include <pf_config.h>
 #include "serialize.h"
 #include "pf_support.h"
+#include "xrpc_server.h"
 
 /**
  *
@@ -1978,36 +1977,6 @@
     return GDK_SUCCEED;
 }
 
-char xrpc_logdir_buf[1024], *xrpc_logdir = NULL;
-static char* get_xrpc_logdir() {
-    char *logdir;
-    if (xrpc_logdir == NULL && (logdir = GDKgetenv("datadir")) != NULL) {
-        snprintf(xrpc_logdir_buf, 1024, "%s%cMonetDB%cxrpc%clogs%c", logdir, 
DIR_SEP, DIR_SEP, DIR_SEP, DIR_SEP);
-        if (GDKcreatedir(xrpc_logdir_buf)) {
-            xrpc_logdir = xrpc_logdir_buf;
-        } else {
-            GDKerror("get_xrpc_logdir: directory %s not writable.", 
xrpc_logdir_buf);
-        }
-    }
-    return xrpc_logdir;
-}
-
-stream*
-xquery_print_trace(str msg, lng xrpc_seqnr) {
-    char logfile[1024], *logdir = get_xrpc_logdir();
-    stream* fp = NULL;
-    if (logdir) {
-        snprintf(logfile, 1024, "%s%s_" LLFMT ".xml", logdir, msg, xrpc_seqnr);
-        fp = open_wastream(logfile);
-    }
-    if (!fp || stream_errnr(fp)) {
-        GDKerror("print_result: could not open logfile for writing\n");
-        if (fp) stream_destroy(fp);
-        return NULL;
-    }
-    return fp;
-}
-
 int
 draw_queryflow(
         int xrpc_updating,
@@ -2018,7 +1987,7 @@
 {
     int ret = 0;
     stream *s = NULL;
-    char *ptr = NULL, *logdir = get_xrpc_logdir();
+    char *ptr = NULL, *logdir = xrpc_log_dir();
     char base_name[2048], dotFile[2052], caller[1024], dst[1024];
     char dstNID[1024], fcn[1024], time[64], *seqnr = NULL;
     char dotcmd[1024];
@@ -2026,7 +1995,7 @@
     BATiter bi;
 
     if (logdir == NULL) return GDK_FAIL;
-    snprintf(base_name, 2048, "%sqflow_"LLFMT, logdir, xrpc_seqnr);
+    snprintf(base_name, 2048, "%s%cqflow_"LLFMT, logdir, DIR_SEP, xrpc_seqnr);
     snprintf(dotFile, 2048, "%s.dot", base_name);
 
     /* replace all non-alpha-numeric chars in the string with '_', so
@@ -2052,7 +2021,7 @@
             " 
query->%s_"LLFMT"[headURL=\"http://%s:%d/logs/req_"LLFMT".xml\"]\n";
             " 
%s_"LLFMT"->query[label=\""LLFMT"ms\",headURL=\"http://%s:%d/logs/res_"LLFMT".xml\"]\n\n";,
             dstNID,
-            xrpc_updating?"orange":".7 .3 1.0",
+            
(xrpc_updating==1)?"orange":(xrpc_updating==2)?"red":(xrpc_updating==3)?"green":".7
 .3 1.0",
             xrpc_hostname, xrpc_port,
             dstNID, xrpc_seqnr, xrpc_method,
             dstNID, xrpc_seqnr, xrpc_hostname, xrpc_port, xrpc_seqnr,
@@ -2060,7 +2029,7 @@
 
     bi = bat_iterator(participants);
     BATloop(participants, p, q) {
-        if (sscanf(BUNtail(bi,p), "%[^','],%[^','],%[^','],%[^','],%d", 
caller, dst, fcn, time, &xrpc_updating) != 5) {
+        if (sscanf(BUNtail(bi,p), "%[^','],%[^','],%[^','],%s,%d", caller, 
dst, fcn, time, &xrpc_updating) != 5) {
             GDKerror("draw_queryflow: sscanf failed");
             stream_destroy(s);
             return GDK_FAIL;
@@ -2092,7 +2061,7 @@
                 " %s->%s_%s[headURL=\"http://%s/logs/req_%s.xml\"]\n";
                 " 
%s_%s->%s[label=\"%sms\",headURL=\"http://%s/logs/res_%s.xml\"]\n\n";,
                 dstNID,
-                xrpc_updating?"orange":".7 .3 1.0",
+                
(xrpc_updating==1)?"orange":(xrpc_updating==2)?"red":(xrpc_updating==3)?"green":".7
 .3 1.0",
                 dst,
                 dstNID, seqnr, fcn,
                 caller, dstNID, seqnr, dst, seqnr,
@@ -2111,6 +2080,39 @@
     return GDK_SUCCEED;
 }
 
+
+char xrpc_logdir_buf[1024], *xrpc_logdir = NULL;
+char*
+xrpc_log_dir() {
+    char *logdir;
+    if (xrpc_logdir == NULL && (logdir = GDKgetenv("datadir")) != NULL) {
+        snprintf(xrpc_logdir_buf, 1024, "%s%cMonetDB%cxrpc%clogs%c", logdir, 
DIR_SEP, DIR_SEP, DIR_SEP, DIR_SEP);
+        if (GDKcreatedir(xrpc_logdir_buf)) {
+            xrpc_logdir = xrpc_logdir_buf;
+        } else {
+            GDKerror("get_xrpc_logdir: directory %s not writable.", 
xrpc_logdir_buf);
+        }
+    }
+    return xrpc_logdir;
+}
+
+
+stream*
+xrpc_log_message(str msg, lng xrpc_seqnr) {
+    char logfile[1024], *logdir = xrpc_log_dir();
+    stream* fp = NULL;
+    if (logdir) {
+        snprintf(logfile, 1024, "%s%s_" LLFMT ".xml", logdir, msg, xrpc_seqnr);
+        fp = open_wastream(logfile);
+    }
+    if (!fp || stream_errnr(fp)) {
+        GDKerror("print_result: could not open logfile for writing\n");
+        if (fp) stream_destroy(fp);
+        return NULL;
+    }
+    return fp;
+}
+
 int 
 xquery_print_result_DRIVER (
     str                 mode,
@@ -2144,7 +2146,7 @@
     lng time_exec = (GDKusec() - *xrpc_start) / 1000;
 
     if (trace) {
-        stream* logstream = xquery_print_trace("res",*xrpc_seqnr);
+        stream* logstream = xrpc_log_message("res",*xrpc_seqnr);
         fp = logstream?attach_teestream(fp, logstream):NULL;
         if (fp == NULL) return GDK_FAIL;
         if (strcmp(xrpc_caller, "query") == 0) {

U pf_support.mx
Index: pf_support.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pf_support.mx,v
retrieving revision 1.299.4.8
retrieving revision 1.299.4.9
diff -u -d -r1.299.4.8 -r1.299.4.9
--- pf_support.mx       12 Jun 2008 14:47:28 -0000      1.299.4.8
+++ pf_support.mx       17 Jun 2008 16:32:21 -0000      1.299.4.9
@@ -3362,7 +3362,7 @@
   do_update_end(ws, affected_conts);
 }
 
-PROC collect_update_tape(bat[void, bat] ws, bat[void, oid] item, bat[void, 
int] kind, bat[void,lng] int_values, bat[void,str] str_values) : void
+PROC collect_update_tape(bat[void, bat] ws, bat[void, oid] item, bat[void, 
int] kind, bat[void,lng] int_values, bat[void,str] str_values) : bit
 {
   # convert MPS input to Algebra input
   var cmd := [and]([lng](item.mirror()), 
3LL).ord_uselect(0LL).mirror().leftfetchjoin(item).leftfetchjoin(int_values); # 
[i,CMD]
@@ -3412,6 +3412,8 @@
   ws.fetch(COLLECTED_PRE_CONT_INS).append(pre_cont_ins);
   ws.fetch(COLLECTED_ATTR_INS).append(attr_ins);
   ws.fetch(COLLECTED_ATTR_CONT_INS).append(attr_cont_ins);
+ 
+  return (count(cmd) > 0);
 }
 
 PROC execute_update_tape(bat[void, bat] ws) : void
@@ -3817,6 +3819,9 @@
       newattr := vx_maintain(elemnids, elemqns, attrqns, attrvals);
     }
 
+    if (xrpc_qid != "") 
+      xrpc_confirm_commit(xrpc_qid, xrpc_caller, xrpc_seqnr, xrpc_hdl);
+
     var newtext;
     {
       var prop_text := ws.fetch(_PROP_TEXT).find(cont);
@@ -3866,18 +3871,6 @@
 b.col_name(nme).print();
 }
 
-PROC do_2phase_commit() : void
-{
-  # write PRECOMMIT record
-  log_trans_precommit(pf_logger, xrpc_qid);
-
-  # now wait until we get an ABORT or a COMMIT message
-  xrpc_wait_for_commit(xrpc_qid);
-
-  if (abort) {
-    ERROR("transaction aborted!");
-  }
-}
 
 PROC do_log_updates(BAT[void,bat] ws, BAT[any,any] cont_order, BAT[oid,bit] 
map_pid_changed_bat) : void
 {
@@ -3949,11 +3942,15 @@
     if (ws_log_active)
       ws_log(ws, "commit-LOG_SIZE exec" + str(ws_logtime - (ws_logtime := 
usec()))); 
   }
+
+  # 2PC transactions report back here ("Prepared") and must wait for the final 
Commit
   if (xrpc_qid != "") {
-    do_2phase_commit(); # do the complete 2 phase commit stuff
-  } else {
-    log_trans_end(pf_logger); # write commit record in WAL: ==> THIS IS THE 
COMMIT POINT <==
-  }
+    log_trans_precommit(pf_logger); # write PRECOMMIT record
+    xrpc_wait_for_commit(xrpc_qid, xrpc_caller, xrpc_seqnr, xrpc_hdl);
+  } 
+
+  # write commit record in WAL: ==> THIS IS THE COMMIT POINT <==
+  log_trans_end(pf_logger); 
 
   if (ws_log_active)
     ws_log(ws, "commit-LOG_TRANS_END exec" + str(ws_logtime - usec()));
@@ -5834,8 +5831,10 @@
 #define PF_MODE_UPDATE 2 /* updating query */
 #define PF_MODE_RETRY  3 /* updating query run for the second time (exclusive 
mode) */
 
-pf_support_export int xrpc_port;
-pf_support_export char *xrpc_hostname;
+pf_support_export int     xrpc_port;
+pf_support_export char*   xrpc_hostname;
+pf_support_export stream* xrpc_log_message(str msg, lng xrpc_seqnr);
+pf_support_export char*   xrpc_log_dir();
 
 #endif
 @c

U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.4.15
retrieving revision 1.45.4.16
diff -u -d -r1.45.4.15 -r1.45.4.16
--- xrpc_client.mx      12 Jun 2008 14:47:29 -0000      1.45.4.15
+++ xrpc_client.mx      17 Jun 2008 16:32:23 -0000      1.45.4.16
@@ -76,6 +76,9 @@
         BAT[void, str] str_values) : BAT[void, bat] = CMDget_rpc_res;
 "Retrieve values from the response message and return as a iter|item|kind bat"
 
+.COMMAND xrpc_commit(str qid, str mode, BAT[void,str] participants) : void = 
CMDxrpc_2pc_commit;
+"coordinate a 2pc commit"
+
 .END xrpc_client;
 
 @mil
@@ -313,14 +316,6 @@
     }
 
     var time_xrpcClntDeSeriaStart := usec();
-    var res_iter := bat(void,oid).seqbase([EMAIL PROTECTED]);
-    var res_item := bat(void,oid).seqbase([EMAIL PROTECTED]);
-    var res_kind := bat(void,int).seqbase([EMAIL PROTECTED]);
-
-    res_bats := bat(void,bat,4).seqbase([EMAIL PROTECTED]);
-    res_bats.append(res_iter).append(res_item).append(res_kind);
-    res_bats.access(BAT_READ);
-
     time_xrpcClntDeSeria := time_xrpcClntDeSeria + (usec() - 
time_xrpcClntDeSeriaStart);
 
     if (genType.search("timing") >= 0) {
@@ -439,7 +434,7 @@
                                     if (err) break;
 
                                     /* set the correct iteration number */
-                                    while (j == (unsigned)argcnt[it]) { 
+                                    while (j == (unsigned) argcnt[it]) { 
                                         j=0; it++; 
                                     }
                                     BUNappend(res_iter, &it, FALSE);
@@ -905,19 +900,19 @@
         return clean_up(bs, argcnt, iterc);
 
     if(qid && *qid) { /* an XRPC query that requires 2PC */
-        ret = stream_printf(bs, "<env:Header>" XRPC_WS_QID "</env:Header>" , 
qid, timeout);
+        ret = stream_printf(bs, "<env:Header>" XRPC_WS_QID "</env:Header>" , 
qid, timeout, rpc_mode);
         if (ret < 0) 
             return clean_up(bs, argcnt, iterc);
     }
 
     if(firstCaller) {
-        ret = stream_printf(bs, XRPC_REQ_HEADER_FIRST_CALLER,
+        ret = stream_printf(bs, XRPC_REQ_BODY_FIRST_CALLER,
                 rpc_module, rpc_uri, rpc_method, arity, iterc,
-                rpc_mode, updCall?"true":"false");
+                updCall?"true":"false");
     } else {
-        ret = stream_printf(bs, XRPC_REQ_HEADER, rpc_module, rpc_uri,
+        ret = stream_printf(bs, XRPC_REQ_BODY, rpc_module, rpc_uri,
                 rpc_method, arity, iterc, xrpc_hostname, xrpc_port,
-                seqnr, rpc_mode, updCall?"true":"false");
+                seqnr, updCall?"true":"false");
     }
     if (ret < 0) 
         return clean_up(bs, argcnt, iterc);
@@ -1240,4 +1235,118 @@
     *res = shredBAT;
     return GDK_SUCCEED;
 }
+
+
+
+/* send a WSAT request */
+static int
+xrpc_2pc_send(int sock, char* dst, int port, str msg) { 
+    stream *out = NULL;
+    
+    if (!(out = socket_wastream(sock, "http_send"))) {
+        GDKerror("xrpc_2pc_commit: failed to create socket_wastream for 
sending XRPC request\n");
+        return GDK_FAIL;
+    }
+    if (stream_printf(out,
+            "POST %s HTTP/1.1\r\n"
+            "Host: %s:%d\r\n"
+            "Accept: text/html, text/xml, application/soap+xml\r\n"
+            "Accept-Language: en-uk en-us\r\n"
+            "Content-Type: text/html; charset=\"utf-8\"\r\n"
+            "Content-Length: " SZFMT "\r\n\r\n"
+            XRPC_WSAT_CALLBACK, dst, port, strlen(msg), msg) < 0) 
+    {
+        GDKerror("xrpc_2pc_commit: failed to send XRPC request header.");
+        return GDK_FAIL;
+    }
+    if (stream_write(out, msg, 1, strlen(msg)) < 0) {
+        GDKerror("xrpc_2pc_commit: failed to send XRPC request.");
+        return GDK_FAIL;
+    }
+    stream_close(out);
+    stream_destroy(out);
+    return GDK_SUCCEED;
+}
+
+/* get a WSAT response */
+static int
+xrpc_2pc_recv(int sock, char* dst, int port, str msg) { 
+    stream *in = NULL;
+    BAT* shredBAT;
+    int ret = GDK_FAIL;
+
+    if( !(in = socket_rastream(sock, "http_receive")) ){
+        GDKerror("do_simple_query: failed to create socket_rastream\n");
+        return GDK_FAIL;
+    }
+    shredBAT = response2bat(0, in, dst, port, 0);
+    if (shredBAT) {
+        BATiter si = bat_iterator(shredBAT);
+        BAT *qn_uri_loc = BATdescriptor(*(bat*) BUNtail(si, QN_URI_LOC));
+        if (qn_uri_loc) {
+            if (BUNfnd(qn_uri_loc, msg)) ret = GDK_SUCCEED;
+            BBPunfix(qn_uri_loc->batCacheid);
+        }
+    }
+    BBPreclaim(shredBAT);
+    stream_close(in);
+    stream_destroy(in);
+    return ret;
+}
+    
+int
+CMDxrpc_2pc_commit(str qid, str mode, lng* xrpc_timeout, lng* time_start, BAT* 
participants) {
+    int i, j, ret=GDK_FAIL, cnt=0, npart = BATcount(participants);
+    str *part = (str*) alloca(npart*sizeof(npart));
+    int *port = (int*) alloca(npart*sizeof(int));
+    int *sock = (int*) alloca(npart*sizeof(int)), xrpc_updating;
+    char dst[1024], msg[1024], *r; 
+    BATiter pi = bat_iterator(participants);
+    BUN p,q;
+  
+    /* get all URIs of peers that performed updates */ 
+    BATloop(participants, p, q) {
+        str s = (str) BUNtail(pi, p); 
+        if (sscanf(s, "%[^','],%[^','],%[^','],%s,%d", 
+                   msg, dst, msg, msg, &xrpc_updating) != 5 
+            || (r=strrchr(dst,':')) == NULL) 
+        {
+            GDKerror("draw_queryflow: sscanf failed");
+            goto cleanup;
+        }
+        if (xrpc_updating == 1) { /* a request that did updates */
+            *r = 0;
+            for(i=0; i<cnt; i++) 
+                if (strcmp(part[i],dst) == 0) break;
+
+            if (i == cnt) { /* not already in the list? */
+                char* p = strchr(part[i] = GDKstrdup(dst), ':');
+                port[i] = 80;
+                if (p) { *p = 0; port[i] = atoi(p+1); }
+                if ((sock[i] = setup_connection(part[i], port+i)) < 0) goto 
cleanup;
+                cnt++;
+            }
+        }
+    }
+
+    /* create Prepare and Commit SOAP messages */
+    snprintf(msg, 1024, XRPC_WSAT_REQ, mode, qid, *xrpc_timeout - (GDKusec() - 
*time_start), "Prepare");
+    for(i=0; i<cnt; i++) 
+       if (xrpc_2pc_send(sock[i], part[i], port[i], msg) == GDK_FAIL) break;
+    for(j=0; j<i; j++) 
+       if (xrpc_2pc_recv(sock[j], part[j], port[j], WSAT_NS"|Prepared") == 
GDK_FAIL) break;
+
+    if (j == cnt) ret = GDK_SUCCEED;
+    snprintf(msg, 1024, XRPC_WSAT_REQ, mode, qid, *xrpc_timeout - (GDKusec() - 
*time_start), (ret == GDK_SUCCEED) ? "Commit" : "Abort");
+    for(j=0; j<cnt; j++) 
+       if (xrpc_2pc_send(sock[j], part[j], port[j], msg) == GDK_FAIL) ret = 
GDK_FAIL;
+    for(i=0; i<j; i++) 
+       if (xrpc_2pc_recv(sock[i], part[i], port[i], WSAT_NS"|Committed") == 
GDK_FAIL) ret = GDK_FAIL;
+cleanup:
+    for(i=0; i<cnt; i++) {
+        if (part[i]) GDKfree(part[i]);
+        if (sock[i] >= 0) close(sock[i]);
+    }
+    return ret;
+}
 /* vim:set shiftwidth=4 expandtab: */

U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46.4.10
retrieving revision 1.46.4.11
diff -u -d -r1.46.4.10 -r1.46.4.11
--- serialize_dflt.mx   11 Jun 2008 07:28:23 -0000      1.46.4.10
+++ serialize_dflt.mx   17 Jun 2008 16:32:23 -0000      1.46.4.11
@@ -1038,7 +1038,7 @@
         BBPunfix(bid);
         if (ret < 0) return PROBLEM;
     }
-    ret = stream_printf(ctx->out, XRPC_RES_HEADER, ctx->xrpc_module, 
ctx->xrpc_method);
+    ret = stream_printf(ctx->out, XRPC_RES_BODY, ctx->xrpc_module, 
ctx->xrpc_method);
     return (ret < 0)?PROBLEM:SUCCESS;
 }
 

U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.18
retrieving revision 1.68.4.19
diff -u -d -r1.68.4.18 -r1.68.4.19
--- xrpc_server.mx      12 Jun 2008 06:10:26 -0000      1.68.4.18
+++ xrpc_server.mx      17 Jun 2008 16:32:23 -0000      1.68.4.19
@@ -46,6 +46,17 @@
 .COMMAND my_hostname() : str = CMDmy_hostname;
 "Returns the hostname of the localhost."
 
[EMAIL PROTECTED] 2pc
+very basic 2pc implementation. we do not do aborts yet (failures just lead to
+timeout). Also, recovery is not yet implemented (ie finding out after a 
precommit
+record without commit whether the transaction succeeded or not).
[EMAIL PROTECTED]
+.COMMAND xrpc_wait_for_commit(str qid, str caller, lng seqnr, ptr handle) : 
void = CMDxrpc_2pc_wait_for_commit;
+"report prepared, wait for commit"
+
+.COMMAND xrpc_confirm_commit(str qid, str caller, lng seqnr, ptr handle) : 
void = CMDxrpc_2pc_confirm_commit;
+"report commit"
+
 .PRELUDE = xrpc_prelude;
 .EPILOGUE = xrpc_epilogue;
 .END xrpc_server;
@@ -167,6 +178,7 @@
 /* HTTP defines */
 #define XRPC_DOC_CALLBACK       "/xrpc/doc"
 #define XRPC_ADM_CALLBACK       "/xrpc/admin"
+#define XRPC_WSAT_CALLBACK      "/xrpc/wsat"
 #define XRPC_REQ_CALLBACK       "/xrpc"
 
 #define ERR403                  "403 Forbidden"
@@ -209,15 +221,17 @@
     "  xsi:schemaLocation=\"" XRPC_NS " " XRPC_LOC "\">\n"
 
 #define XRPC_WS_QID\
-      " <wscoor:CoordinationContext\n"\
-      "  xmlns:wscoor=\""WSCOOR_NS"\"\n"\
-      "  env:mustUnderstand=\"true\">\n"\
-      "   <wscoor:Identifier>%s</wscoor:Identifier>\n"\
-      "   <wscoor:Expires>"LLFMT"</wscoor:Expires>\n"\
-      "   <wscoor:CoordinationType>" WSAT_NS "</wscoor:CoordinationType>\n"\
-      " </wscoor:CoordinationContext>\n"
+      " <xrpc:header xrpc:mode=\"%s\">\n"\
+      "  <wscoor:CoordinationContext\n"\
+      "   xmlns:wscoor=\""WSCOOR_NS"\"\n"\
+      "   env:mustUnderstand=\"true\">\n"\
+      "    <wscoor:Identifier>%s</wscoor:Identifier>\n"\
+      "    <wscoor:Expires>"LLFMT"</wscoor:Expires>\n"\
+      "    <wscoor:CoordinationType>" WSAT_NS "</wscoor:CoordinationType>\n"\
+      "  </wscoor:CoordinationContext>\n"\
+      " </xrpc:header>\n"
 
-#define XRPC_REQ_HEADER\
+#define XRPC_REQ\
     "<env:Body>\n"\
       "<xrpc:request\n"\
       "  xrpc:module=\"%s\"\n"\
@@ -225,23 +239,24 @@
       "  xrpc:method=\"%s\"\n"\
       "  xrpc:arity=\""LLFMT"\"\n"\
       "  xrpc:iter-count=\""LLFMT"\"\n"\
-      "  xrpc:caller=\"%s:%d:"LLFMT"\"\n"\
-      "  xrpc:mode=\"%s\"\n"\
-      "  xrpc:updCall=\"%s\">\n"
+      "  xrpc:updCall=\"%s\"\n"\
+      "  xrpc:caller=\""
+#define XRPC_REQ_BODY\
+      XRPC_REQ "%s:%d:"LLFMT"\">\n"
+#define XRPC_REQ_BODY_FIRST_CALLER\
+      XRPC_REQ "query\">\n"
 
-#define XRPC_REQ_HEADER_FIRST_CALLER\
-    "<env:Body>\n"\
-    " <xrpc:request\n"\
-    "   xrpc:module=\"%s\"\n"\
-    "   xrpc:location=\"%s\"\n"\
-    "   xrpc:method=\"%s\"\n"\
-    "   xrpc:arity=\""LLFMT"\"\n"\
-    "   xrpc:iter-count=\""LLFMT"\"\n"\
-    "   xrpc:caller=\"query\"\n"\
-    "   xrpc:mode=\"%s\"\n"\
-    "   xrpc:updCall=\"%s\">\n"
+#define XRPC_WSAT_REQ SOAP_ENVELOPE\
+    "<env:Header>" XRPC_WS_QID "</env:Header>\n"\
+    "<env:Body><wsat:%s/></env:Body>\n"\
+    "</env:Envelope>"
 
-#define XRPC_RES_HEADER\
+#define XRPC_WSAT_RES SOAP_ENVELOPE\
+    "<env:Header>\n" XRPC_WS_QID 
"<xrpc:partipant>%s,%s:%s:%s,"LLFMT",%d</xrpc:participant>\n</env:Header>\n"\
+    "<env:Body><wsat:%s/></env:Body>\n"\
+    "</env:Envelope>"
+
+#define XRPC_RES_BODY\
     "<env:Body>\n"\
     " <xrpc:response\n"\
     "   xrpc:module=\"%s\"\n"\
@@ -338,6 +353,16 @@
 static int listen_socket = -1;
 static char datadir[1024];
 
+typedef struct {
+    lng start;
+    stream *s;
+    char* qid;
+    char* mode;
+} xrpc_commit_t;
+
+static MT_Sema xrpc_commit_sema;
+static xrpc_commit_t *xrpc_commit_active = NULL;
+
 
 int CMDmy_hostname(char **res)
 {
@@ -599,11 +624,12 @@
     return -1;
 } 
 
+
 static INLINE oid
-skip_text_nodes(char *pre_kindT, oid pre, oid max)
+skip_text_nodes(char *pre_kindT, oid pre, oid max) 
 {
-       while(pre < max && pre_kindT[pre] == TEXT) pre++;
-       return pre;
+    while(pre < max && pre_kindT[pre] != ELEM) pre++;
+    return pre; 
 }
 
 XRPCreq_t *
@@ -647,7 +673,6 @@
     oid start_invalidate = 0; /* indicates from which node we should start 
with invalidation */
     char level_diff = 0; /* indicates how many levels each pre_level value of 
a node should be reduced. */
 
-    i = BUNfirst(shredBAT);
     shredBATi = bat_iterator(shredBAT);
     /* FIXME: should used BATdescriptor */
 @= getbat
@@ -690,6 +715,16 @@
     nattrs     = BATcount(attr_prop);
     nnodes     = BATcount(pre_size);
 
+    /* get mode from the header (a bit hacked) */
+    for(i=0; i<nattrs; i++) {
+        str s = (str) BUNtail(qn_uli, attr_qnT[i]);
+        if (strcmp(s, XRPC_NS"|mode") == 0) {
+            mode = (str) BUNtail(prop_vali, attr_propT[i]);
+            break;
+        }
+    }
+
+    i = BUNfirst(shredBAT);
     if (participants) {
         /* parse a SOAP header for participants (XRPC response case) */
         if ((hdr_node_pre = get_pre_by_qname(XRPC_NS"|participants",
@@ -768,8 +803,6 @@
                 arity_str = val;
             } else if(strcmp(pul, XRPC_NS"|iter-count") == 0) {
                 itercnt_str = val;
-            } else if(strcmp(pul, XRPC_NS"|mode") == 0) {
-                mode = val;
             } else if(strcmp(pul, XRPC_NS"|caller") == 0) {
                 caller = val;
             } else if(strcmp(pul,XRPC_NS"|updCall") == 0) {
@@ -814,27 +847,30 @@
     /* Fill the arrays 'req->argcnt', 'req->argval', 'req->argtpe' */
     /* i: index in xrpc:call; j: index of xrpc:sequence per xrpc:call */
        next_call_node_pre = call_node_pre;
-       for(i = 0; next_call_node_pre > 0 && next_call_node_pre < nnodes;
-                       call_node_pre = next_call_node_pre, i++) {
-               /* end of current call node */
+    for(i = 0; 
+            next_call_node_pre > 0 && next_call_node_pre < nnodes; 
+                call_node_pre = next_call_node_pre, i++) 
+    {
+        /* end of current call node */
         next_call_node_pre = call_node_pre + pre_sizeT[call_node_pre] + 1;
 
-               for(seq_node_pre = skip_text_nodes(pre_kindT,
-                                       call_node_pre + 1, next_call_node_pre),
-                               next_seq_node_pre = seq_node_pre, j = 0;
-                               next_seq_node_pre > 0 && next_seq_node_pre < 
next_call_node_pre;
-                seq_node_pre = next_seq_node_pre, j++) {
-                       /* end of current sequence node */
+        for(seq_node_pre = skip_text_nodes(pre_kindT, call_node_pre + 1, 
next_call_node_pre), 
+            next_seq_node_pre = seq_node_pre, j = 0;
+                next_seq_node_pre > 0 && next_seq_node_pre < 
next_call_node_pre;
+                    seq_node_pre = next_seq_node_pre, j++) 
+        {
+            /* end of current sequence node */
             next_seq_node_pre = seq_node_pre + pre_sizeT[seq_node_pre] + 1;
        
-            for(tpe_node_pre = skip_text_nodes(pre_kindT,
-                                               seq_node_pre+1, 
next_seq_node_pre),
-                                       next_tpe_node_pre = tpe_node_pre, k=1;
-                                       next_tpe_node_pre > 0 && 
next_tpe_node_pre < next_seq_node_pre;
-                    tpe_node_pre = next_tpe_node_pre, k++) {
-                               /* end of current xrpc:<type> node */
+            for(tpe_node_pre = skip_text_nodes(pre_kindT, seq_node_pre+1, 
next_seq_node_pre),
+               next_tpe_node_pre = tpe_node_pre, k=1;
+                    next_tpe_node_pre > 0 && next_tpe_node_pre < 
next_seq_node_pre;
+                        tpe_node_pre = next_tpe_node_pre, k++) 
+            {
+                /* end of current xrpc:<type> node */
                 next_tpe_node_pre = tpe_node_pre + pre_sizeT[tpe_node_pre]+1;
 
+                
                 /* advance our cursor in attr_own */
                 while(ao_ptr < nattrs && attr_ownT[ao_ptr] < tpe_node_pre) 
ao_ptr++;
 
@@ -931,11 +967,11 @@
                 }
                 req->argcnt[i][j]++;
                 req->nr_args++;
-                               next_tpe_node_pre = skip_text_nodes(pre_kindT, 
next_tpe_node_pre, next_seq_node_pre);
+                next_tpe_node_pre = skip_text_nodes(pre_kindT, 
next_tpe_node_pre, next_seq_node_pre);
             } /* end loop 'xrpc:<type>' */
-                       next_seq_node_pre = skip_text_nodes(pre_kindT, 
next_seq_node_pre, next_call_node_pre);
+            next_seq_node_pre = skip_text_nodes(pre_kindT, next_seq_node_pre, 
next_call_node_pre);
         } /* end loop 'xrpc:sequence' */
-               next_call_node_pre = skip_text_nodes(pre_kindT, 
next_call_node_pre, nnodes);
+        next_call_node_pre = skip_text_nodes(pre_kindT, next_call_node_pre, 
nnodes);
     } /* end loop 'xrpc:call' */
     frag_root = BATsetaccess(frag_root, BAT_READ);
     res = req; req = NULL;
@@ -1081,16 +1117,16 @@
     BAT *shredBAT;
     lng time_xrpcServDeSeria;
     int flags = timing|debug; 
-    char *msg;
+    char *msg = shttpd_get_msg(arg);
 
     time_xrpcServDeSeria = GDKusec();
     if(!(shredBAT = request2bat(mc->c->fdout, msg=shttpd_get_msg(arg))))
         return GDK_FAIL;
 
     if(!(req = parse_request(mc->c->fdout, shredBAT, 0))) {
-        BBPreclaim(shredBAT);
         return GDK_FAIL;
     }
+    BBPreclaim(shredBAT);
 
     time_xrpcServDeSeria = GDKusec() - time_xrpcServDeSeria;
 
@@ -1101,7 +1137,7 @@
         return GDK_FAIL;
     }
     if (req->mode && strstr(req->mode,"trace")) {
-        stream *logstream = xquery_print_trace("req", req->seqnr);
+        stream *logstream = xrpc_log_message("req", req->seqnr);
         if (logstream == NULL) return GDK_FAIL;
         if (stream_errnr(logstream) || stream_write(logstream, msg, 1, 
strlen(msg)) <= 0) {
             stream_destroy(logstream);
@@ -1276,6 +1312,71 @@
     return ret;
 }
 
+
+/**
+ * Handle 2PC Web Services Atomic Transactions (WSAT) requests
+ *
+ * @return GDK_SUCCEED, or
+ *         GDK_FAIL if an error has occurred.
+ */
+static int 
+xrpc_handle_wsat_request(mapi_client *mc, struct shttpd_callback_arg *arg)
+{
+    char *msg = shttpd_get_msg(arg);
+    char *qid = strstr(msg, ":Identifier>");
+    char *mode = NULL, *p = strstr(msg, ":header mode=");
+    if (p) {
+        /* we do some very hacky unsafe XML parsing here (get mode) */
+        for(p+=18; *p; p++) 
+            if (*p != ' ' && *p != '\t' && *p != '\n') break; 
+        if (*p == '\'' || *p == '\"') {
+            char *q = strchr(p+1, *p);
+            if (*q) { *q = 0; mode = p+1; }
+        }
+    } 
+    msg = strchr(msg, '<');
+
+    if (msg && qid) { 
+        /* we do some very hacky unsafe XML parsing here (get qid) */
+        str q = (qid+=12);
+        while(*q && *q != '<') q++;
+        *q = 0;
+
+        if (strstr(msg, ":Prepare")) {
+            lng seqnr;
+ 
+            /* initialize a commit record; the MIL code will come back with 
its pointer to send the response */
+            xrpc_commit_t c;
+            c.s = mc->c->fdout;
+            c.mode = mode;
+            c.qid = qid;
+            c.start = GDKusec();
+            seqnr = xquery_2pc_exec(mc, qid, (ptr) &c);
+            if (seqnr == 0) send_err(c.s, ERR404, "env:Sender", "prepare 
failed");
+
+            /* log the request */ 
+            if (mode && strstr(mode, "trace")) {
+                stream *logstream = xrpc_log_message("prepare", seqnr);
+                if (logstream) {
+                    stream_write(logstream, q, 1, strlen(q));
+                    stream_close(logstream);
+                    stream_destroy(logstream);
+                }
+            }
+        } else if (strcmp(xrpc_commit_active->qid,qid) == 0) {
+            if (strstr(msg, ":Commit")) {
+                xrpc_commit_active->mode = mode;
+                xrpc_commit_active->s = mc->c->fdout;
+            } /* else: abort */
+            xrpc_commit_active->qid = msg; /* hack: put message in qid for 
logging */
+            xrpc_commit_active = NULL;
+            MT_up_sema(xrpc_commit_sema, "xrpc_handle_wsat_request");
+        } 
+    }
+    return GDK_SUCCEED;
+}
+
+
 /*
  * XRPC MAPI client handler (overrides the xquery_client_engine)
  */
@@ -1293,6 +1394,8 @@
         (void) xrpc_handle_file_request(mc, arg); /* GET/PUT/DELETE file 
request */
     } else if (strncmp(uri, XRPC_REQ_CALLBACK, strlen(XRPC_REQ_CALLBACK)) == 
0) {
         (void) xrpc_handle_request(mc, arg);
+    } else if (strncmp(uri, XRPC_WSAT_CALLBACK, strlen(XRPC_WSAT_CALLBACK)) == 
0) {
+        (void) xrpc_handle_wsat_request(mc, arg);
     }
 
     /* clean up */
@@ -1373,6 +1476,7 @@
     shttpd_register_url(XRPC_REQ_CALLBACK, xrpc_fork_mapiclient, NULL);
     shttpd_register_url(XRPC_DOC_CALLBACK, xrpc_fork_mapiclient, NULL);
     shttpd_register_url(XRPC_ADM_CALLBACK, xrpc_fork_mapiclient, NULL);
+    shttpd_register_url(XRPC_WSAT_CALLBACK, xrpc_fork_mapiclient, NULL);
 
     /* Open listening socket */
     ctx = shttpd_open_port(xrpc_port, *open);
@@ -1384,6 +1488,67 @@
     return GDK_SUCCEED;
 }
 
+
+int
+CMDxrpc_2pc_wait_for_commit(str qid, str caller, lng* seqnr, ptr* handle) {
+    xrpc_commit_t *c = (xrpc_commit_t*) handle;
+    long now = GDKusec();
+
+    stream_printf(c->s, HTTP_200_OK);
+
+    if (c->mode && strstr(c->mode,"trace")) {
+        stream *logstream = xrpc_log_message("res", *seqnr);
+        if (logstream) c->s = attach_teestream(c->s, logstream);   
+    }
+    stream_printf(c->s, XRPC_WSAT_RES, c->mode?c->mode:"repeatable",
+                  qid, 0LL, caller, xrpc_hostname, xrpc_port, "Prepare", now - 
c->start, 2, "Prepared");
+    stream_printf(c->s, HTTP_200_OK);
+    stream_close(c->s);
+    stream_destroy(c->s); 
+
+    /* now start waiting for the commit request (handle_wsat_request deblocks 
us) */
+    c->start = now;
+    c->s = 0;
+    assert(xrpc_commit_active == NULL);
+    assert(strcmp(c->qid,qid) == 0);
+    xrpc_commit_active = c;
+    MT_down_sema(xrpc_commit_sema, "xrpc_2pc_wait_for_commit");
+    if (c->s == NULL) {
+        GDKerror("XRPC transaction %s aborted!", qid);
+        return GDK_FAIL;
+    }
+    return GDK_SUCCEED;
+}
+
+int
+CMDxrpc_2pc_confirm_commit(str qid, str caller, lng* seqnr, ptr* handle) {
+    xrpc_commit_t *c = (xrpc_commit_t*) handle;
+    long now = GDKusec();
+
+    stream_printf(c->s, HTTP_200_OK);
+
+    if (c->mode && strstr(c->mode,"trace")) {
+        stream* logstream = xrpc_log_message("res", *seqnr);
+        if (logstream) c->s = attach_teestream(c->s, logstream);   
+    }
+    stream_printf(c->s, XRPC_WSAT_RES, c->mode?c->mode:"repeatable",
+                  qid, 0LL, caller, xrpc_hostname, xrpc_port, "Commit", now - 
c->start, 3, "Committed");
+    stream_close(c->s);
+    stream_destroy(c->s); 
+
+    if (c->mode && strstr(c->mode,"trace")) {
+        /* hack: we had put Commit msg in qid so we can log it now with the 
right seqnr */
+        stream* logstream = xrpc_log_message("req", *seqnr);
+        if (logstream) {
+            stream_write(logstream, c->qid, 1, strlen(c->qid));
+            stream_close(logstream);
+            stream_destroy(logstream);
+        }
+    }
+    return GDK_SUCCEED;
+}
+
+
 @= xrpc_bat
     [EMAIL PROTECTED] = BATnew(TYPE_void,ATOMindex("@2"),1024);
     assert([EMAIL PROTECTED]);
@@ -1393,6 +1558,7 @@
 BAT *xrpc_qids = NULL, *xrpc_statuses = NULL, *xrpc_timeouts = NULL, 
*xrpc_locks = NULL, *xrpc_wsbats = NULL;
 BAT *xrpc_trusted = NULL, *xrpc_admin = NULL, *xrpc_user = NULL;
 
+
 bat* xrpc_prelude(void) {
     @:xrpc_bat(qids,str)@
     @:xrpc_bat(statuses,str)@
@@ -1403,6 +1569,7 @@
     @:xrpc_bat(admin,str)@
     @:xrpc_bat(user,str)@
     CMDmy_hostname(&xrpc_hostname);
+    MT_init_sema(xrpc_commit_sema,0, "XRPC_COMMIT_SEMA");
     assert(xrpc_hostname);
     return NULL;
 }


-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to