Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv1129/runtime

Modified Files:
      Tag: xrpcdemo
        pathfinder.mx pf_support.mx 
Log Message:
Support for 2-phase commit.

The code assumes that there is a variable xprc_qid with value != ""
when we're in a P2P situation.

Instead of play_update_tape (which is unchanged), we call
collect_update_tape to collect all updates in the working set.  The
necessary bats are always created when we're doing an update (see
ws_udpate).

Once all updates have been collected, we assume that the function
execute_update_tape is called with just the ws as parameter.  This
function will get the collected updates and pass them to UpdateTape
(the algebra version of play_update_tape).

Instead of writing the commit record, we call a function
do_2phase_commit (no arguments--feel free to change) to do the work
needed for a two-phase commit.  This function needs to be implemented
(there is a stub).

This code is totally untested (except that the main code of
collect_update_tape does transform the update tape into something that
we can store).



U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.3
retrieving revision 1.416.2.1.2.4
diff -u -d -r1.416.2.1.2.3 -r1.416.2.1.2.4
--- pathfinder.mx       29 May 2008 23:11:44 -0000      1.416.2.1.2.3
+++ pathfinder.mx       2 Jun 2008 14:07:55 -0000       1.416.2.1.2.4
@@ -492,6 +492,20 @@
 const ADDED_ATTR := ADDED_NID + 1;
 const DELETED_TEXT_NID := ADDED_ATTR + 1;
 const ADDED_TEXT_NID := DELETED_TEXT_NID + 1;
+# the following are used for XRPC updates
+const COLLECTED_COMMAND := ADDED_TEXT_NID + 1;
+const COLLECTED_PRE_TGT := COLLECTED_COMMAND + 1;
+const COLLECTED_PRE_CONT_TGT := COLLECTED_PRE_TGT + 1;
+const COLLECTED_ATTR_TGT := COLLECTED_PRE_CONT_TGT + 1;
+const COLLECTED_ATTR_CONT_TGT := COLLECTED_ATTR_TGT + 1;
+const COLLECTED_REPLACE_STRINGS := COLLECTED_ATTR_CONT_TGT + 1;
+const COLLECTED_RENAME_QN_URI := COLLECTED_REPLACE_STRINGS + 1;
+const COLLECTED_RENAME_QN_PREFIX := COLLECTED_RENAME_QN_URI + 1;
+const COLLECTED_RENAME_QN_LOCAL := COLLECTED_RENAME_QN_PREFIX + 1;
+const COLLECTED_PRE_INS := COLLECTED_RENAME_QN_LOCAL + 1;
+const COLLECTED_PRE_CONT_INS := COLLECTED_PRE_INS + 1;
+const COLLECTED_ATTR_INS := COLLECTED_PRE_CONT_INS + 1;
+const COLLECTED_ATTR_CONT_INS := COLLECTED_ATTR_INS + 1;
 
 
 # transaction debugging / performance profiling 
@@ -615,6 +629,20 @@
        .insert(oid,"added_attr")
        .insert(oid,"deleted_text_nid")
        .insert(oid,"added_text_nid")
+       # the below are only used for XRPC updates
+       .insert(lng,"collected_command")
+       .insert(oid,"collected_pre_tgt")
+       .insert(oid,"collected_pre_cont_tgt")
+       .insert(oid,"collected_attr_tgt")
+       .insert(oid,"collected_attr_cont_tgt")
+       .insert(str,"collected_replace_strings")
+       .insert(str,"collected_rename_qn_uri")
+       .insert(str,"collected_rename_qn_prefix")
+       .insert(str,"collected_rename_qn_local")
+       .insert(oid,"collected_pre_ins")
+       .insert(oid,"collected_pre_cont_ins")
+       .insert(oid,"collected_attr_ins")
+       .insert(oid,"collected_attr_cont_ins")
        .access(BAT_READ).rename("ws_update");
 
 # the bats that get changed using the logger in case of updates reach up until 
qn_histogram

U pf_support.mx
Index: pf_support.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pf_support.mx,v
retrieving revision 1.299.4.2
retrieving revision 1.299.4.3
diff -u -d -r1.299.4.2 -r1.299.4.3
--- pf_support.mx       30 May 2008 14:34:28 -0000      1.299.4.2
+++ pf_support.mx       2 Jun 2008 14:07:57 -0000       1.299.4.3
@@ -3351,6 +3351,79 @@
   do_update_end(ws, affected_conts);
 }
 
+PROC collect_update_tape(bat[void, bat] ws, bat[void, oid] item, bat[void, 
int] kind, bat[void,lng] int_values, bat[void,str] str_values) : void
+{
+  # convert MPS input to Algebra input
+  var cmd := [and]([lng](item.mirror()), 
3LL).ord_uselect(0LL).mirror().leftfetchjoin(item).leftfetchjoin(int_values); # 
[i,CMD]
+  var node_item := [oid]([+]([lng](cmd.mirror()), 
1)).leftfetchjoin(item).tmark([EMAIL PROTECTED]); # [j,ITEM] target nodes
+  var node_kind := [oid]([+]([lng](cmd.mirror()), 
1)).leftfetchjoin(kind).tmark([EMAIL PROTECTED]); # [j,KIND] target nodes
+  var item2 := [oid]([+]([lng](cmd.mirror()), 
2)).leftfetchjoin(item).tmark([EMAIL PROTECTED]); # [j,ITEM] parameter
+  var kind2 := [oid]([+]([lng](cmd.mirror()), 
2)).leftfetchjoin(kind).tmark([EMAIL PROTECTED]); # [j,KIND] parameter
+  var command := cmd.tmark([EMAIL PROTECTED]);
+  var node_types := node_kind.get_types(); # [j,ATTR/ELEM]
+  var node_conts := node_kind.get_container(); # [j,CONT]
+  var attr_tgt := node_types.ord_uselect(ATTR).mirror(); # [j,j] (attributes)
+  var attr_cont_tgt := 
command.mirror().outerjoin(attr_tgt.leftjoin(node_conts)); # [j,CONT/nil]
+  attr_tgt := command.mirror().outerjoin(attr_tgt.leftjoin(node_item)); # 
[j,ITEM/nil]
+  var pre_tgt := node_types.ord_uselect(ELEM).mirror(); # [j,j] (elements)
+  var pre_cont_tgt := 
command.mirror().outerjoin(pre_tgt.leftjoin(node_conts)); # [j,CONT/nil]
+  pre_tgt := command.mirror().outerjoin(pre_tgt.leftjoin(node_item)); # 
[j,ITEM/nil]
+
+  var renames := command.ord_uselect(UPDATE_RENAME).mirror(); # [j,j] (renames)
+  var rename_str := renames.leftjoin(item2).leftjoin(str_values); # 
[j,QNLOCAL/nil]
+  var rename_qn_local := command.mirror().outerjoin(rename_str); # 
[j,QNLOCAL/nil]
+  var rename_qn_uri := command.mirror().outerjoin(rename_str.project("")); # 
[j,""/nil]
+  var rename_qn_prefix := command.mirror().outerjoin(rename_str.project("")); 
# [j,""/nil]
+
+  var replaces := command.ord_uselect(UPDATE_REPLACE).mirror(); # [j,j] 
(replaces)
+  var replace_strings := 
command.mirror().outerjoin(replaces.leftjoin(item2).leftjoin(str_values)); # 
[j,REPLSTR/nil]
+
+  var inserts := command.ord_uselect(UPDATE_INSERT_FIRST, 
UPDATE_REPLACENODE).mirror(); # [j,j] (inserts)
+  var insert_types := inserts.leftjoin(kind2.get_types()); # [j,ATTR/ELEM]
+  var insert_conts := inserts.leftjoin(kind2.get_container()); # [j,ATTR/ELEM]
+  var insert_attrs := insert_types.ord_uselect(ATTR).mirror(); # [j,j]
+  var attr_ins := command.mirror().outerjoin(insert_attrs.leftjoin(item2));
+  var attr_cont_ins := 
command.mirror().outerjoin(insert_attrs.leftjoin(insert_conts));
+  var insert_elems := insert_types.ord_uselect(ELEM).mirror(); # [j,j]
+  var pre_ins := command.mirror().outerjoin(insert_elems.leftjoin(item2));
+  var pre_cont_ins := 
command.mirror().outerjoin(insert_elems.leftjoin(insert_conts));
+
+  ws.fetch(COLLECTED_COMMAND).append(command);
+  ws.fetch(COLLECTED_PRE_TGT).append(pre_tgt);
+  ws.fetch(COLLECTED_PRE_CONT_TGT).append(pre_cont_tgt);
+  ws.fetch(COLLECTED_ATTR_TGT).append(attr_tgt);
+  ws.fetch(COLLECTED_ATTR_CONT_TGT).append(attr_cont_tgt);
+  ws.fetch(COLLECTED_REPLACE_STRINGS).append(replace_strings);
+  ws.fetch(COLLECTED_RENAME_QN_URI).append(rename_qn_uri);
+  ws.fetch(COLLECTED_RENAME_QN_PREFIX).append(rename_qn_prefix);
+  ws.fetch(COLLECTED_RENAME_QN_LOCAL).append(rename_qn_local);
+  ws.fetch(COLLECTED_PRE_INS).append(pre_ins);
+  ws.fetch(COLLECTED_PRE_CONT_INS).append(pre_cont_ins);
+  ws.fetch(COLLECTED_ATTR_INS).append(attr_ins);
+  ws.fetch(COLLECTED_ATTR_CONT_INS).append(attr_cont_ins);
+}
+
+PROC execute_update_tape(bat[void, bat] ws) : void
+{
+  var command := ws.fetch(COLLECTED_COMMAND).append(command);
+  var pre_tgt := ws.fetch(COLLECTED_PRE_TGT).append(pre_tgt);
+  var pre_cont_tgt := ws.fetch(COLLECTED_PRE_CONT_TGT).append(pre_cont_tgt);
+  var attr_tgt := ws.fetch(COLLECTED_ATTR_TGT).append(attr_tgt);
+  var attr_cont_tgt := ws.fetch(COLLECTED_ATTR_CONT_TGT).append(attr_cont_tgt);
+  var replace_strings := 
ws.fetch(COLLECTED_REPLACE_STRINGS).append(replace_strings);
+  var rename_qn_uri := ws.fetch(COLLECTED_RENAME_QN_URI).append(rename_qn_uri);
+  var rename_qn_prefix := 
ws.fetch(COLLECTED_RENAME_QN_PREFIX).append(rename_qn_prefix);
+  var rename_qn_local := 
ws.fetch(COLLECTED_RENAME_QN_LOCAL).append(rename_qn_local);
+  var pre_ins := ws.fetch(COLLECTED_PRE_INS).append(pre_ins);
+  var pre_cont_ins := ws.fetch(COLLECTED_PRE_CONT_INS).append(pre_cont_ins);
+  var attr_ins := ws.fetch(COLLECTED_ATTR_INS).append(attr_ins);
+  var attr_cont_ins := ws.fetch(COLLECTED_ATTR_CONT_INS).append(attr_cont_ins);
+
+  # XXX we should do some reordering of certain insert commands here
+
+  UpdateTape(ws, command, pre_tgt, pre_cont_tgt, attr_tgt, attr_cont_tgt, 
replace_strings, rename_qn_uri, rename_qn_prefix, rename_qn_local, pre_ins, 
pre_cont_ins, attr_ins, attr_cont_ins);
+}
+
 PROC play_update_tape(bat[void, bat] ws, bat[void, oid] item, bat[void, int] 
kind, bat[void,lng] int_values, bat[void,str] str_values) : void
 {
   # [void,oid] list of all conts of affected documents
@@ -3782,6 +3855,19 @@
 b.col_name(nme).print();
 }
 
+PROC do_2phase_commit() : void
+{
+  # write PRECOMMIT record
+  log_trans_precommit(pf_logger, xrpc_qid, ???cid);
+
+  # now wait until we get an ABORT or a COMMIT message
+  xrpc_wait_for_commit(xrpc_qid);
+
+  if (abort) {
+    ERROR("transaction aborted!");
+  }
+}
+
 PROC do_log_updates(BAT[void,bat] ws, BAT[any,any] cont_order, BAT[oid,bit] 
map_pid_changed_bat) : void
 {
   var ws_logtime := usec(); 
@@ -3852,7 +3938,11 @@
     if (ws_log_active)
       ws_log(ws, "commit-LOG_SIZE exec" + str(ws_logtime - (ws_logtime := 
usec()))); 
   }
-  log_trans_end(pf_logger); # write commit record in WAL: ==> THIS IS THE 
COMMIT POINT <==
+  if (xrpc_qid != "") {
+    do_2phase_commit(); # do the complete 2 phase commit stuff
+  } else {
+    log_trans_end(pf_logger); # write commit record in WAL: ==> THIS IS THE 
COMMIT POINT <==
+  }
 
   if (ws_log_active)
     ws_log(ws, "commit-LOG_TRANS_END exec" + str(ws_logtime - usec()));


-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to