Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv18319/runtime
Modified Files:
xrpc_client.mx
Log Message:
- added options in compiler
xrpc:bulkrpc "yes"|"no"
xrpc:isolation "none"|"repeatable"
xrpc:timeout "<sec>"
- xrpc_client.mx: added implementation of one-RPC-at-a-time
- added tests for the new PROC doIterativeRPC, and adjusted existing
tests
- approved sigs_xrpc and procs due to changes in signature
NOTE: would someone please approve the changes in the signature of
'UpdateTape' and 'step' in "procs"?
U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.43
retrieving revision 1.44
diff -u -d -r1.43 -r1.44
--- xrpc_client.mx 4 Mar 2008 21:47:31 -0000 1.43
+++ xrpc_client.mx 28 Mar 2008 20:47:55 -0000 1.44
@@ -325,12 +325,14 @@
PROC doLoopLiftedRPC(
str options,
+ str isoLevel, # xrpc:isolation, can be 'none' or 'repeatable'
+ int timeout, # xrpc:timeout, >= 0
str moduleNS,
str location,
str method,
int updCall,
int arity,
- lng iterc_total,
+ lng niters,
BAT[void, BAT] ws,
BAT[oid, str] dsts,
BAT[void, oid] fun_vid,
@@ -347,7 +349,7 @@
var unq_dsts := dsts.tunique().hmark([EMAIL PROTECTED]);
var steps := unq_dsts.count();
# we assume each dst has the same #iterations
- var iterc := iterc_total / steps;
+ var iterc := niters / steps;
var rpc_results := bat(str,oid);
var rpc_iter := bat(void,bat).seqbase([EMAIL PROTECTED]);
var off := count(ws.fetch(CONT_NAME));
@@ -469,6 +471,114 @@
"DESCRIPTION: implementation of the loop-lifted RPC",
"xrpc_client");
+PROC doIterativeRPC(
+ str options,
+ str isoLevel, # xrpc:isolation, can be 'none' or 'repeatable'
+ int timeout, # xrpc:timeout, >= 0
+ str moduleNS,
+ str location,
+ str method,
+ int updCall,
+ int arity,
+ lng niters,
+ BAT[void,BAT] ws,
+ BAT[oid, str] dsts,
+ BAT[void,oid] fun_vid,
+ BAT[void,oid] fun_iter,
+ BAT[void,oid] fun_item,
+ BAT[void,int] fun_kind,
+ BAT[void,lng] int_values,
+ BAT[void,dbl] dbl_values,
+ BAT[void,dbl] dec_values,
+ BAT[void,str] str_values) : BAT[void,bat]
+{
+ var time_xrpcClntSeria := 0;
+ var time_xrpcClntDeSeria := 0;
+
+ var res_bats := nil;
+ var h := 0;
+ var t := 1;
+ while (lng(h) < niters){
+ # get function parameters for this destination
+ var time_xrpcClntSeriaStart := usec();
+ var dst := dsts.fetch(h);
+ var cur_fun_iter := fun_iter.select(oid(t));
+ var cur_fun_vid := fun_vid.fetch(cur_fun_iter.mirror()).tmark([EMAIL PROTECTED]);
+ var cur_fun_item := fun_item.fetch(cur_fun_iter.mirror()).tmark([EMAIL PROTECTED]);
+ var cur_fun_kind := fun_kind.fetch(cur_fun_iter.mirror()).tmark([EMAIL PROTECTED]);
+ # renumber cur_fun_iter, we only have one iteration
+ cur_fun_iter := cur_fun_iter.tmark([EMAIL PROTECTED]).project([EMAIL PROTECTED]);
+ time_xrpcClntSeria := time_xrpcClntSeria + (usec() - time_xrpcClntSeriaStart);
+
+ var local_name := "rpc_res_00" + str(h+1);
+ var rpc_oid := oid(nil);
+
+ var rpc_err := CATCH(
+ {
+ rpc_oid := rpc_client(local_name, options, dst,
+ moduleNS, location, method, updCall, arity, lng(1), ws,
+ cur_fun_vid, cur_fun_iter, cur_fun_item, cur_fun_kind,
+ int_values, dbl_values, dec_values, str_values);
+ });
+
+ # add result of this iteration to result BATs
+ var time_xrpcClntDeSeriaStart := usec();
+ if (isnil(rpc_err)) {
+ if (updCall != 1) {
+ var res := get_rpc_res(rpc_oid, local_name, ws,
+ int_values, dbl_values, str_values);
+ if (isnil(res_bats)){
+ res_bats := res;
+ } else {
+ # merge results of this iteration into existing results.
+ res_bats := merged_union(res_bats.fetch(0), res.fetch(0),
+ res_bats.fetch(1), res.fetch(1),
+ res_bats.fetch(2), res.fetch(2));
+ }
+ }
+ } else {
+ # We do not want to discard results from other destinations
+ # where executions might have succeeded, so we only print a
+ # WARNING by error, iso. terminate the execution with 'ERROR'
+ printf("!WARNING: doIterativeRPC: ");
+ printf("error occurred during RPC call to \"%s\".\n", dst);
+ printf("!WARNING: Received error was: \n%s\n", rpc_err);
+ }
+ h := h + 1;
+ t := t + 1;
+
+ time_xrpcClntDeSeria := time_xrpcClntDeSeria + (usec() - time_xrpcClntDeSeriaStart);
+ }
+
+ var time_xrpcClntDeSeriaStart := usec();
+ if (updCall = 1){
+ # If called function is an updating function, there is no
+ # results to retrieve. So, return empty BATs.
+ var res_iter := bat(void,oid).seqbase([EMAIL PROTECTED]);
+ var res_item := bat(void,oid).seqbase([EMAIL PROTECTED]);
+ var res_kind := bat(void,int).seqbase([EMAIL PROTECTED]);
+
+ res_bats := bat(void,bat,4).seqbase([EMAIL PROTECTED]);
+ res_bats.append(res_iter).append(res_item).append(res_kind);
+ res_bats.access(BAT_READ);
+ }
+
+ time_xrpcClntDeSeria := time_xrpcClntDeSeria + (usec() - time_xrpcClntDeSeriaStart);
+
+ if (options.search("timing") >= 0) {
+ printf("XRPC_Client_Serialisation (get_dst_params): %lld microsec\n",
+ time_xrpcClntSeria);
+ printf("Client_DeSerialisation (get_rpc_res): %lld msec\n",
+ time_xrpcClntDeSeria);
+ }
+
+ return res_bats;
+}
+ADDHELP("doIterativeRPC", "zhang", "April 2006",
+"DESCRIPTION: make a separate RPC call for ever iteration",
+"xrpc_client");
+
+
@h
#ifndef XRPC_CLIENT_H
#define XRPC_CLIENT_H
-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://ad.doubleclick.net/clk;164216239;13503038;w?http://sf.net/marketplace
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins