Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16:/tmp/cvs-serv4623/runtime
Modified Files:
pathfinder.mx pf_support.mx shredder.mx
Log Message:
- add name param to MT_init_lock/MT_init_sema etc (for lock profiling)
- restructured ws_opencoll to avoid exclusive locking while opening a document;
exclusive document access now also depends on a meta bat (shortlock_barrier)
that contains barrier semaphores.
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.324
retrieving revision 1.325
diff -u -d -r1.324 -r1.325
--- pathfinder.mx 7 Apr 2007 14:56:36 -0000 1.324
+++ pathfinder.mx 9 Apr 2007 21:44:58 -0000 1.325
@@ -536,6 +536,10 @@
var pf_free := pflock_get(2); # lock held while freeing ws-es
var pf_free_held := 0LL; # contains wsid that has the lock (for robust
resource release)
+# [coll-lock,barrier] bat for getting non-exclusive/exclusive coll-locks
+var shortlock_barrier := bat(lock,sema,10000).rename("shortlock_barrier");
+var pf_collbarrier_lock := lock_create(); # protector for shortlock_barrier
bat
+
# master bat *READS* must protect themselves against *independent* master bat
updates, and
# in particular extends. Such BATextends may reallocate the base address of
the heaps.
# Thus readers take a nonexclusive locks, and appends (that potentially
extend) take
@@ -665,6 +669,7 @@
var collection_zombie := bat(oid,lng).rename("collection_zombie");
# bat[oid,lng] contains ws still using the now defunct
collection (TRANSIENT)
+
var doc_collection; # bat[oid,oid] collection-id
var doc_name; # bat[oid,str] document name
var doc_location; # bat[oid,str] document URI
@@ -1221,8 +1226,25 @@
}
if (bit(mode and COLL_SHORTLOCK)) {
var coll_shortlock :=
reverse(ws.fetch(CONT_RUNTIME).fetch(cont)).fetch(RT_LOCK_FREELIST);
- ws.fetch(CONT_LOCKED).append(coll_shortlock);
+
lock_set(coll_shortlock);
+ lock_set(pf_collbarrier_lock);
+ if (shortlock_barrier.exist(coll_shortlock,sema_nil)) { # there
are readers in ws_opencoll!!
+ var barrier := sema_create(0);
+ shortlock_barrier.insert(coll_shortlock,barrier);
+ while(shortlock_barrier.exist(coll_shortlock,sema_nil)) {
+ lock_unset(pf_collbarrier_lock);
+ lock_unset(coll_shortlock);
+ sema_down(barrier); # wait for the readers
+ lock_set(coll_shortlock);
+ lock_set(pf_collbarrier_lock);
+ }
+ shortlock_barrier.delete(coll_shortlock,barrier);
+ sema_destroy(barrier);
+ }
+ lock_unset(pf_collbarrier_lock);
+
+ ws.fetch(CONT_LOCKED).append(coll_shortlock);
if (ws_log_active)
ws_log(ws, nme + " (coll_lock_set) shortlock" + str(ws_logtime
- (ws_logtime := usec())));
checkpoint_protect(ws);
@@ -1413,30 +1435,21 @@
ws_free(ws_id(ws)); # deregister working set from meta tables
}
-# add a collection to the working set.
-# - open all master bats (remap the rid_* bats into pre_*s).
-# - create a runtime index on persistent ocuments (we do not deem index
creation
-# worthwhile on temporary documents taht only exist during this query).
-PROC __ws_opencoll(BAT[void,bat] ws,
- BAT[lock,bat] runtime,
- BAT[str,bat] docBAT,
- str colname,
- oid coll_oid) : oid
+PROC __ws_opencoll(BAT[void,BAT] ws,
+ BAT [str,bat] docBAT,
+ str colname, oid coll_oid, oid cont) : BAT[oid,oid]
{
- # get map, pre and mem from the master bats
+ # get the bats, isolate them, and add them to the working set
var dsk := docBAT.tmark(oid(_MAP_PID));
var map_pid := copy(dsk.fetch(MAP_PID));
var pid_map := map_pid;
var pre, mem, isolate := (ttype(map_pid) = oid);
- var prefix := "runtime" + str(abs(int(runtime))) + "_" +
str(int(coll_oid));
-
+ var free_pages := empty_bat;
+
if (isolate) {
# on each document load, we update the free page list (i.e. garbage
collection)
- var free_pages :=
map_pid.ord_uselect(oid_nil).access(BAT_WRITE).revert(); # list of free pages
- free_pages.bbpname(prefix + "_free_pages");
+ free_pages := map_pid.ord_uselect(oid_nil).access(BAT_WRITE).revert();
# list of free pages
if ((free_pages.count() = 0) and (free_pages.htype() = void))
free_pages.seqbase([EMAIL PROTECTED]); # give it a non-nil head
- var coll_shortlock := reverse(runtime).fetch(RT_LOCK_FREELIST);
- runtime.replace(coll_shortlock, free_pages);
# sort the non-nil entries to get used-page-list (avoids sorting using
positional insert trick)
pid_map :=
map_pid.slice(0,count(map_pid)-(1+count(free_pages))).copy().access(BAT_WRITE);
@@ -1452,7 +1465,6 @@
}
# register new container in ws, and append all bats to it
- var cont := oid(count(ws.fetch(CONT_COLL)));
var cont_bat := constant2bat(cont);
ws.fetch(MAP_PID).append(map_pid);
ws.fetch(PID_MAP).append(pid_map);
@@ -1462,11 +1474,27 @@
ws.fetch(ATTR_CONT).append(cont_bat);
ws.fetch(CONT_COLL).append(coll_oid);
ws.fetch(CONT_NAME).append(colname);
- ws.fetch(CONT_RUNTIME).append(runtime);
mirror(ws_dsk).leftfetchjoin(ws).[append](dsk);
- # initialize runtime on first load (this involves full table scans and may
cost)
+ # region index (start/end attributes, standoff steps) left empty until
first use
+ ws.fetch(REGION_PRE).append(empty_bat);
+ ws.fetch(REGION_START).append(empty_bat);
+ ws.fetch(REGION_END).append(empty_bat);
+
+ return free_pages;
+}
+
+# create a runtime index on persistent documents (we do not deem index creation
+# worthwhile on temporary documents that only exist during this query).
+PROC __ws_indexcoll(BAT[void,bat] ws,
+ BAT[lock,bat] runtime,
+ BAT[oid,any] free_pages, str colname, oid coll_oid, oid
cont) : void
+{
+ var ws_logtime := usec();
+ var prefix := "runtime" + str(abs(int(runtime))) + "_" + colname + "_" +
str(int(coll_oid));
var idx, ins, del, own, unq, vxm, vxi, vxd;
+ var isolate := (ttype(ws.fetch(MAP_PID).fetch(cont)) = oid);
+
if (count(runtime) > RT_QN_NID) {
# a previous query already created the indices, just grab them
own := runtime.fetch(RT_ATTR_OWN);
@@ -1478,6 +1506,10 @@
vxi := runtime.fetch(RT_VX_HSH_NID_INS);
vxd := runtime.fetch(RT_VX_HSH_NID_DEL);
} else {
+ # insert list of free pages
+ free_pages.bbpname(prefix + "_free_pages");
+ runtime.replace(coll_shortlock, free_pages);
+
# only index element qnames that occur <M times, such that M.log(M) < N
var cnt := ws.fetch(QN_HISTOGRAM).fetch(cont);
var N := count(ws.fetch(PRE_SIZE).fetch(cont));
@@ -1518,7 +1550,7 @@
if (isolate) {
# create the shared hash table on ATTR_OWN (essential during
serialization of anything)
reverse(own := copy(own)).accbuild("hash");
- free_nids :=
dsk.fetch(NID_RID).uselect(oid_nil).access(BAT_WRITE).revert();
+ free_nids :=
ws.fetch(NID_RID).fetch(cont).uselect(oid_nil).access(BAT_WRITE).revert();
if ((free_nids.count() = 0) and (free_nids.htype() = void)) {
free_nids.seqbase([EMAIL PROTECTED]);
} else {
@@ -1578,26 +1610,16 @@
runtime.insert(lock_nil,vxm);
runtime.insert(lock_nil,vxi);
runtime.insert(lock_nil,vxd);
+
+ # Set a unique number (mangled from the wsid in the runtime).
+ # it will be zapped by any update. therefore, presence of this id
+ # proves absence of updates. We enforce an odd mangled number (or
with 1)
+ # to avoid ever confusing such numbers with real lock values
+ # (the lock values in RT_LOCK_FREELIST and RT_NID_FREELIST runtime
BUNs are
+ # sometimes used as lookup keys -- should not collide with this
mangled id).
+ reverse(runtime).replace(colname_locks, lock((ws_id(ws) >> 32) or
1LL));
}
}
-
- # region index (start/end attributes, standoff steps)
- ws.fetch(REGION_PRE).append(empty_bat);
- ws.fetch(REGION_START).append(empty_bat);
- ws.fetch(REGION_END).append(empty_bat);
- #
- # Set a unique number (mangled from the wsid in the runtime).
- # it will be zapped by any update. therefore, precense of this id
- # proves absence of updates. We enforce an odd mangled number (or with 1)
- # to avoid ever confusing such numbers with real lock values
- # (the lock values in RT_LOCK_FREELIST and RT_NID_FREELIST runtime BUNs
are
- # sometimes used as lookup keys -- should not collide with this mangled
id).
- reverse(runtime).replace(colname_locks, lock((ws_id(ws) >> 32) or 1LL));
-
- # for accessing ATTR_OWN, we split it in an old (shared, hash-indexed) and
new part.
- ws.fetch(ATTR_OWN_SHARED).append(own);
-
ws.fetch(ATTR_OWN_PRIVATE).append(ws.fetch(ATTR_OWN).fetch(cont).slice(count(own),int_nil));
-
# add the indices to the private transaction environment (ie ws)
if (isolate) { # copy ins/del for isolation
ins := ins.copy().access(BAT_WRITE);
@@ -1606,6 +1628,8 @@
vxi := vxi.copy().access(BAT_WRITE);
vxd := vxd.copy().access(BAT_WRITE);
}
+
+ # store the indices in the working set
ws.fetch(QN_NID).append(idx);
ws.fetch(QN_NID_INS).append(ins);
ws.fetch(QN_NID_DEL).append(del);
@@ -1614,7 +1638,9 @@
ws.fetch(VX_HSH_NID_INS).append(vxi);
ws.fetch(VX_HSH_NID_DEL).append(vxd);
- return cont;
+ # for accessing ATTR_OWN, we split it in an old (shared, hash-indexed) and
new part.
+ ws.fetch(ATTR_OWN_SHARED).append(own);
+
ws.fetch(ATTR_OWN_PRIVATE).append(ws.fetch(ATTR_OWN).fetch(cont).slice(count(own),int_nil));
}
# returns all other queries that have this collection open
@@ -1678,31 +1704,66 @@
PROC ws_opencoll(BAT[void,BAT] ws,
BAT[str,bat] docBAT,
- str name, oid coll_oid) : oid
+ str colname, oid coll_oid) : oid
{
- var ws_logtime := usec(), err, cont, coll_shortlock, runtime;
+ var err, ws_logtime := usec(), runtime;
+ # get a handle to the shared runtime bat (create if first access)
if (coll_oid >= DOCID_MIN) {
lock_set(pf_short);
- err := CATCH(coll_shortlock := reverse(runtime :=
_ws_opencoll(ws_id(ws), coll_oid)).fetch(RT_LOCK_FREELIST));
+ if (ws_log_active)
+ ws_log(ws, sprintf("ws_opencoll(%s) shortlock%s\n", colname,
str(ws_logtime - usec())));
+ err := CATCH(runtime := _ws_opencoll(ws_id(ws), coll_oid));
lock_unset(pf_short);
if (not(isnil(err))) ERROR(err);
if (ws_log_active)
- ws_log(ws, sprintf("_ws_opencoll(%s) exec%s\n", name,
str(ws_logtime - (ws_logtime := usec()))));
-
- lock_set(coll_shortlock); # never lock a collection inside the short
lock
- if (ws_log_active)
- ws_log(ws, sprintf("opencoll(%s) shortlock%s\n", name,
str(ws_logtime - usec())));
+ ws_log(ws, sprintf("_ws_opencoll(%s) exec%s\n", colname,
str(ws_logtime - (ws_logtime := usec()))));
} else {
runtime := runtime(lock_nil,lock_nil);
}
- err := CATCH(cont := __ws_opencoll(ws, runtime, docBAT, name, coll_oid));
+ ws.fetch(CONT_RUNTIME).append(runtime);
+
+ # first opener creates a collection barrier that will block write access
(coll_lock_set)
+ var coll_shortlock := reverse(runtime).fetch(RT_LOCK_FREELIST);
+ var cont := oid(count(ws.fetch(CONT_COLL)));
+
+ # get non-exclusive read-only access
if (coll_oid >= DOCID_MIN) {
- lock_unset(coll_shortlock); # release lock acquired for updatable bats
+ lock_set(coll_shortlock);
+ lock_set(pf_collbarrier_lock);
+ shortlock_barrier.insert(coll_shortlock, sema_nil);
+ lock_unset(pf_collbarrier_lock);
+ lock_unset(coll_shortlock); # allow concurrent __ws_opencoll()
}
- if (not(isnil(err))) ERROR(err);
+ # first get the isolated master bat images (NOTE: avoid coll_shortlock
this bat access)
+ var free_pages;
+ err := CATCH(free_pages := __ws_opencoll(ws, docBAT, colname, coll_oid,
cont));
+
+ # now add the indices. is fast if already created. But here we need
exclusive collection access
+ if (coll_oid >= DOCID_MIN) lock_set(coll_shortlock);
+ var ws_exectime := usec();
+ if (isnil(err))
+ err := CATCH(__ws_indexcoll(ws, runtime, free_pages, colname,
coll_oid, cont));
+
+ # last non-exclusive accesser wakes up exclusive requests (coll_lock_set)
+ if (coll_oid >= DOCID_MIN) {
+ var barriers := empty_bat;
+ lock_set(pf_collbarrier_lock);
+ shortlock_barrier.delete(coll_shortlock, sema_nil);
+ if (not(shortlock_barrier.exist(coll_shortlock, sema_nil))) # is last?
+ barriers := reverse(shortlock_barrier).select(coll_shortlock);
+ lock_unset(pf_collbarrier_lock);
+ if (bit(count(barriers))) [sema_up](barriers);
+ lock_unset(coll_shortlock);
+ }
+
+ # we made sure __ws_opencoll/__ws_indexcoll errors were caught to free the
locks anyway.
+ if (not(isnil(err))) ERROR(err);
+ if (ws_log_active)
+ ws_log(ws, sprintf("__ws_opencoll(%s) shortlock%s exec %s\n",
+ colname, str(ws_logtime - ws_exectime)),
str(ws_exectime - usec()));
return cont;
}
@@ -2091,8 +2152,6 @@
return commitbat;
}
-
-
PROC __runtime_newpage(BAT[lock,bat] runtime) : oid
{
var free_pages := runtime.fetch(RT_LOCK_FREELIST);
@@ -2604,7 +2663,6 @@
{
var wsid := ws_id(ws);
var runtime := ws.fetch(CONT_RUNTIME).fetch(cont);
- var coll_shortlock := reverse(runtime).fetch(RT_LOCK_FREELIST);
# if there is a free page, then this is easy
lock_set(coll_shortlock);
@@ -2629,10 +2687,9 @@
var map_pid := ws.fetch(_MAP_PID).find(cont);
# fully protect/lock the collection during bat extend
- lock_set(coll_shortlock);
- checkpoint_protect(ws);
+ coll_lock_set(ws, cont, COLL_SHORTLOCK, "ws_newpage");
+ var ws_logtime := usec();
sema_down(pf_extend_barrier);
- ws.fetch(CONT_LOCKED).append(coll_shortlock).append(lock_nil);
var last_pid, new_pid := 1 + ((count(rid_size) - 1) >>
REMAP_PAGE_BITS);
@@ -2642,9 +2699,7 @@
# unprotect/lock the collection after bat extend
sema_up(pf_extend_barrier);
- checkpoint_unprotect(ws);
- lock_unset(coll_shortlock);
-
ws.fetch(CONT_LOCKED).reverse().delete(lock_nil).delete(coll_shortlock);
+ coll_lock_unset(ws, cont, COLL_SHORTLOCK, "ws_newpage",
ws_logtime);
var pages_free := reverse(runtime.fetch(RT_LOCK_FREELIST));
var newpids := bat(void,oid);
@@ -2715,11 +2770,10 @@
var nid_off := count(nid_rid);
var nid_cnt;
- lock_set(coll_shortlock);
- checkpoint_protect(ws);
+ coll_lock_set(ws, cont, COLL_SHORTLOCK, "ws_newnids");
+ var ws_logtime := usec();
pf_assert(CATCH(nid_cnt := __ws_newnids(nid_rid, cnt)), "master
update failed (newnids)");
- checkpoint_unprotect(ws);
- lock_unset(coll_shortlock);
+ coll_lock_unset(ws, cont, COLL_SHORTLOCK, "ws_newnids",
ws_logtime);
# put nids in reverse order in the freelist, so they are given out
in order
newnids.access(BAT_WRITE).append(reverse(nid_rid.slice(nid_off,(nid_off :+=
cnt) - 1)));
@@ -5498,6 +5552,7 @@
p++;
switch(*p) {
case 'p': /* profile */
+ if (genType != buf) MT_locktrace_start();
genType = buf; /* now includes "timing-" prefix */
break;
case 'o': /* output format */
@@ -5574,6 +5629,7 @@
xquery_client_reset(ctx, NULL);
}
xquery_client_end(mc, msg);
+ if (genType == buf) MT_locktrace_end();
}
void
@@ -5780,9 +5836,9 @@
bat *
xquery_prelude(void)
{
- MT_init_lock(pf_compiler_lock);
- MT_init_lock(pf_module_lock);
- MT_init_lock(pf_cache_lock);
+ MT_init_lock(pf_compiler_lock, "pf_compiler_lock");
+ MT_init_lock(pf_module_lock, "pf_module_lock");
+ MT_init_lock(pf_cache_lock, "pf_cache_lock");
xmlInitParser();
Index: pf_support.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pf_support.mx,v
retrieving revision 1.205
retrieving revision 1.206
diff -u -d -r1.205 -r1.206
--- pf_support.mx 4 Apr 2007 22:23:56 -0000 1.205
+++ pf_support.mx 9 Apr 2007 21:44:59 -0000 1.206
@@ -432,6 +432,7 @@
const str_nil := str(nil);
const stream_nil := Stream(nil);
const lock_nil := lock(nil);
+const sema_nil := sema(nil);
const timestamp_nil := timestamp(nil);
PROC addValues(bat[void,any::1] container, any::1 delta) : oid
@@ -6946,15 +6947,15 @@
}
bat* pf_support_prelude() {
- MT_init_lock(pf_runtime_lock[0]);
- MT_init_lock(pf_runtime_lock[1]);
- MT_init_lock(pf_runtime_lock[2]);
- MT_init_lock(pf_runtime_lock[3]);
- MT_init_lock(pf_runtime_lock[4]);
- MT_init_lock(pf_runtime_lock[5]);
- MT_init_sema(pf_runtime_sema[0],1);
- MT_init_sema(pf_runtime_sema[1],1);
- MT_init_sema(pf_runtime_sema[2],1);
+ MT_init_lock(pf_runtime_lock[0], "PF_SHORT_LOCK");
+ MT_init_lock(pf_runtime_lock[1], "PF_WAL_LOCK");
+ MT_init_lock(pf_runtime_lock[2], "PF_FREE_LOCK");
+ MT_init_lock(pf_runtime_lock[3], "PF_EXTEND_LOCK");
+ MT_init_lock(pf_runtime_lock[5], "PF_META_LOCK");
+ MT_init_lock(pf_runtime_lock[4], "PF_UPDATE_LOCK");
+ MT_init_sema(pf_runtime_sema[0],1, "PF_META_BARRIER");
+ MT_init_sema(pf_runtime_sema[1],1, "PF_UPDATE_BARRIER");
+ MT_init_sema(pf_runtime_sema[2],1, "PF_EXTEND_BARRIER");
ws_overlaps_ws = BATnew(TYPE_lng, TYPE_lng, 1024);
BBPrename(ws_overlaps_ws->batCacheid, "ws_overlaps_ws");
(void) BATprepareHash(ws_overlaps_ws);
Index: shredder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/shredder.mx,v
retrieving revision 1.124
retrieving revision 1.125
diff -u -d -r1.124 -r1.125
--- shredder.mx 3 Apr 2007 00:06:32 -0000 1.124
+++ shredder.mx 9 Apr 2007 21:44:59 -0000 1.125
@@ -385,6 +385,7 @@
MT_Lock *coll_lock;
MT_Sema *extend_sema;
+ BAT *lock_barrier;
} shredCtxStruct;
/* some string buffer sizes */
@@ -523,17 +524,17 @@
if (shredCtx->coll_lock) {
/* yield the lock every so many nodes to queries that want to start
and need the masters */
#ifdef _POSIX_PRIORITY_SCHEDULING
- if ((shredCtx->nnode_pre&1023) == 0) {
+ if ((shredCtx->nnode_pre&1023) == 0) do {
MT_lock_unset(shredCtx->coll_lock, "shredder_yield");
sched_yield();
MT_lock_set(shredCtx->coll_lock, "shredder_yield");
- }
+ } while (BUNfnd(shredCtx->lock_barrier, &shredCtx->coll_lock));
#else
- if ((shredCtx->nnode_pre&16383) == 0) {
+ if ((shredCtx->nnode_pre&16383) == 0) do {
MT_lock_unset(shredCtx->coll_lock, "shredder_yield");
MT_sleep_ms(1);
MT_lock_set(shredCtx->coll_lock, "shredder_yield");
- }
+ } while (BUNfnd(shredCtx->lock_barrier, &shredCtx->coll_lock));
#endif
}
@@ -1320,6 +1321,9 @@
if (shredCtx->val) {
GDKfree(shredCtx->val);
}
+ if (shredCtx->lock_barrier) {
+ BBPunfix(shredCtx->lock_barrier->batCacheid);
+ }
attrDB_free(&shredCtx->idrefAttrDB);
GDKfree(shredCtx);
return NULL;
@@ -1362,6 +1366,18 @@
}
shredCtx->incremental = (BATcount(docBAT) > 0);
+ /* as we want to release the lock once in a while, but having the lock
alone does not guarantee
+ * exclusive access (anymore), after retaking it we must make sure
the lock-barrier BAT does
+ * not contain entries for our coll_lock
+ */
+ i = BBPindex("shortlock_barrier");
+ shredCtx->lock_barrier = i?BATdescriptor(i):NULL;
+ if (shredCtx->lock_barrier == NULL) {
+ if (i) BBPunfix(i);
+ GDKerror("shredder_create: could not open collection_rdonly.\n");
+ return shredder_free(shredCtx, 0);
+ }
+
/* init the idref attribute database */
attrDB_init(&shredCtx->idrefAttrDB);
-------------------------------------------------------------------------
Take Surveys. Earn Cash. Influence the Future of IT
Join SourceForge.net's Techsay panel and you'll get the chance to share your
opinions on IT & business topics through brief surveys-and earn cash
http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins