Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv18319/runtime

Modified Files:
        xrpc_client.mx 
Log Message:
- added options in compiler

    xrpc:bulkrpc "yes"|"no"
    xrpc:isolation "none"|"repeatable"
    xrpc:timeout "<sec>"

- xrpc_client.mx: added implementation of one-rpc-a-time
- added tests for the new PROC doIterativeRPC, and adjusted existing
  tests
- approved sigs_xrpc and procs due to changes in signature
  NOTE: would some please approve the changes in the signature of
  'UpdateTape' and 'step' in "procs"?

 


U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.43
retrieving revision 1.44
diff -u -d -r1.43 -r1.44
--- xrpc_client.mx      4 Mar 2008 21:47:31 -0000       1.43
+++ xrpc_client.mx      28 Mar 2008 20:47:55 -0000      1.44
@@ -325,12 +325,14 @@
 
 PROC doLoopLiftedRPC(
         str options,
+        str isoLevel, # xrpc:isolation, can be 'none' or 'repeatable'
+        int timeout, # xrpc:timeout, >= 0
         str moduleNS,
         str location,
         str method,
         int updCall,
         int arity,
-        lng iterc_total,
+        lng niters,
         BAT[void, BAT] ws,
         BAT[oid,  str] dsts,
         BAT[void, oid] fun_vid,
@@ -347,7 +349,7 @@
     var unq_dsts := dsts.tunique().hmark([EMAIL PROTECTED]);
     var steps := unq_dsts.count();
     # we assume each dst has the same #iterations
-    var iterc := iterc_total / steps;
+    var iterc := niters / steps;
     var rpc_results := bat(str,oid);
     var rpc_iter := bat(void,bat).seqbase([EMAIL PROTECTED]);
     var off := count(ws.fetch(CONT_NAME));
@@ -469,6 +471,114 @@
 "DESCRIPTION: implementation of the loop-lifted RPC",
 "xrpc_client");
 
+PROC doIterativeRPC(
+        str options,
+        str isoLevel, # xrpc:isolation, can be 'none' or 'repeatable'
+        int timeout, # xrpc:timeout, >= 0
+        str moduleNS,
+        str location,
+        str method,
+        int updCall,
+        int arity,
+        lng niters,
+        BAT[void,BAT] ws,
+        BAT[oid, str] dsts,
+        BAT[void,oid] fun_vid,
+        BAT[void,oid] fun_iter,
+        BAT[void,oid] fun_item,
+        BAT[void,int] fun_kind,
+        BAT[void,lng] int_values,
+        BAT[void,dbl] dbl_values,
+        BAT[void,dbl] dec_values,
+        BAT[void,str] str_values) : BAT[void,bat]
+{
+    var time_xrpcClntSeria := 0;
+    var time_xrpcClntDeSeria := 0;
+
+    var res_bats := nil;
+    var h := 0;
+    var t := 1;
+    while (lng(h) < niters){
+        # get function parameters for this destination
+        var time_xrpcClntSeriaStart := usec();
+        var dst := dsts.fetch(h);
+        var cur_fun_iter := fun_iter.select(oid(t));
+        var cur_fun_vid  := fun_vid.fetch(cur_fun_iter.mirror()).tmark([EMAIL 
PROTECTED]);
+        var cur_fun_item := fun_item.fetch(cur_fun_iter.mirror()).tmark([EMAIL 
PROTECTED]);
+        var cur_fun_kind := fun_kind.fetch(cur_fun_iter.mirror()).tmark([EMAIL 
PROTECTED]);
+        # renumber cur_fun_iter, we only have one iteration
+        cur_fun_iter := cur_fun_iter.tmark([EMAIL PROTECTED]).project([EMAIL 
PROTECTED]);
+        time_xrpcClntSeria := time_xrpcClntSeria + (usec() - 
time_xrpcClntSeriaStart);
+
+        var local_name := "rpc_res_00" + str(h+1);
+        var rpc_oid := oid(nil);
+
+        var rpc_err := CATCH(
+        {
+            rpc_oid := rpc_client(local_name, options, dst, 
+                        moduleNS, location, method, updCall, arity, lng(1), ws,
+                        cur_fun_vid, cur_fun_iter, cur_fun_item, cur_fun_kind,
+                        int_values, dbl_values, dec_values, str_values);
+        });
+
+        # add result of this iteration to result BATs
+        var time_xrpcClntDeSeriaStart := usec();
+        if (isnil(rpc_err)) {
+            if (updCall != 1) {
+                var res := get_rpc_res(rpc_oid, local_name, ws,
+                                       int_values, dbl_values, str_values);
+                if (isnil(res_bats)){
+                    res_bats := res;
+                } else {
+                    # merge results of this iteration into existing results.
+                    res_bats := merged_union(res_bats.fetch(0), res.fetch(0), 
+                                             res_bats.fetch(1), res.fetch(1), 
+                                             res_bats.fetch(2), res.fetch(2));
+                }
+            }
+        } else {
+            # We do not want to discard results from other destinations
+            # where executions might have succeeded, so we only print a
+            # WARNING by error, iso. terminate the execution with 'ERROR'
+            printf("!WARNING: doIterativeRPC: ");
+            printf("error occurred during RPC call to \"%s\".\n", dst);
+            printf("!WARNING: Received error was: \n%s\n", rpc_err);
+        }
+        h := h + 1;
+        t := t + 1;
+
+        time_xrpcClntDeSeria := time_xrpcClntDeSeria + (usec() - 
time_xrpcClntDeSeriaStart);
+    }
+
+    var time_xrpcClntDeSeriaStart := usec();
+    if (updCall = 1){
+        # If called function is an updating function, there is no
+        # results to retrieve.  So, return empty BATs.
+        var res_iter := bat(void,oid).seqbase([EMAIL PROTECTED]);
+        var res_item := bat(void,oid).seqbase([EMAIL PROTECTED]);
+        var res_kind := bat(void,int).seqbase([EMAIL PROTECTED]);
+
+        res_bats := bat(void,bat,4).seqbase([EMAIL PROTECTED]);
+        res_bats.append(res_iter).append(res_item).append(res_kind);
+        res_bats.access(BAT_READ);
+    }
+
+    time_xrpcClntDeSeria := time_xrpcClntDeSeria + (usec() - 
time_xrpcClntDeSeriaStart);
+
+    if (options.search("timing") >= 0) {
+        printf("XRPC_Client_Serialisation (get_dst_params):   %lld microsec\n",
+                time_xrpcClntSeria);
+        printf("Client_DeSerialisation (get_rpc_res):        %lld msec\n",
+                time_xrpcClntDeSeria);
+    }
+
+    return res_bats;
+}
+ADDHELP("doIterativeRPC", "zhang", "April 2006",
+"DESCRIPTION: make a separate RPC call for ever iteration",
+"xrpc_client");
+
+
 @h
 #ifndef XRPC_CLIENT_H
 #define XRPC_CLIENT_H


-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://ad.doubleclick.net/clk;164216239;13503038;w?http://sf.net/marketplace
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to