Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv1129/runtime
Modified Files:
Tag: xrpcdemo
pathfinder.mx pf_support.mx
Log Message:
Support for 2-phase commit.
The code assumes that there is a variable xprc_qid with value != ""
when we're in a P2P situation.
Instead of play_update_tape (which is unchanged), we call
collect_update_tape to collect all updates in the working set. The
necessary bats are always created when we're doing an update (see
ws_udpate).
Once all updates have been collected, we assume that the function
execute_update_tape is called with just the ws as parameter. This
function will get the collected updates and pass them to UpdateTape
(the algebra version of play_update_tape).
Instead of writing the commit record, we call a function
do_2phase_commit (no arguments--feel free to change) to do the work
needed for a two-phase commit. This function needs to be implemented
(there is a stub).
This code is totally untested (except that the main code of
collect_update_tape does transform the update tape into something that
we can store).
U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.3
retrieving revision 1.416.2.1.2.4
diff -u -d -r1.416.2.1.2.3 -r1.416.2.1.2.4
--- pathfinder.mx 29 May 2008 23:11:44 -0000 1.416.2.1.2.3
+++ pathfinder.mx 2 Jun 2008 14:07:55 -0000 1.416.2.1.2.4
@@ -492,6 +492,20 @@
const ADDED_ATTR := ADDED_NID + 1;
const DELETED_TEXT_NID := ADDED_ATTR + 1;
const ADDED_TEXT_NID := DELETED_TEXT_NID + 1;
+# the following are used for XRPC updates
+const COLLECTED_COMMAND := ADDED_TEXT_NID + 1;
+const COLLECTED_PRE_TGT := COLLECTED_COMMAND + 1;
+const COLLECTED_PRE_CONT_TGT := COLLECTED_PRE_TGT + 1;
+const COLLECTED_ATTR_TGT := COLLECTED_PRE_CONT_TGT + 1;
+const COLLECTED_ATTR_CONT_TGT := COLLECTED_ATTR_TGT + 1;
+const COLLECTED_REPLACE_STRINGS := COLLECTED_ATTR_CONT_TGT + 1;
+const COLLECTED_RENAME_QN_URI := COLLECTED_REPLACE_STRINGS + 1;
+const COLLECTED_RENAME_QN_PREFIX := COLLECTED_RENAME_QN_URI + 1;
+const COLLECTED_RENAME_QN_LOCAL := COLLECTED_RENAME_QN_PREFIX + 1;
+const COLLECTED_PRE_INS := COLLECTED_RENAME_QN_LOCAL + 1;
+const COLLECTED_PRE_CONT_INS := COLLECTED_PRE_INS + 1;
+const COLLECTED_ATTR_INS := COLLECTED_PRE_CONT_INS + 1;
+const COLLECTED_ATTR_CONT_INS := COLLECTED_ATTR_INS + 1;
# transaction debugging / performance profiling
@@ -615,6 +629,20 @@
.insert(oid,"added_attr")
.insert(oid,"deleted_text_nid")
.insert(oid,"added_text_nid")
+ # the below are only used for XRPC updates
+ .insert(lng,"collected_command")
+ .insert(oid,"collected_pre_tgt")
+ .insert(oid,"collected_pre_cont_tgt")
+ .insert(oid,"collected_attr_tgt")
+ .insert(oid,"collected_attr_cont_tgt")
+ .insert(str,"collected_replace_strings")
+ .insert(str,"collected_rename_qn_uri")
+ .insert(str,"collected_rename_qn_prefix")
+ .insert(str,"collected_rename_qn_local")
+ .insert(oid,"collected_pre_ins")
+ .insert(oid,"collected_pre_cont_ins")
+ .insert(oid,"collected_attr_ins")
+ .insert(oid,"collected_attr_cont_ins")
.access(BAT_READ).rename("ws_update");
# the bats that get changed using the logger in case of updates reach up until
qn_histogram
U pf_support.mx
Index: pf_support.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pf_support.mx,v
retrieving revision 1.299.4.2
retrieving revision 1.299.4.3
diff -u -d -r1.299.4.2 -r1.299.4.3
--- pf_support.mx 30 May 2008 14:34:28 -0000 1.299.4.2
+++ pf_support.mx 2 Jun 2008 14:07:57 -0000 1.299.4.3
@@ -3351,6 +3351,79 @@
do_update_end(ws, affected_conts);
}
+PROC collect_update_tape(bat[void, bat] ws, bat[void, oid] item, bat[void,
int] kind, bat[void,lng] int_values, bat[void,str] str_values) : void
+{
+ # convert MPS input to Algebra input
+ var cmd := [and]([lng](item.mirror()),
3LL).ord_uselect(0LL).mirror().leftfetchjoin(item).leftfetchjoin(int_values); #
[i,CMD]
+ var node_item := [oid]([+]([lng](cmd.mirror()),
1)).leftfetchjoin(item).tmark([EMAIL PROTECTED]); # [j,ITEM] target nodes
+ var node_kind := [oid]([+]([lng](cmd.mirror()),
1)).leftfetchjoin(kind).tmark([EMAIL PROTECTED]); # [j,KIND] target nodes
+ var item2 := [oid]([+]([lng](cmd.mirror()),
2)).leftfetchjoin(item).tmark([EMAIL PROTECTED]); # [j,ITEM] parameter
+ var kind2 := [oid]([+]([lng](cmd.mirror()),
2)).leftfetchjoin(kind).tmark([EMAIL PROTECTED]); # [j,KIND] parameter
+ var command := cmd.tmark([EMAIL PROTECTED]);
+ var node_types := node_kind.get_types(); # [j,ATTR/ELEM]
+ var node_conts := node_kind.get_container(); # [j,CONT]
+ var attr_tgt := node_types.ord_uselect(ATTR).mirror(); # [j,j] (attributes)
+ var attr_cont_tgt :=
command.mirror().outerjoin(attr_tgt.leftjoin(node_conts)); # [j,CONT/nil]
+ attr_tgt := command.mirror().outerjoin(attr_tgt.leftjoin(node_item)); #
[j,ITEM/nil]
+ var pre_tgt := node_types.ord_uselect(ELEM).mirror(); # [j,j] (elements)
+ var pre_cont_tgt :=
command.mirror().outerjoin(pre_tgt.leftjoin(node_conts)); # [j,CONT/nil]
+ pre_tgt := command.mirror().outerjoin(pre_tgt.leftjoin(node_item)); #
[j,ITEM/nil]
+
+ var renames := command.ord_uselect(UPDATE_RENAME).mirror(); # [j,j] (renames)
+ var rename_str := renames.leftjoin(item2).leftjoin(str_values); #
[j,QNLOCAL/nil]
+ var rename_qn_local := command.mirror().outerjoin(rename_str); #
[j,QNLOCAL/nil]
+ var rename_qn_uri := command.mirror().outerjoin(rename_str.project("")); #
[j,""/nil]
+ var rename_qn_prefix := command.mirror().outerjoin(rename_str.project(""));
# [j,""/nil]
+
+ var replaces := command.ord_uselect(UPDATE_REPLACE).mirror(); # [j,j]
(replaces)
+ var replace_strings :=
command.mirror().outerjoin(replaces.leftjoin(item2).leftjoin(str_values)); #
[j,REPLSTR/nil]
+
+ var inserts := command.ord_uselect(UPDATE_INSERT_FIRST,
UPDATE_REPLACENODE).mirror(); # [j,j] (inserts)
+ var insert_types := inserts.leftjoin(kind2.get_types()); # [j,ATTR/ELEM]
+ var insert_conts := inserts.leftjoin(kind2.get_container()); # [j,ATTR/ELEM]
+ var insert_attrs := insert_types.ord_uselect(ATTR).mirror(); # [j,j]
+ var attr_ins := command.mirror().outerjoin(insert_attrs.leftjoin(item2));
+ var attr_cont_ins :=
command.mirror().outerjoin(insert_attrs.leftjoin(insert_conts));
+ var insert_elems := insert_types.ord_uselect(ELEM).mirror(); # [j,j]
+ var pre_ins := command.mirror().outerjoin(insert_elems.leftjoin(item2));
+ var pre_cont_ins :=
command.mirror().outerjoin(insert_elems.leftjoin(insert_conts));
+
+ ws.fetch(COLLECTED_COMMAND).append(command);
+ ws.fetch(COLLECTED_PRE_TGT).append(pre_tgt);
+ ws.fetch(COLLECTED_PRE_CONT_TGT).append(pre_cont_tgt);
+ ws.fetch(COLLECTED_ATTR_TGT).append(attr_tgt);
+ ws.fetch(COLLECTED_ATTR_CONT_TGT).append(attr_cont_tgt);
+ ws.fetch(COLLECTED_REPLACE_STRINGS).append(replace_strings);
+ ws.fetch(COLLECTED_RENAME_QN_URI).append(rename_qn_uri);
+ ws.fetch(COLLECTED_RENAME_QN_PREFIX).append(rename_qn_prefix);
+ ws.fetch(COLLECTED_RENAME_QN_LOCAL).append(rename_qn_local);
+ ws.fetch(COLLECTED_PRE_INS).append(pre_ins);
+ ws.fetch(COLLECTED_PRE_CONT_INS).append(pre_cont_ins);
+ ws.fetch(COLLECTED_ATTR_INS).append(attr_ins);
+ ws.fetch(COLLECTED_ATTR_CONT_INS).append(attr_cont_ins);
+}
+
+PROC execute_update_tape(bat[void, bat] ws) : void
+{
+ var command := ws.fetch(COLLECTED_COMMAND).append(command);
+ var pre_tgt := ws.fetch(COLLECTED_PRE_TGT).append(pre_tgt);
+ var pre_cont_tgt := ws.fetch(COLLECTED_PRE_CONT_TGT).append(pre_cont_tgt);
+ var attr_tgt := ws.fetch(COLLECTED_ATTR_TGT).append(attr_tgt);
+ var attr_cont_tgt := ws.fetch(COLLECTED_ATTR_CONT_TGT).append(attr_cont_tgt);
+ var replace_strings :=
ws.fetch(COLLECTED_REPLACE_STRINGS).append(replace_strings);
+ var rename_qn_uri := ws.fetch(COLLECTED_RENAME_QN_URI).append(rename_qn_uri);
+ var rename_qn_prefix :=
ws.fetch(COLLECTED_RENAME_QN_PREFIX).append(rename_qn_prefix);
+ var rename_qn_local :=
ws.fetch(COLLECTED_RENAME_QN_LOCAL).append(rename_qn_local);
+ var pre_ins := ws.fetch(COLLECTED_PRE_INS).append(pre_ins);
+ var pre_cont_ins := ws.fetch(COLLECTED_PRE_CONT_INS).append(pre_cont_ins);
+ var attr_ins := ws.fetch(COLLECTED_ATTR_INS).append(attr_ins);
+ var attr_cont_ins := ws.fetch(COLLECTED_ATTR_CONT_INS).append(attr_cont_ins);
+
+ # XXX we should do some reordering of certain insert commands here
+
+ UpdateTape(ws, command, pre_tgt, pre_cont_tgt, attr_tgt, attr_cont_tgt,
replace_strings, rename_qn_uri, rename_qn_prefix, rename_qn_local, pre_ins,
pre_cont_ins, attr_ins, attr_cont_ins);
+}
+
PROC play_update_tape(bat[void, bat] ws, bat[void, oid] item, bat[void, int]
kind, bat[void,lng] int_values, bat[void,str] str_values) : void
{
# [void,oid] list of all conts of affected documents
@@ -3782,6 +3855,19 @@
b.col_name(nme).print();
}
+PROC do_2phase_commit() : void
+{
+ # write PRECOMMIT record
+ log_trans_precommit(pf_logger, xrpc_qid, ???cid);
+
+ # now wait until we get an ABORT or a COMMIT message
+ xrpc_wait_for_commit(xrpc_qid);
+
+ if (abort) {
+ ERROR("transaction aborted!");
+ }
+}
+
PROC do_log_updates(BAT[void,bat] ws, BAT[any,any] cont_order, BAT[oid,bit]
map_pid_changed_bat) : void
{
var ws_logtime := usec();
@@ -3852,7 +3938,11 @@
if (ws_log_active)
ws_log(ws, "commit-LOG_SIZE exec" + str(ws_logtime - (ws_logtime :=
usec())));
}
- log_trans_end(pf_logger); # write commit record in WAL: ==> THIS IS THE
COMMIT POINT <==
+ if (xrpc_qid != "") {
+ do_2phase_commit(); # do the complete 2 phase commit stuff
+ } else {
+ log_trans_end(pf_logger); # write commit record in WAL: ==> THIS IS THE
COMMIT POINT <==
+ }
if (ws_log_active)
ws_log(ws, "commit-LOG_TRANS_END exec" + str(ws_logtime - usec()));
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins