Update of /cvsroot/monetdb/pathfinder/compiler/mil
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv11815/compiler/mil

Modified Files:
      Tag: xrpcdemo
        milprint_summer.c 
Log Message:
Ongoing implementation for the SIGMOD demo (now to the right branch)



U milprint_summer.c
Index: milprint_summer.c
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/compiler/mil/milprint_summer.c,v
retrieving revision 1.419.4.1
retrieving revision 1.419.4.2
diff -u -d -r1.419.4.1 -r1.419.4.2
--- milprint_summer.c   25 May 2008 22:44:33 -0000      1.419.4.1
+++ milprint_summer.c   25 May 2008 22:55:06 -0000      1.419.4.2
@@ -6254,7 +6254,9 @@
 static void
 translateXRPCCall (opt_t *f, int cur_level, int counter, PFcnode_t *xrpc)
 {
-    int i = 0, rc = NORMAL, updCall = 0, timeout = 0;
+    int i = 0, rc = NORMAL;
+    long long timeout = 0;
+    bool updCall = false;
     PFcnode_t *dsts = NULL, *funApp = NULL;
     PFfun_t   *fun  = NULL;
     PFcnode_t *args = NULL;
@@ -6263,7 +6265,7 @@
 
     assert(f && xrpc);
 
-    updCall = PFqueryType(xrpc) == 0 ? 0 : 1;
+    updCall = PFqueryType(xrpc) == 0 ? false : true;
     dsts = L(xrpc);
     funApp = R(xrpc);
 
@@ -6365,34 +6367,29 @@
         isoLevel = *((char **) PFarray_top (opt));
         if(strcmp(isoLevel, "none") !=0 && strcmp(isoLevel, "repeatable") != 0)
             PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:isolation': \"%s\".", isoLevel);
-
-        if(strcmp(isoLevel, "none") != 0)
-            PFoops(OOPS_NOTSUPPORTED, "XRPC isolation level \"repeatable\" is not implemented yet.");
     }
 
     opt = PFenv_lookup(PFoptions, PFqname(PFns_xrpc, "timeout"));
-    if(!opt) {
-        timeout = 30000; /* msec, default value of option 'xrpc:timeout' */
-    } else {
-        if(PFarray_last(opt) > 1)
-            PFoops(OOPS_FATAL, "Multiple declarations of option 'xrpc:timeout' not allowed!");
-        errno = 0;
-        timeout = strtol(*((char **) PFarray_top (opt)), NULL, 10);
-        if(errno == EINVAL)
-            PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:timeout': \"%s\".",
-                    *((char **) PFarray_top (opt)));
-        else if(errno == ERANGE)
-            PFoops(OOPS_FATAL, "Value of option 'xrpc:timeout' out-of-range (> %ld).",
-                    LONG_MAX);
-        else if(timeout < 0)
-            PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:timeout': may not be negative.");
-
-        if(strcmp(isoLevel, "none") != 0) { /* isoLevel == "repeatable" */
-                if(timeout == 0)
-                    PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:timeout': must be positive "
-                            "when the isloation level \"repeatable\" is required.");
-        } else { /* isoLevel == "none" */
-            PFoops(OOPS_WARNING, "The option 'xrpc:timeout' does not have effect in isolation level \"none\", discarded.");
+    if(strcmp(isoLevel, "none") == 0) {
+        if (opt)
+            PFoops(OOPS_WARNING, "The option 'xrpc:timeout' does not have effect "
+                    "in isolation level \"none\", discarded.");
+    } else { /* isoLevel == "repeatable" */
+        if(!opt) {
+            timeout = 30000; /* msec, default value of 'xrpc:timeout' in repeatable level */
+        } else {
+            if(PFarray_last(opt) > 1)
+                PFoops(OOPS_FATAL, "Multiple declarations of option 'xrpc:timeout' not allowed!");
+            errno = 0;
+            timeout = strtoll(*((char **) PFarray_top (opt)), NULL, 10);
+            if(errno == EINVAL)
+                PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:timeout': \"%s\".",
+                        *((char **) PFarray_top (opt)));
+            else if(errno == ERANGE)
+                PFoops(OOPS_FATAL, "Value of option 'xrpc:timeout' out-of-range (> %ld).",
+                        LONG_MAX);
+            else if(timeout <= 0)
+                PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:timeout': must be positive.");
         }
     }
 
@@ -6409,9 +6406,9 @@
              * parameters. */
             "  fun_vid%03u := ([-](fun_vid%03u.[lng](), fun_base%03u)).[oid]();\n"
             "  var res := %s(genType,\n"
-            "                \"%s\", %d,\n" /* isoLevel, timeout */
+            "                \"%s\", lng(%lld),\n" /* isoLevel, timeout */
             "                \"%s\", \"%s\", \"%s\",\n" /* module, location, method */
-            "                %d, %d, iterc_total,\n" /* updCall, arity */
+            "                %s, lng(%d), iterc_total,\n" /* updCall, arity */
             "                ws, rpc_dsts,\n"
             "                fun_vid%03u, fun_iter%03u,\n"
             "                fun_item%03u, fun_kind%03u,\n"
@@ -6434,7 +6431,7 @@
         strcmp(bulkRPC, "yes") == 0 ? "doLoopLiftedRPC" : "doIterativeRPC",
         isoLevel, timeout,
         PFqname_uri (fun->qname), fun->atURI?fun->atURI:f->url, PFqname_loc (fun->qname),
-        updCall, fun->arity,
+        updCall ? "true" : "false", fun->arity,
         counter, counter, counter, counter);
 }
 
@@ -11586,6 +11583,96 @@
                                    (PF_STARTMIL_NORMAL("1"));
 }
 
+/* Note 1: this code will only be printed for queries requiring
+ *         (repeatable|2PC) isolation, normal MIL code (see above) will
+ *         be printed for one-time queries.
+ * Note 2: when this code is executed, xrpclock is already set 
+ *         by xrpc_fork_mapiclient().  For one-time queries, the
+ *         xrpclock is already released in xrpc_client_engine(). */
+#define PF_STARTMIL_XRPC_GETWS(STMT)\
+        "  var ws := empty_bat;\n"\
+        "  var xrpc_wslock := lock_nil;\n"\
+        "  err := CATCH({\n"\
+        "    var qid_idx;\n"\
+        "    var err2 := CATCH(qid_idx := xrpc_qids.reverse().find(\"%s\"));\n"\
+        "    if(isnil(err2)) {\n"\
+        "      # we have received request for this query earlier, reuse the ws\n"\
+        "      var err3 := CATCH({ ws := bat(str(xrpc_wsids.fetch(qid_idx)));\n"\
+        "                          xrpc_wslock := xrpc_locks.fetch(qid_idx);\n"\
+        "                          lock_set(xrpc_wslock); # protect the ws of this query\n"\
+        "                  });\n"\
+        "      CATCH({ if(not(isnil(err3))) { # FIXME: is this proper handling of error?\n"\
+        "                xrpc_statuss.replace(qid_idx, \"abort\");\n"\
+        "                if(lock_try(xrpc_wslock) != 0) {\n"\
+        "                  lock_unset(xrpc_wslock); \n"\
+        "                }\n"\
+        "                ERROR(err3);\n"\
+        "              }\n"\
+        "      });\n"\
+        "      lock_unset(xrpclock);\n"\
+        "    } else {\n"\
+        "      # first request received for this 2PC-XRPC query\n"\
+        "      var err3 := CATCH({ qid_idx := count(xrpc_qids).oid(); # index of the new xrpc_* BUNs\n"\
+        "                          ws := ws_create(" STMT ");\n"\
+        "                          # HACK: use the name of ws[XRPC_SUCCESSOR] to store the QID\n"\
+        "                          ws.fetch(XRPC_SUCCESSOR).rename(str(int(qid_idx)));\n"\
+        "                          xrpc_wslock := lock_create();\n"\
+        "                          lock_set(xrpc_wslock);\n"\
+        "                          xrpc_qids.append(\"%s\");\n"\
+        "                          xrpc_timeouts.append(lng(%lld));\n"\
+        "                          xrpc_wsids.append(ws_id(ws));\n"\
+        "                          xrpc_locks.append(xrpc_wslock);\n"\
+        "                          xrpc_statuss.append(\"exec\");\n"\
+        "                  });\n"\
+        "      if(not(isnil(err3))) { # Undo above actions, ignore possible errors\n"\
+        "        CATCH({ ws_destroy(ws);\n"\
+        "                if(lock_try(xrpc_wslock) != 0) {\n"\
+        "                  lock_unset(xrpc_wslock); \n"\
+        "                }\n"\
+        "                lock_destroy(xrpc_wslock);\n"\
+        "                xrpc_qids.delete(qid_idx);\n"\
+        "                xrpc_timeouts.delete(qid_idx);\n"\
+        "                xrpc_wsids.delete(qid_idx);\n"\
+        "                xrpc_locks.delete(qid_idx);\n"\
+        "                xrpc_statuss.delete(qid_idx);\n"\
+        "        });\n"\
+        "        lock_unset(xrpclock);\n"\
+        "        ERROR(err3);\n"\
+        "      }\n"\
+        "      lock_unset(xrpclock);\n"\
+        "    }\n"
+#define PF_STARTMIL_NORMAL_XRPC_TRANS(STMT)\
+        PF_STARTMIL_START\
+        "var err;\n"\
+        "{{\n"\
+        PF_STARTMIL_XRPC_GETWS(STMT)\
+        PF_STARTMIL_END
+#define PF_STARTMIL_UPDATE_XRPC_TRANS\
+        PF_STARTMIL_START\
+        "var try := 1;\n"\
+        "var err := \"!ERROR: conflicting update\";\n"\
+        "var ws_log_wsid := 0LL;\n"\
+        "{while(((try :+= 1) <= 3) and not(isnil(err))) {\n"\
+        " if (not(err.startsWith(\"!ERROR: conflicting update\"))) break;\n"\
+        PF_STARTMIL_XRPC_GETWS("try")\
+        "  if (ws_log_active and bit(ws_log_wsid)) \n"\
+        "    ws_log(ws, \"restarted \" + str(ws_log_wsid));\n"\
+        PF_STARTMIL_END
+
+int
+PFstartMIL_XRPCTrans(
+        char *buf,
+        int buflen,
+        int statement_type,
+        char *qid,
+        long long timeout) {
+    return (statement_type == 1) ?
+        snprintf(buf, buflen, PF_STARTMIL_UPDATE_XRPC_TRANS, qid, qid, timeout) :
+        snprintf(buf, buflen, (statement_type == 0) ?
+                PF_STARTMIL_NORMAL_XRPC_TRANS("0") :
+                PF_STARTMIL_NORMAL_XRPC_TRANS("1"), qid, qid, timeout);
+}
+
 /* debug statement for PFstopMIL to print result set
 "if (genType.search(\"debug\") >= 0) print(item.slice(0,10).col_name(\"tot_items_\"+str(item.count())));\n"
 */
@@ -11599,31 +11686,40 @@
 #endif
 
 #define PF_STOPMIL_START \
-           "  time_print := usec();\n"\
-           "  time_exec := time_print - time_start;\n"
+        "  time_print := usec();\n"\
+        "  time_exec := time_print - time_start;\n"
+#define PF_STOPMIL_RDONLY_BODY\
+        "  # 'none' could theoretically occur in genType as root tagname ('xml-root-none'), so check for 'xml'\n"\
+        "  if ((genType.search(\"none\") < 0) or (genType.search(\"xml\") >= 0))\n"\
+        "   print_result(genType,moduleNS,method,ws,tunique(iter),constant2bat(iter),item.materialize(ipik),constant2bat(kind),int_values,dbl_values,str_values);\n"
+#define PF_STOPMIL_UPDATE_BODY\
+        "  play_update_tape(ws, item.materialize(ipik), kind.materialize(ipik), int_values, str_values);\n"
+#define PF_STOPMIL_DOCMGT_BODY\
+        "  play_doc_tape(ws, item.materialize(ipik), kind.materialize(ipik), int_values, str_values);\n"
 #define PF_STOPMIL_RDONLY PF_STOPMIL_START\
-           "  # 'none' could theoretically occur in genType as root tagname ('xml-root-none'), so check for 'xml'\n"\
-           "  if ((genType.search(\"none\") < 0) or (genType.search(\"xml\") >= 0))\n"\
-           "   print_result(genType,moduleNS,method,ws,tunique(iter),constant2bat(iter),item.materialize(ipik),constant2bat(kind),int_values,dbl_values,str_values);\n"\
-           PF_STOPMIL_END("Print ")
+        PF_STOPMIL_RDONLY_BODY\
+        PF_STOPMIL_END("Print ")
 #define PF_STOPMIL_UPDATE PF_STOPMIL_START\
-           "  play_update_tape(ws, item.materialize(ipik), kind.materialize(ipik), int_values, str_values);\n" PF_STOPMIL_END("Update")
+        PF_STOPMIL_UPDATE_BODY\
+        PF_STOPMIL_END("Update")
 #define PF_STOPMIL_DOCMGT PF_STOPMIL_START\
-           "  play_doc_tape(ws, item.materialize(ipik), kind.materialize(ipik), int_values, str_values);\n" PF_PLAY_TIJAH_TAPE PF_STOPMIL_END("Update")
-#define PF_STOPMIL_END(LASTPHASE) \
-           " });\n"\
-           " ws_log_wsid := ws_id(ws);\n"\
-           " if (not(isnil(err))) ws_log(ws, err);\n"\
-           " ws_destroy(ws);\n"\
-           "}}\n"\
-           PF_STOP_PFTIJAH\
-           "if (not(isnil(err))) {\n"\
-           "  ERROR(err);\n"\
-           "} else if (genType.startsWith(\"timing\")) {\n"\
-           "  time_print := usec() - time_print;\n"\
-           "  printf(\"\\nTrans  %% 10.3f msec\\nShred  %% 10.3f msec\\nQuery  %% 10.3f msec\\n" LASTPHASE " %% 10.3f msec\\n\","\
-           "      dbl(time_compile)/1000.0, dbl(time_shred)/1000.0, dbl(time_exec - time_shred)/1000.0, time_print/1000.0);\n}"
-
+        PF_STOPMIL_DOCMGT_BODY\
+        PF_PLAY_TIJAH_TAPE PF_STOPMIL_END("Update")
+#define PF_STOPMIL_END(LASTPHASE)\
+        " });\n"\
+        " ws_log_wsid := ws_id(ws);\n"\
+        " if (not(isnil(err))) ws_log(ws, err);\n"\
+        " ws_destroy(ws);\n"\
+        "}}\n"\
+        PF_STOP_PFTIJAH\
+        PF_STOPMIL_END_PRINT_TIMING(LASTPHASE)
+#define PF_STOPMIL_END_PRINT_TIMING(LASTPHASE)\
+        "if (not(isnil(err))) {\n"\
+        "  ERROR(err);\n"\
+        "} else if (genType.startsWith(\"timing\")) {\n"\
+        "  time_print := usec() - time_print;\n"\
+        "  printf(\"\\nTrans  %% 10.3f msec\\nShred  %% 10.3f msec\\nQuery  %% 10.3f msec\\n" LASTPHASE " %% 10.3f msec\\n\","\
+        "      dbl(time_compile)/1000.0, dbl(time_shred)/1000.0, dbl(time_exec - time_shred)/1000.0, time_print/1000.0);\n}"
 
 const char* PFstopMIL(int statement_type) {
     return (statement_type==0)?
@@ -11633,6 +11729,44 @@
                (PF_STOPMIL_DOCMGT);
 }
 
+#define PF_STOPMIL_RDONLY_XRPC_TRANS\
+        PF_STOPMIL_START\
+        PF_STOPMIL_RDONLY_BODY\
+        PF_STOPMIL_END_XRPC_TRANS("Print ")
+#define PF_STOPMIL_UPDATE_XRPC_TRANS\
+        PF_STOPMIL_START\
+        PF_STOPMIL_UPDATE_BODY\
+        PF_STOPMIL_END_XRPC_TRANS("Update")
+#define PF_STOPMIL_DOCMGT_XRPC_TRANS\
+        PF_STOPMIL_START\
+        PF_STOPMIL_DOCMGT_BODY\
+        PF_PLAY_TIJAH_TAPE\
+        PF_STOPMIL_END_XRPC_TRANS("Update")
+#define PF_STOPMIL_END_XRPC_TRANS(LASTPHASE)\
+        " }); \n"\
+        " # catch all possible errors while possibly holding lock\n"\
+        " CATCH({ ws_log_wsid := ws_id(ws);\n"\
+        "         if (not(isnil(err))) ws_log(ws, err); });\n"\
+        " if(not(isnil(xrpc_wslock)))\n"\
+        "   lock_unset(xrpc_wslock);\n"\
+        "}}\n"\
+        PF_STOP_PFTIJAH\
+        PF_STOPMIL_END_PRINT_TIMING(LASTPHASE)
+
+int
+PFstopMIL_XRPCTrans(
+        char *buf,
+        int buflen,
+        int statement_type,
+        char *qid ) {
+    (void)qid;
+    return snprintf(buf, buflen, (statement_type==0) ?
+                                    PF_STOPMIL_RDONLY_XRPC_TRANS :
+                                    (statement_type==1) ?
+                                    PF_STOPMIL_UPDATE_XRPC_TRANS:
+                                    PF_STOPMIL_DOCMGT_XRPC_TRANS);
+}
+
 const char* PFudfMIL(void) {
     return
         "{\n"


-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to