Update of /cvsroot/monetdb/pathfinder/compiler/mil
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv11815/compiler/mil
Modified Files:
Tag: xrpcdemo
milprint_summer.c
Log Message:
Ongoing implementation for the SIGMOD demo (now committed to the right branch)
U milprint_summer.c
Index: milprint_summer.c
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/compiler/mil/milprint_summer.c,v
retrieving revision 1.419.4.1
retrieving revision 1.419.4.2
diff -u -d -r1.419.4.1 -r1.419.4.2
--- milprint_summer.c 25 May 2008 22:44:33 -0000 1.419.4.1
+++ milprint_summer.c 25 May 2008 22:55:06 -0000 1.419.4.2
@@ -6254,7 +6254,9 @@
static void
translateXRPCCall (opt_t *f, int cur_level, int counter, PFcnode_t *xrpc)
{
- int i = 0, rc = NORMAL, updCall = 0, timeout = 0;
+ int i = 0, rc = NORMAL;
+ long long timeout = 0;
+ bool updCall = false;
PFcnode_t *dsts = NULL, *funApp = NULL;
PFfun_t *fun = NULL;
PFcnode_t *args = NULL;
@@ -6263,7 +6265,7 @@
assert(f && xrpc);
- updCall = PFqueryType(xrpc) == 0 ? 0 : 1;
+ updCall = PFqueryType(xrpc) == 0 ? false : true;
dsts = L(xrpc);
funApp = R(xrpc);
@@ -6365,34 +6367,29 @@
isoLevel = *((char **) PFarray_top (opt));
if(strcmp(isoLevel, "none") !=0 && strcmp(isoLevel, "repeatable") != 0)
PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:isolation':
\"%s\".", isoLevel);
-
- if(strcmp(isoLevel, "none") != 0)
- PFoops(OOPS_NOTSUPPORTED, "XRPC isolation level \"repeatable\" is
not implemented yet.");
}
opt = PFenv_lookup(PFoptions, PFqname(PFns_xrpc, "timeout"));
- if(!opt) {
- timeout = 30000; /* msec, default value of option 'xrpc:timeout' */
- } else {
- if(PFarray_last(opt) > 1)
- PFoops(OOPS_FATAL, "Multiple declarations of option 'xrpc:timeout'
not allowed!");
- errno = 0;
- timeout = strtol(*((char **) PFarray_top (opt)), NULL, 10);
- if(errno == EINVAL)
- PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:timeout':
\"%s\".",
- *((char **) PFarray_top (opt)));
- else if(errno == ERANGE)
- PFoops(OOPS_FATAL, "Value of option 'xrpc:timeout' out-of-range (>
%ld).",
- LONG_MAX);
- else if(timeout < 0)
- PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:timeout': may
not be negative.");
-
- if(strcmp(isoLevel, "none") != 0) { /* isoLevel == "repeatable" */
- if(timeout == 0)
- PFoops(OOPS_FATAL, "Invalid value of option
'xrpc:timeout': must be positive "
- "when the isloation level \"repeatable\" is
required.");
- } else { /* isoLevel == "none" */
- PFoops(OOPS_WARNING, "The option 'xrpc:timeout' does not have
effect in isolation level \"none\", discarded.");
+ if(strcmp(isoLevel, "none") == 0) {
+ if (opt)
+ PFoops(OOPS_WARNING, "The option 'xrpc:timeout' does not have
effect "
+ "in isolation level \"none\", discarded.");
+ } else { /* isoLevel == "repeatable" */
+ if(!opt) {
+ timeout = 30000; /* msec, default value of 'xrpc:timeout' in repeatable level */
+ } else {
+ if(PFarray_last(opt) > 1)
+ PFoops(OOPS_FATAL, "Multiple declarations of option
'xrpc:timeout' not allowed!");
+ errno = 0;
+ timeout = strtoll(*((char **) PFarray_top (opt)), NULL, 10);
+ if(errno == EINVAL)
+ PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:timeout':
\"%s\".",
+ *((char **) PFarray_top (opt)));
+ else if(errno == ERANGE)
+ PFoops(OOPS_FATAL, "Value of option 'xrpc:timeout'
out-of-range (> %ld).",
+ LONG_MAX);
+ else if(timeout <= 0)
+ PFoops(OOPS_FATAL, "Invalid value of option 'xrpc:timeout':
must be positive.");
}
}
@@ -6409,9 +6406,9 @@
* parameters. */
" fun_vid%03u := ([-](fun_vid%03u.[lng](),
fun_base%03u)).[oid]();\n"
" var res := %s(genType,\n"
- " \"%s\", %d,\n" /* isoLevel, timeout */
+ " \"%s\", lng(%lld),\n" /* isoLevel, timeout */
" \"%s\", \"%s\", \"%s\",\n" /* module, location,
method */
- " %d, %d, iterc_total,\n" /* updCall, arity */
+ " %s, lng(%d), iterc_total,\n" /* updCall, arity */
" ws, rpc_dsts,\n"
" fun_vid%03u, fun_iter%03u,\n"
" fun_item%03u, fun_kind%03u,\n"
@@ -6434,7 +6431,7 @@
strcmp(bulkRPC, "yes") == 0 ? "doLoopLiftedRPC" : "doIterativeRPC",
isoLevel, timeout,
PFqname_uri (fun->qname), fun->atURI?fun->atURI:f->url, PFqname_loc
(fun->qname),
- updCall, fun->arity,
+ updCall ? "true" : "false", fun->arity,
counter, counter, counter, counter);
}
@@ -11586,6 +11583,96 @@
(PF_STARTMIL_NORMAL("1"));
}
+/* Note 1: this code will only be printed for queries requiring
+ * (repeatable|2PC) isolation; normal MIL code (see above) will
+ * be printed for one-time queries.
+ * Note 2: when this code is executed, xrpclock is already set
+ * by xrpc_fork_mapiclient(). For one-time queries, the
+ * xrpclock is already released in xrpc_client_engine(). */
+#define PF_STARTMIL_XRPC_GETWS(STMT)\
+ " var ws := empty_bat;\n"\
+ " var xrpc_wslock := lock_nil;\n"\
+ " err := CATCH({\n"\
+ " var qid_idx;\n"\
+ " var err2 := CATCH(qid_idx :=
xrpc_qids.reverse().find(\"%s\"));\n"\
+ " if(isnil(err2)) {\n"\
+ " # we have received request for this query earlier, reuse the
ws\n"\
+ " var err3 := CATCH({ ws :=
bat(str(xrpc_wsids.fetch(qid_idx)));\n"\
+ " xrpc_wslock :=
xrpc_locks.fetch(qid_idx);\n"\
+ " lock_set(xrpc_wslock); # protect the ws of
this query\n"\
+ " });\n"\
+ " CATCH({ if(not(isnil(err3))) { # FIXME: is this proper handling
of error?\n"\
+ " xrpc_statuss.replace(qid_idx, \"abort\");\n"\
+ " if(lock_try(xrpc_wslock) != 0) {\n"\
+ " lock_unset(xrpc_wslock); \n"\
+ " }\n"\
+ " ERROR(err3);\n"\
+ " }\n"\
+ " });\n"\
+ " lock_unset(xrpclock);\n"\
+ " } else {\n"\
+ " # first request received for this 2PC-XRPC query\n"\
+ " var err3 := CATCH({ qid_idx := count(xrpc_qids).oid(); # index
of the new xrpc_* BUNs\n"\
+ " ws := ws_create(" STMT ");\n"\
+ " # HACK: use the name of ws[XRPC_SUCCESSOR]
to store the QID\n"\
+ "
ws.fetch(XRPC_SUCCESSOR).rename(str(int(qid_idx)));\n"\
+ " xrpc_wslock := lock_create();\n"\
+ " lock_set(xrpc_wslock);\n"\
+ " xrpc_qids.append(\"%s\");\n"\
+ " xrpc_timeouts.append(lng(%lld));\n"\
+ " xrpc_wsids.append(ws_id(ws));\n"\
+ " xrpc_locks.append(xrpc_wslock);\n"\
+ " xrpc_statuss.append(\"exec\");\n"\
+ " });\n"\
+ " if(not(isnil(err3))) { # Undo above actions, ignore possible
errors\n"\
+ " CATCH({ ws_destroy(ws);\n"\
+ " if(lock_try(xrpc_wslock) != 0) {\n"\
+ " lock_unset(xrpc_wslock); \n"\
+ " }\n"\
+ " lock_destroy(xrpc_wslock);\n"\
+ " xrpc_qids.delete(qid_idx);\n"\
+ " xrpc_timeouts.delete(qid_idx);\n"\
+ " xrpc_wsids.delete(qid_idx);\n"\
+ " xrpc_locks.delete(qid_idx);\n"\
+ " xrpc_statuss.delete(qid_idx);\n"\
+ " });\n"\
+ " lock_unset(xrpclock);\n"\
+ " ERROR(err3);\n"\
+ " }\n"\
+ " lock_unset(xrpclock);\n"\
+ " }\n"
+#define PF_STARTMIL_NORMAL_XRPC_TRANS(STMT)\
+ PF_STARTMIL_START\
+ "var err;\n"\
+ "{{\n"\
+ PF_STARTMIL_XRPC_GETWS(STMT)\
+ PF_STARTMIL_END
+#define PF_STARTMIL_UPDATE_XRPC_TRANS\
+ PF_STARTMIL_START\
+ "var try := 1;\n"\
+ "var err := \"!ERROR: conflicting update\";\n"\
+ "var ws_log_wsid := 0LL;\n"\
+ "{while(((try :+= 1) <= 3) and not(isnil(err))) {\n"\
+ " if (not(err.startsWith(\"!ERROR: conflicting update\"))) break;\n"\
+ PF_STARTMIL_XRPC_GETWS("try")\
+ " if (ws_log_active and bit(ws_log_wsid)) \n"\
+ " ws_log(ws, \"restarted \" + str(ws_log_wsid));\n"\
+ PF_STARTMIL_END
+
+int
+PFstartMIL_XRPCTrans(
+ char *buf,
+ int buflen,
+ int statement_type,
+ char *qid,
+ long long timeout) {
+ return (statement_type == 1) ?
+ snprintf(buf, buflen, PF_STARTMIL_UPDATE_XRPC_TRANS, qid, qid,
timeout) :
+ snprintf(buf, buflen, (statement_type == 0) ?
+ PF_STARTMIL_NORMAL_XRPC_TRANS("0") :
+ PF_STARTMIL_NORMAL_XRPC_TRANS("1"), qid, qid, timeout);
+}
+
/* debug statement for PFstopMIL to print result set
"if (genType.search(\"debug\") >= 0)
print(item.slice(0,10).col_name(\"tot_items_\"+str(item.count())));\n"
*/
@@ -11599,31 +11686,40 @@
#endif
#define PF_STOPMIL_START \
- " time_print := usec();\n"\
- " time_exec := time_print - time_start;\n"
+ " time_print := usec();\n"\
+ " time_exec := time_print - time_start;\n"
+#define PF_STOPMIL_RDONLY_BODY\
+ " # 'none' could theoretically occur in genType as root tagname
('xml-root-none'), so check for 'xml'\n"\
+ " if ((genType.search(\"none\") < 0) or (genType.search(\"xml\") >=
0))\n"\
+ "
print_result(genType,moduleNS,method,ws,tunique(iter),constant2bat(iter),item.materialize(ipik),constant2bat(kind),int_values,dbl_values,str_values);\n"
+#define PF_STOPMIL_UPDATE_BODY\
+ " play_update_tape(ws, item.materialize(ipik),
kind.materialize(ipik), int_values, str_values);\n"
+#define PF_STOPMIL_DOCMGT_BODY\
+ " play_doc_tape(ws, item.materialize(ipik), kind.materialize(ipik),
int_values, str_values);\n"
#define PF_STOPMIL_RDONLY PF_STOPMIL_START\
- " # 'none' could theoretically occur in genType as root tagname
('xml-root-none'), so check for 'xml'\n"\
- " if ((genType.search(\"none\") < 0) or (genType.search(\"xml\")
>= 0))\n"\
- "
print_result(genType,moduleNS,method,ws,tunique(iter),constant2bat(iter),item.materialize(ipik),constant2bat(kind),int_values,dbl_values,str_values);\n"\
- PF_STOPMIL_END("Print ")
+ PF_STOPMIL_RDONLY_BODY\
+ PF_STOPMIL_END("Print ")
#define PF_STOPMIL_UPDATE PF_STOPMIL_START\
- " play_update_tape(ws, item.materialize(ipik),
kind.materialize(ipik), int_values, str_values);\n" PF_STOPMIL_END("Update")
+ PF_STOPMIL_UPDATE_BODY\
+ PF_STOPMIL_END("Update")
#define PF_STOPMIL_DOCMGT PF_STOPMIL_START\
- " play_doc_tape(ws, item.materialize(ipik),
kind.materialize(ipik), int_values, str_values);\n" PF_PLAY_TIJAH_TAPE
PF_STOPMIL_END("Update")
-#define PF_STOPMIL_END(LASTPHASE) \
- " });\n"\
- " ws_log_wsid := ws_id(ws);\n"\
- " if (not(isnil(err))) ws_log(ws, err);\n"\
- " ws_destroy(ws);\n"\
- "}}\n"\
- PF_STOP_PFTIJAH\
- "if (not(isnil(err))) {\n"\
- " ERROR(err);\n"\
- "} else if (genType.startsWith(\"timing\")) {\n"\
- " time_print := usec() - time_print;\n"\
- " printf(\"\\nTrans %% 10.3f msec\\nShred %% 10.3f msec\\nQuery
%% 10.3f msec\\n" LASTPHASE " %% 10.3f msec\\n\","\
- " dbl(time_compile)/1000.0, dbl(time_shred)/1000.0,
dbl(time_exec - time_shred)/1000.0, time_print/1000.0);\n}"
-
+ PF_STOPMIL_DOCMGT_BODY\
+ PF_PLAY_TIJAH_TAPE PF_STOPMIL_END("Update")
+#define PF_STOPMIL_END(LASTPHASE)\
+ " });\n"\
+ " ws_log_wsid := ws_id(ws);\n"\
+ " if (not(isnil(err))) ws_log(ws, err);\n"\
+ " ws_destroy(ws);\n"\
+ "}}\n"\
+ PF_STOP_PFTIJAH\
+ PF_STOPMIL_END_PRINT_TIMING(LASTPHASE)
+#define PF_STOPMIL_END_PRINT_TIMING(LASTPHASE)\
+ "if (not(isnil(err))) {\n"\
+ " ERROR(err);\n"\
+ "} else if (genType.startsWith(\"timing\")) {\n"\
+ " time_print := usec() - time_print;\n"\
+ " printf(\"\\nTrans %% 10.3f msec\\nShred %% 10.3f msec\\nQuery %%
10.3f msec\\n" LASTPHASE " %% 10.3f msec\\n\","\
+ " dbl(time_compile)/1000.0, dbl(time_shred)/1000.0, dbl(time_exec
- time_shred)/1000.0, time_print/1000.0);\n}"
const char* PFstopMIL(int statement_type) {
return (statement_type==0)?
@@ -11633,6 +11729,44 @@
(PF_STOPMIL_DOCMGT);
}
+#define PF_STOPMIL_RDONLY_XRPC_TRANS\
+ PF_STOPMIL_START\
+ PF_STOPMIL_RDONLY_BODY\
+ PF_STOPMIL_END_XRPC_TRANS("Print ")
+#define PF_STOPMIL_UPDATE_XRPC_TRANS\
+ PF_STOPMIL_START\
+ PF_STOPMIL_UPDATE_BODY\
+ PF_STOPMIL_END_XRPC_TRANS("Update")
+#define PF_STOPMIL_DOCMGT_XRPC_TRANS\
+ PF_STOPMIL_START\
+ PF_STOPMIL_DOCMGT_BODY\
+ PF_PLAY_TIJAH_TAPE\
+ PF_STOPMIL_END_XRPC_TRANS("Update")
+#define PF_STOPMIL_END_XRPC_TRANS(LASTPHASE)\
+ " }); \n"\
+ " # catch all possible errors while possibly holding lock\n"\
+ " CATCH({ ws_log_wsid := ws_id(ws);\n"\
+ " if (not(isnil(err))) ws_log(ws, err); });\n"\
+ " if(not(isnil(xrpc_wslock)))\n"\
+ " lock_unset(xrpc_wslock);\n"\
+ "}}\n"\
+ PF_STOP_PFTIJAH\
+ PF_STOPMIL_END_PRINT_TIMING(LASTPHASE)
+
+int
+PFstopMIL_XRPCTrans(
+ char *buf,
+ int buflen,
+ int statement_type,
+ char *qid ) {
+ (void)qid;
+ return snprintf(buf, buflen, (statement_type==0) ?
+ PF_STOPMIL_RDONLY_XRPC_TRANS :
+ (statement_type==1) ?
+ PF_STOPMIL_UPDATE_XRPC_TRANS:
+ PF_STOPMIL_DOCMGT_XRPC_TRANS);
+}
+
const char* PFudfMIL(void) {
return
"{\n"
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins