Update of /cvsroot/monetdb/MonetDB5/src/optimizer
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv19114/src/optimizer

Modified Files:
        opt_replicator.mx 
Log Message:
New version of the replicator. Instruction threads are now stored in a replica
table (a MalBlk), with or without their results. The first execution of a query
template searches the table for reusable instructions and creates a mapping
between the local and the replicated variables. Subsequent executions use the
mapping directly.


Index: opt_replicator.mx
===================================================================
RCS file: /cvsroot/monetdb/MonetDB5/src/optimizer/opt_replicator.mx,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -d -r1.20 -r1.21
--- opt_replicator.mx   19 Feb 2008 11:48:05 -0000      1.20
+++ opt_replicator.mx   28 Feb 2008 14:14:28 -0000      1.21
@@ -42,42 +42,66 @@
 
 module replicator;
 
-pattern 
replicator.select(b:bat[:any_1,:any_2],value:any_2,sname:str,tname:str,cname:str):bat[:any_1,:any_2]
+
+pattern 
replicator.bind(sname:str,tname:str,cname:str,access:int):bat[:oid,:any_1]
+address REPbind
+comment "The overloaded bind operator for replicated operations";
+
+pattern replicator.select(b:bat[:any_1,:any_2],value:any_2):bat[:any_1,:any_2]
 address REPselect
 comment "The overloaded select operator for replicated operations";
 
-pattern 
replicator.select(b:bat[:any_1,:any_2],low:any_2,hgh:any_2,sname:str,tname:str,cname:str):bat[:any_1,:any_2]
+pattern 
replicator.select(b:bat[:any_1,:any_2],low:any_2,hgh:any_2):bat[:any_1,:any_2]
 address REPselect
 comment "The overloaded select operator for replicated operations";
 
-pattern 
replicator.select(b:bat[:any_1,:any_2],low:any_2,hgh:any_2,li:bit,hi:bit,sname:str,tname:str,cname:str):bat[:any_1,:any_2]
+pattern 
replicator.select(b:bat[:any_1,:any_2],low:any_2,hgh:any_2,li:bit,hi:bit):bat[:any_1,:any_2]
 address REPselect
 comment "The overloaded select operator for replicated operations";
 
-pattern 
replicator.uselect(b:bat[:any_1,:any_2],value:any_2,sname:str,tname:str,cname:str):bat[:any_1,:oid
 ]
+pattern replicator.uselect(b:bat[:any_1,:any_2],value:any_2):bat[:any_1,:oid ]
 address REPuselect
 comment "The overloaded uselect operator for replicated operations";
 
-pattern 
replicator.uselect(b:bat[:any_1,:any_2],low:any_2,hgh:any_2,sname:str,tname:str,cname:str):bat[:any_1,:oid]
+pattern 
replicator.uselect(b:bat[:any_1,:any_2],low:any_2,hgh:any_2):bat[:any_1,:oid]
 address REPuselect
 comment "The overloaded uselect operator for replicated operations";
 
-pattern 
replicator.uselect(b:bat[:any_1,:any_2],low:any_2,high:any_2,li:bit,hi:bit,sname:str,tname:str,cname:str):bat[:any_1,:oid]
 
+pattern 
replicator.uselect(b:bat[:any_1,:any_2],low:any_2,high:any_2,li:bit,hi:bit):bat[:any_1,:oid]
 
 address REPuselect
 comment "The overloaded uselect operator for replicated operations";
 
-command replicator.init():void
-address REPinit
-comment "Initialize replica table";
+pattern replicator.markT(b:bat[:any_1,:any_2],base:oid):bat[:any_1,:oid] 
+address REPmarkT
+comment "The overloaded markT operator for intermediates reuse";
+
+pattern replicator.reverse(b:bat[:any_1,:any_2]):bat[:any_2,:any_1]
+address REPreverse
+comment "The overloaded reverse operator for intermediates reuse";
+
+pattern 
replicator.join(left:bat[:any_1,:any_2],right:bat[:any_2,:any_3]):bat[:any_1,:any_3]
+address REPjoin
+comment "The overloaded join operator for intermediates reuse";
+
+pattern 
replicator.semijoin(left:bat[:any_1,:any_2],right:bat[:any_1,:any]):bat[:any_1,:any_2]
 
+address REPsemijoin
+comment "The overloaded semijoin operator for intermediates reuse";
+
+
+pattern replicator.start():void
+address REPstart
+comment "Initialize replica record of executed MAL block";
+
+command replicator.stop():void
+address REPstop
+comment "Cleans replica bookkeeping";
+
[EMAIL PROTECTED]
 
 command replicator.dump():void
 address REPdump
 comment "Dump summary of replica table for potential re-use benefits";
 
-command replicator.stat():void
-address REPstat
-comment "Estimate of potential replica benefit";
-
 command replicator.setRetainPolicy(p:sht):void
 address REPsetRetain
 comment "Set replica retainment policy";
@@ -91,7 +115,6 @@
 comment "Set replica cache policy";
 
 
-
 @h
 #ifndef _OPT_REPLICATOR_
 #define _OPT_REPLICATOR_
@@ -99,38 +122,57 @@
 #include "opt_support.h"
 #include "mal_interpreter.h"
 
+
+opt_export str REPbind(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 opt_export str REPselect(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 opt_export str REPuselect(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
-opt_export str REPinit(void);
+opt_export str REPmarkT(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+opt_export str REPreverse(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+opt_export str REPjoin(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+opt_export str REPsemijoin(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 opt_export str REPdump(int *ret);
-opt_export str REPstat(int *ret);
+opt_export str REPstart(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+opt_export str REPstop(void);
 opt_export str REPsetRetain(int *ret, sht *p);
 opt_export str REPsetReuse(int *ret, sht *p);
 opt_export str REPsetCache(int *ret, sht *p);
 
+typedef str (*sqlbind_fptr)(int *bid, str *sname, str *tname, str *cname, int 
*access);
+typedef struct _sql_functions {        /* function hooks used by the replicator; presumably filled in by the SQL layer -- confirm */
+       sqlbind_fptr sqlbind;           /* called by REPbind, which throws "sqlbind not set" while this is NULL */
+} sql_functions;
+
+opt_export sql_functions sqlf;
+
 /* #define DEBUG_OPT_REPLICATOR  */
 @-
 @c
 #include "mal_config.h"
 #include "opt_replicator.h"
+#define getVarVal(M,i) VALget(&(M)->var[i]->value)
+#define isResStored(M,i) isVarStored(M, getArg(M->stmt[i],0))
 
+/* repl. bookkeeping for current query execution */
 typedef struct REPLICAREC {
-       str sname, tname, cname;        /* source BAT schema, table, column */
-        bat resbid;            /* intermediate result BAT */
-       int tpe;                
-       bit tail;               /* does result include tail */
+       int ver;                /* version of replica table */
+       int * vmap;             /* map of exec. variables to replica name space 
*/
+ } *Replica, ReplicaRec;
 
-        ValRecord low,hgh;     /* selected tail range */
-        bit li, hi;            /* inclusive bits */
-        size_t cnt;            /* how many tuples */
-       dbl cost;               /* how much time was spent to produce */
-       int ucnteq,ucntsub;     /* usage counts equality and subset */
-       MalBlkPtr malblk;       /* mal instruction block*/
-} *Replica, ReplicaRec;
+typedef struct STAT {           /* per-variable statistics; replstat[] is indexed by replica-table variable number */
+       lng ticks;              /* micro seconds spent; copied from the profiler ticks when the replica is added */
+       lng reuse;              /* number of times the stored result was reused */
+       lng cnt;                /* result size in tuples */
+} *StatPtr, StatRec;
 
-static Replica repltable = NULL;
-static int replsz = 0;  /* alloc. mem */
-static int replnr = 0; /* number of replicas */
+
+sql_functions sqlf;
+
+static MalBlkPtr repl = NULL;
+static int replver = 0;  /* version of replica table */
+static int replref = 0;  /* queries currently using repl */
+static StatPtr replstat = NULL;  /* statistics for stored intermediates */
+
+/*static int replnr = 0;  number of replicas */
 static sht retain = 0;  /* replica retainment policy
                        0: baseline, keeps stat, no retain, no reuse
                        1: infinite case, retain all
@@ -145,71 +187,79 @@
                        2: cost-based, throw least beneficial */
 
 
-static void assureSpace(){
-       if ( replsz == 0){
-               replsz = 16;
-               repltable = (ReplicaRec*)GDKzalloc((replsz)*sizeof(ReplicaRec));
-       }
-       else if( replnr == replsz){
-               replsz <<=1;
-               repltable = (ReplicaRec*) GDKrealloc((void*)repltable,
-                                sizeof(ReplicaRec) * replsz);
-       }
+static void assureSpace()
+{
+       StatPtr olds;
+       int sz;
+       if ( repl == NULL){
+               repl = newMalBlk(1000, 1000);
+               initProfiler(repl);
+               replstat = (StatPtr)GDKzalloc(sizeof(StatRec)* 1000);
+       } else          /* sync the repl stat */
+               if(repl->vtop + 1>= repl->vsize){
+                       olds = replstat;
+                       sz = repl->vtop + MAXVARS;
+                       replstat = (StatPtr)GDKzalloc(sizeof(StatRec)* sz);
+                       memcpy(replstat,olds,repl->vtop * sizeof(StatRec));
+                       GDKfree(olds);
+               }
 }
 
-str
-REPinit(){
-       str msg= MAL_SUCCEED;
-
-        if ( repltable != NULL){
-               GDKfree(repltable);
-                replsz = replnr = 0;
-        }
-       assureSpace();
-       return msg;
+int* getMap(MalStkPtr s)
+{
+       if( s->blk->replica )
+               return (((ReplicaRec*)s->blk->replica)->vmap);
+       else return NULL;
 }
 
-static void printReplica(stream *f, int i){
+str
+REPstart(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       Replica r;
+       int i, vc;
 
-       Replica r = repltable+i;
-       if (r==NULL) GDKerror("Empty replica element");
-       stream_printf(f,"%s\t%s\t%s\t", r->sname, r->tname, r->cname);
-       VALprint(f,&r->low);
-       stream_printf(f,":"); VALprint(f,&r->hgh);
-       stream_printf(f,"\t[%d:%d]\t", r->li,r->hi);
-       stream_printf(f,"%d\t%8.2f ms\t", r->cnt,r->cost/1000.0);
-       stream_printf(f,"%d:%d\n", r->ucnteq,r->ucntsub);
+       (void) mb;
+       (void) pci;
+       vc = stk->blk->vtop;
+       r = (Replica) stk->blk->replica;
+       if( r == NULL) {        /* first execution */
+               r = (Replica) GDKzalloc(sizeof(ReplicaRec));
+               r->ver = replver;
+               r->vmap = (int*)GDKmalloc(sizeof(int)* vc);
+               for(i=0; i<vc; i++)
+                       r->vmap[i] = -1;
+               stk->blk->replica = (ptr) r;
+       } else
+       if( r->ver != replver) { /* replicaRec based on old repl */       
+               GDKfree(r->vmap);       /* reinit var. map */
+               r->ver = replver;
+               r->vmap = (int*)GDKmalloc(sizeof(int)* vc);
+               for(i=0; i<vc; i++)
+                       r->vmap[i] = -1;
+       }
+       replref++;
+       return MAL_SUCCEED;
 }
 
-str
-REPdump(int *ret)
+
+str REPstop(void)
 {
-       int i;
-       (void) ret;
-       stream_printf(GDKout,"Number of replicas %d\n",replnr);
-       stream_printf(GDKout,"Schema\tTable\t\tColumn\tLow : High\t[LI : 
HI]\tCnt\tCost\tEq : Sub\n");
-       for( i=0; i<replnr; i++)
-               printReplica(GDKout, i);
+       replref--;
        return MAL_SUCCEED;
 }
 
+
 str
-REPstat(int *ret)
+REPdump(int *ret)
 {
-       int i;
-       dbl ben = 0;
        (void) ret;
-       stream_printf(GDKout,"Number of replicas %d\n",replnr);
-       for( i=0; i<replnr; i++){
-               if( repltable[i].ucnteq>0 )
-                       ben+= repltable[i].ucnteq * repltable[i].cost;
-               if( repltable[i].ucntsub>0 ) /* approx benefit of subset reuse 
*/
-                       ben+= repltable[i].ucnteq * repltable[i].cost * 0.5;
-       }
-       stream_printf(GDKout,"Potential benefit from %d replicas %10.2f 
ms\n",replnr,ben/1000.0);
+/*     stream_printf(GDKout,"Number of replicas %d\n",replnr);*/
+       printFunction(GDKout,repl, LIST_MAL_ALL);
+
        return MAL_SUCCEED;
 }
 
+
 str
 REPsetRetain(int *ret, sht *p)
 {
@@ -237,22 +287,17 @@
 static int
 OPTreplicatorImplementation(MalBlkPtr mb, MalStkPtr stk, InstrPtr p)
 {
-       int i, actions=0, si;
-       InstrPtr *old;
-       int limit,slimit;
-       sht v,v1;
-       sht *ps, *pt, *pc ;
-       ValPtr sv;
-
+       int i, actions=0;
+       InstrPtr *old, q;
+       int limit,slimit, no_pr, rprof = 0, rs =0;
+       sht *rep ;
 
        (void) stk;
-       ps= (sht*) alloca(sizeof(sht)* mb->vtop); /* bind schema */
-       memset((char*) ps, 0, sizeof(sht)* mb->vtop);
-       pt= (sht*) alloca(sizeof(sht)* mb->vtop); /* bind table */
-       memset((char*) pt, 0, sizeof(sht)* mb->vtop);
-       pc= (sht*) alloca(sizeof(sht)* mb->vtop); /* bind column */
-       memset((char*) pc, 0, sizeof(sht)* mb->vtop);
+                /* vars bound to replicator commands */
+       rep = (sht*) alloca(sizeof(sht)* mb->vtop);
+       memset((char*) rep, 0, sizeof(sht)* mb->vtop);
 
+       no_pr = (mb->profiler == NULL? 1:0);
        old= mb->stmt;
        limit= mb->stop;
        slimit= mb->ssize;
@@ -261,29 +306,67 @@
        for (i = 0; i<limit; i++){
                p= old[i];
                if( (getModuleId(p)== sqlRef && getFunctionId(p)==bindRef) ){
-                       v = getArg(p,0); /* bound var */
-                       ps[v]= getArg(p,1);
-                       pt[v]= getArg(p,2);
-                       pc[v]= getArg(p,3); 
-               }
+                       if(no_pr && rprof ==0){
+                               q = newStmt(mb,"profile","start");      
+                               rprof = 1;
+                       }
+                       if(rs == 0){
+                               q = newStmt(mb,"replicator","start");   
+                               rs = 1;
+                       }
+                       setModuleId(p,replicatorRef);
+                       pushInstruction(mb,p);
+                       rep[getArg(p,0)] = 1;
+               } else
                if( getModuleId(p)== algebraRef && 
                        ( getFunctionId(p) == uselectRef ||
-                         getFunctionId(p) == selectRef
-                       ) && pc[getArg(p,1)]){
-                               v1 = getArg(p,1);
-                               sv = VALnew();
-                                VALcopy(sv,&getVar(mb,ps[v1])->value);
-                                si = defConstant(mb, TYPE_str, sv);
-                               p= pushArgument(mb,p,si);
-
-                               p= pushArgument(mb,p,pt[v1]);
-                                p= pushArgument(mb,p,pc[v1]);
-
- 
+                         getFunctionId(p) == selectRef) &&
+                        rep[getArg(p,1)]){
+                               setModuleId(p,replicatorRef);
+                               pushInstruction(mb,p);
+                               rep[getArg(p,0)] = 1;
+               } else
+               if( getModuleId(p)== algebraRef && 
+                       getFunctionId(p) == markTRef &&
+                       rep[getArg(p,1)] &&
+                       isVarConstant(mb,getArg(p,2))){
+                               setModuleId(p,replicatorRef);
+                               pushInstruction(mb,p);
+                               rep[getArg(p,0)] = 1;
+               } else
+               if( getModuleId(p)== batRef && 
+                       getFunctionId(p) == reverseRef &&
+                       rep[getArg(p,1)]){
                                setModuleId(p,replicatorRef);
                                pushInstruction(mb,p);
+                               rep[getArg(p,0)] = 1;
+               } else
+               if( getModuleId(p)== algebraRef && 
+                       getFunctionId(p) == semijoinRef &&
+                       rep[getArg(p,1)] &&
+                       rep[getArg(p,2)]){
+                               setModuleId(p,replicatorRef);
+                               pushInstruction(mb,p);
+                               rep[getArg(p,0)] = 1;
                } else
+               if( getModuleId(p)== algebraRef && 
+                       getFunctionId(p) == joinRef &&
+                       rep[getArg(p,1)] &&
+                       rep[getArg(p,2)]){
+                               setModuleId(p,replicatorRef);
+                               pushInstruction(mb,p);
+                               rep[getArg(p,0)] = 1;
+               } else {        
+                       if(no_pr && rprof == 1 &&
+                               getModuleId(p)== sqlRef && 
+                               getFunctionId(p)==resultSetRef){
+                                       q = newStmt(mb,"replicator","stop");    
+                                       rs = 0;
+                                       q = newStmt(mb,"profiler","stop");      
+                                       rprof = 0;
+                       }
                        pushInstruction(mb,p);
+               }
        }
        for(; i<slimit; i++)
        if(old[i])
@@ -302,200 +385,206 @@
 @:wrapOptimizer(replicator,OPT_CHECK_ALL)@
 
 
-static int findReplica( str sname, str tname, str cname, int tpe, bit tail,
-       ptr low, ptr hgh, bit li, bit hi, sht * ov)
-{
-       Replica r;
-       int (*cmp) (ptr, ptr);
-       ptr nilptr, plow, phgh;
-       int i, minr = -1;
-       size_t minc = (size_t) GDK_lng_max;
-       bit leq, lsub, heq, hsub;
-       int *rs, ri=0; /* subset replicas */
-
-       (void) reuse;
-       (void) rcache;
-       rs = alloca(replnr * sizeof(*rs)); /* was: int rs[replnr]; */
-       cmp= BATatoms[tpe].atomCmp;
-       nilptr = ATOMnilptr(tpe);
[EMAIL PROTECTED]
+The overloaded algebra operator simply calls the
+underlying implementation and collects statistics on the
+cost.
[EMAIL PROTECTED]
 
+static int newReplica(MalStkPtr s, InstrPtr p, bit keep)
+{
+       int i, j, k, c, bid, r; 
+       ValPtr v, v1;
+       InstrPtr p1;
+       /* Append a copy of p to the replica table repl and return its index there; keep != 0 marks the result as stored. */
+       assureSpace();
+       p1 = copyInstruction(p);
+       for(i = 0; i< p->argc; i++){
+               j = p->argv[i];
+               v = &s->stk[j];         /* run-time value of argument i on the executing stack */
+               c = fndConstant(repl, v);
+               if ( c < 0 ){           /* presumably: no matching constant in repl yet, so define one from a copy */
+                       v1 = VALnew();
+                       VALcopy(v1,v);
+                       c = defConstant(repl, v1->vtype, v1);
+               }
+               setArg(p1,i,c);         /* the copy refers to repl variable c instead of the local variable j */
+               /* inputs (i > 0) are always marked stored; the result (i == 0) only when keep is set */
+               if ((i > 0) ||
+                   ((i == 0) && keep)){
+                       setVarStored(repl,c); 
+                       if (v->vtype == TYPE_bat){
+                               bid = *(int*)VALget(v);
+                               BBPincref(bid,TRUE);    /* a stored BAT gets an extra BBP reference */
+                       }
+               } else clrVarStored(repl,c); 
+       }
+       k = repl->stop;                 /* index the copy gets in repl (read before the push) */
+       pushInstruction(repl,p1);
+       r = getDestVar(p1);
+       if (s->blk->profiler)           /* NOTE(review): k is a position in repl, yet it indexes s->blk's profiler array -- confirm */
+               replstat[r].ticks = s->blk->profiler[k].ticks;
+       return k;
+}
 
-       for (i=0; i<replnr; i++){
-               r = repltable + i;
-                       /* check compatibility */
-               if( (strcmp(cname,r->cname)!=0) ||
-                       (strcmp(tname,r->tname)!=0) ||
-                       (strcmp(sname,r->sname)!=0) )
-                       continue;
-               if (tail && !r->tail) continue;
-               if (tpe != r->tpe) continue;
+/* compare values in ValRecords, return 0 on equal */
+int
+VALcmp(ValPtr p, ValPtr q)
+{
 
-                       /* check overlap */
-               leq = lsub = heq = hsub = FALSE;
-               plow = VALget(&r->low);
-               phgh = VALget(&r->hgh);
-               if( ((*cmp)(low, plow) == 0) && (li == r->li) ) leq = TRUE;
-               else if( ((*cmp)(plow, nilptr)==0) ||
-                        ((*cmp)(low, plow) > 0) ||
-                        (((*cmp)(low, plow) == 0) && !li && r->li) ) lsub = 
TRUE;
+       int (*cmp) (ptr, ptr);
+       int tpe;
+       ptr nilptr, pp, pq;
 
-               if( ((*cmp)(hgh, phgh) == 0) && (hi == r->hi) ) heq = TRUE;
-               else if( ((*cmp)(phgh, nilptr)==0) ||
-                        ((*cmp)(hgh, phgh) < 0) ||
-                        (((*cmp)(hgh, phgh) == 0) && !hi && r->hi) ) hsub = 
TRUE;
+       if( p ==0 || q == 0 ) return  -1;
+       if( (tpe = p ->vtype) != q->vtype ) return  -1;
 
-               if (leq && heq) { /* equal replica exists */
-                       *ov = 0;
-                       return i;
-               } else
-               if ( (leq || lsub ) && (heq || hsub) ){ /* query is a subset of 
the replica */
-                       rs[ri] = i; ri++;
-               }
-       }
+       cmp = BATatoms[tpe].atomCmp;
+       nilptr = ATOMnilptr(tpe);
+       pp = VALget(p);
+       pq = VALget(q);
+       if(((*cmp)(pp, nilptr)==0) && ((*cmp)(pq, nilptr)==0)) return 0; /* eq 
nil val */
+       if(((*cmp)(pp, nilptr)==0) || ((*cmp)(pq, nilptr)==0)) return -1;
+       return ((*cmp)(pp, pq));
 
+}
 
-       if( ri ==0){ /* no replica overlaps with query */
-               *ov = -1;
-               return -1;
-       } else {        /* search min replica overlaping with query */
-               for( i=0; i<ri; i++){
-                       r = repltable + rs[i];
-                       if( r->cnt < minc){
-                               minc = r->cnt;
-                               minr = rs[i];
+/* check for replica of instruction p in repl table */
+int
+findReplica(MalStkPtr s,InstrPtr p)
+{
+       int i, j, idx, dif = 0;
+       int *vmap;
+       InstrPtr *st, q;
+       ValPtr pa, qa;
+       
+       if (repl == NULL) return -1;
+       vmap = getMap(s);
+       if( vmap == NULL)
+               GDKerror("Variable map not initialized\n");
+       st = repl->stmt;
+       for (i = 0; i < repl->stop; i++){
+               q = st[i];
+               if((p->argc != q->argc) ||
+                  (getFunctionId(p) != getFunctionId(q)))
+                       continue;
+               else{   
+                       dif = 0;
+                       for (j = 1; j < p->argc; j++){
+                               idx = p->argv[j];
+                               if( vmap[idx] >=0 ){ /* symbol comp */
+                                       if( vmap[idx] != getArg(q,j)){
+                                               dif = 1;
+                                               break;
+                                       }
+                               } else {        /* value comp */                
+                                       pa = &s->stk[idx];
+                                       qa = &getVar(repl,getArg(q,j))->value;
+                                       if( VALcmp(pa,qa) ){
+                                               dif = 1;
+                                               break;
+                                       }
+                               }
                        }
+                       if( dif == 0 ) return i;
                }
-               *ov = 1;
-               return minr;
        }
+       
+       return -1;
 
 }
 
-/* check if replica should be retained */
-static int checkRetain(lng cnt, lng clk)
+str
+REPbind(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
-       if( retain <= 1 ) return 1;     /* 0 keep stat, 1 retain all */
-       if( retain == 2 ) {             /* cost-based */
-               if( clk > 1000) return 1;       /* at least 1 msec spent */
-               else return 0;
-       }
-       if( retain == 3 ) {             /* cost-based per item*/
-               if( cnt && (clk/cnt > 1)) return 1;     /* at least 1 usec 
spent per tuple*/
-               else return 0;
-       }
-       else return 0;                  
-}
-
-
[EMAIL PROTECTED]
-The overloaded algebra operator simply calls the
-underlying implementation and collects statistics on the
-cost.
[EMAIL PROTECTED]
+       int *ret, *acc, ri, rv, *vmap;
+       str msg;
+       str *sname, *tname, *cname;
 
-static Replica newReplica(str sname, str tname, str cname, int tpe, bit tl)
-{
-       Replica r;
-       
-       assureSpace();
-       r = repltable + replnr++;
-       r->sname = GDKstrdup(sname);
-       r->tname = GDKstrdup(tname);
-       r->cname = GDKstrdup(cname);
-       r->tail = tl;
-       r->tpe =tpe;
-       return r;
-}
+       (void) mb;
+       ret= (int*) getArgReference(stk, pci,0);
+       sname = (str*) getArgReference(stk, pci,1);
+       tname = (str*) getArgReference(stk, pci,2);
+       cname = (str*) getArgReference(stk, pci,3);
+       acc = (int*) getArgReference(stk, pci,4);
 
-static void setReplicaRange(Replica r, int tpe, ptr low, ptr hgh, bit li, bit 
hi)
-{
-       if(r != NULL){
-               if( tpe < TYPE_str){
-                       VALset(&r->low,tpe, low);
-                       VALset(&r->hgh,tpe, hgh);
-               } else
-               if( tpe == TYPE_str){ 
-                       VALset(&r->low,tpe, GDKstrdup(low));
-                       VALset(&r->hgh,tpe, GDKstrdup(hgh));
-               } 
-               r->li = li;
-               r->hi = hi;
+       if( sqlf.sqlbind == NULL)
+               throw(MAL, "REPbind", "sqlbind not set");
+               /* always execute cheap bind */
+       msg = sqlf.sqlbind(ret, sname, tname, cname, acc);
+               /*  res var mapping */
+       rv = getArg(pci,0); 
+       vmap = getMap(stk);
+       if( vmap == NULL)
+               throw(MAL, "REPbind", "Variable map not initialized");
+       if( vmap[rv] < 0 ){ /*res var not mapped*/
+               ri = findReplica(stk,pci); /* replica instr */
+               if( ri < 0 ) 
+                       ri = newReplica(stk,pci,FALSE);
+               vmap[rv] = getArg(repl->stmt[ri],0);
        }
-} 
 
+       return msg;
+}
 
 static 
 str REPselectImpl(int *ret, BAT *b, ptr low, ptr hgh, bit li, bit hi, 
-               str sname, str tname, str cname, bit tl)
+                bit tl, MalStkPtr s, InstrPtr p)
 {
-       int tpe, rr;
-       BAT *bn, *br;
-       lng cnt = -1, clk = 0;
-       Replica r;
-       sht ov;
-
-       tpe= ATOMstorage(b->T->type);
-
-       /* replica match */
-        rr = findReplica(sname,tname,cname,tpe,tl, low, hgh, li, hi, &ov);
-
-       /* execute selection if needed */
-       if((retain == 0) ||     /* normal execution + keep stat */
-          ((retain > 0) && (ov < 0))){ /* no overlap in exist. repl. */
-               clk = GDKusec();
-               bn = BAT_select_(b, low, hgh, li, hi, tl, FALSE);
-               clk = GDKusec()-clk;
-
-                if (bn) {
-                       if (!(bn->batDirty&2)) bn = BATsetaccess(bn, BAT_READ);
-                       *ret = bn->batCacheid;
-                       cnt= BATcount(bn);
-                       BBPkeepref(*ret);
-               }
-               else throw(MAL, "REPselectImpl", "GDKerror");
-
-       } else 
-       if ((retain > 0) && (ov > 0)){  /* exec. over subset repl. */ 
-                r = repltable + rr;
-                       r->ucntsub++;
+       BAT *bn;
+       lng cnt = -1;
+       int ri, rr, rv;
+       int * vmap;
 
-                       if ((br = BATdescriptor(r->resbid)) == NULL) {
-                               throw(MAL, "REPselectImpl", "Cannot access 
replica descriptor");
-                }
-                       clk= GDKusec();
-                       bn = BAT_select_(br, low, hgh, li, hi, tl, FALSE);
-                clk= GDKusec()-clk;
+       rv = getArg(p,0); 
+       vmap = getMap(s);
+       if( vmap == NULL)
+               throw(MAL, "REPbind", "Variable map not initialized");
 
-                       BBPreleaseref(br->batCacheid);
-               if (bn) {
-                               if (!(bn->batDirty&2)) bn = BATsetaccess(bn, 
BAT_READ);
-                        *ret = bn->batCacheid;
-                       cnt = BATcount(bn);
-                               BBPkeepref(*ret);
-                       }
-                else throw(MAL, "REPselectImpl", "GDKerror");
+       if( vmap[rv] < 0 ){             /*res var not mapped*/
+               ri = findReplica(s,p);  /* replica instr */
+               if(ri >= 0 && isResStored(repl,ri)){            /*reuse */ 
+                       rr = getArg(repl->stmt[ri],0);
+                       *ret = *(int *)getVarVal(repl,rr);
+                       BBPincref(*ret,TRUE);
+                       replstat[rr].reuse++;  
+               } else { /* compute */
+                       bn = BAT_select_(b, low, hgh, li, hi, tl, FALSE);
+                       if (bn) {
+                                       if (!(bn->batDirty&2)) bn = 
BATsetaccess(bn, BAT_READ);
+                               *ret = bn->batCacheid;
+                               cnt= BATcount(bn);
+                               BBPkeepref(*ret);
+                       }
+                               else throw(MAL, "REPselectImpl", "GDKerror");
+                       if( ri < 0 ){   /* add replica item */
+                               ri = newReplica(s,p,TRUE);
+                               rr = getArg(repl->stmt[ri],0);
+                               replstat[rr].cnt = cnt;
+                       }
+                       else rr = getArg(repl->stmt[ri],0);
+               } 
+                       /* set mapping after first exec */
+               vmap[rv] = rr;
        }
-
-       /* update replica table */
-       if(ov == 0 ){           /* equality, reuse replica rr*/
-               r = repltable + rr;     /* no new intermediate */
-               r->ucnteq++;
-               if( retain > 0 ){  
-                       *ret = r->resbid;
-                       BBPincref(*ret,TRUE);
-               }
-       } else                  /* new intermed. -> ev. new entry */
-       if( checkRetain(cnt,clk) ){             
-               r = newReplica(sname,tname,cname,tpe,tl);
-               setReplicaRange(r,tpe,low,hgh,li,hi);
-               r->cnt = cnt;
-               r->cost = clk;
-               r->ucnteq = r->ucntsub = 0;
-               if( retain > 0){
-                       r->resbid = *ret;
+       else {          /* check and reuse mapped var*/
+               rr = vmap[rv];
+               if( isVarStored(repl,rr) ){             /*reuse */ 
+                       *ret = *(int *)getVarVal(repl,rr);
                        BBPincref(*ret,TRUE);
+                       replstat[rr].reuse++;  
+               }
+               else {  /* recompute */
+                       bn = BAT_select_(b, low, hgh, li, hi, tl, FALSE);
+                       if (bn) {
+                                       if (!(bn->batDirty&2)) bn = 
BATsetaccess(bn, BAT_READ);
+                               *ret = bn->batCacheid;
+                               BBPkeepref(*ret);
+                       }
+                               else throw(MAL, "REPselectImpl", "GDKerror");
                }
        }
+
        return MAL_SUCCEED;
 }
 
@@ -507,17 +596,16 @@
        bit t=TRUE,*li=&t,*hi=&t;
        BAT *b;
        sht argcnt = pci->argc;
-       str sname, tname, cname;
 
        (void) mb;
        ret= (int*) getArgReference(stk, pci,0);
        bid= (int*) getArgReference(stk, pci,1);
        low= (ptr) getArgReference(stk, pci,2);
-       if( argcnt >= 7)
+       if( argcnt >= 4)
                 hgh = (ptr) getArgReference(stk,pci,3);
         else hgh = low;
 
-       if( argcnt==9){
+       if( argcnt > 4){
                li= (bit*) getArgReference(stk, pci,4);
                hi= (bit*) getArgReference(stk, pci,5);
        }
@@ -534,11 +622,8 @@
                         else hgh = *(str *)hgh;
         }
 
-       sname = *(str *) getArgReference(stk, pci,argcnt-3);
-       tname = *(str *) getArgReference(stk, pci,argcnt-2);
-       cname = *(str *) getArgReference(stk, pci,argcnt-1);
-       
-       REPselectImpl(ret, b, low, hgh, *li, *hi, sname,tname,cname, uselect ? 
FALSE : TRUE);
+
+       REPselectImpl(ret, b, low, hgh, *li, *hi, uselect ? FALSE : TRUE, stk, 
pci);
        BBPreleaseref(b->batCacheid);
        return MAL_SUCCEED;
 }
@@ -556,4 +641,128 @@
        return REPselect_wrap(mb, stk, pci, 1);
 }
 
+
+
+
+
+str
+REPmarkT(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       int *ret, *bid, ri, rv, rr, *vmap;
+       oid *base;
+       str msg = MAL_SUCCEED;
+       /* markT with reuse: hand out the stored replica result when there is one, otherwise run ALGtmark; returns its message */
+       (void) mb;
+       ret= (int*) getArgReference(stk, pci,0);
+       bid= (int*) getArgReference(stk, pci,1);
+       base= (oid*) getArgReference(stk, pci,2);
+       /* vmap maps variables of the executing block to replica-table variables; entries start at -1 (unmapped) */
+       rv = getArg(pci,0); 
+       vmap = getMap(stk);
+       if( vmap == NULL)
+               throw(MAL, "REPmarkT", "Variable map not initialized");
+       if( vmap[rv] < 0 ){ /*res var not mapped*/
+               ri = findReplica(stk,pci); /* replica instr */
+               if( ri < 0 ){           /* unknown instruction: compute and record it, result not kept */
+                       msg = ALGtmark(ret, bid, base); 
+                       ri = newReplica(stk,pci,FALSE);
+               } else {
+                       rr = getArg(repl->stmt[ri],0);
+                       if( !isVarStored(repl,rr) )     /* replica known but no result kept: recompute */
+                               msg = ALGtmark(ret, bid, base);
+                       else {                  /*reuse */ 
+                               *ret = *(int *)getVarVal(repl,rr);
+                               BBPincref(*ret,TRUE);
+                               replstat[rr].reuse++; 
+                       } 
+               }       
+               vmap[rv] = getArg(repl->stmt[ri],0); /*1st exec always maps */
+       } else{
+               rr = vmap[rv];
+               if( isVarStored(repl,rr) ){             /*reuse */ 
+                       *ret = *(int *)getVarVal(repl,rr);
+                       BBPincref(*ret,TRUE);
+                       replstat[rr].reuse++;  
+               }
+               else    /* recompute */
+                       msg = ALGtmark(ret, bid, base);
+       }
+
+       return msg;
+}
+
+str
+REPreverse(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       /* Replicator version of bat.reverse: mirror the input BAT, return its cache id in */
+       /* argument 0 and append the executed instruction to the replica table. */
+       int *ret, *bid;
+       BAT *b, *bn = NULL;
+
+       (void) mb;
+       ret= (int*) getArgReference(stk, pci,0);
+       bid= (int*) getArgReference(stk, pci,1);
+
+        if ((b = BATdescriptor(*bid)) == NULL) {
+                throw(MAL, "bat.reverse", "Cannot access descriptor");
+        }
+
+       /* NOTE(review): no findReplica() lookup here, so every call recomputes and appends a new replica entry */
+       bn = BATmirror(b);
+
+        if (bn) {
+                *ret = bn->batCacheid;
+                BBPkeepref(bn->batCacheid);
+        } else
+               throw(MAL, "bat.reverse", "GDKerror");
+
+       /* keep=FALSE: the instruction is recorded, but its result variable is not marked as stored */
+       newReplica(stk,pci,FALSE);
+       return MAL_SUCCEED;
+
+}
+
+str
+REPjoin(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       /* Replicator version of algebra.join: run ALGjoin and, on success, record the */
+       /* instruction in the replica table with its result marked as stored (keep=TRUE). */
+       int *ret, *lid, *rid;
+       str msg;
+
+       (void) mb;
+       ret = (int*) getArgReference(stk, pci,0);
+       lid = (int*) getArgReference(stk, pci,1);
+       rid = (int*) getArgReference(stk, pci,2);
+
+       msg = ALGjoin(ret, lid, rid);
+       /* After a failed join *ret presumably holds no valid BAT id; recording it with */
+       /* keep=TRUE would mark it stored and take a BBP reference on that id, so skip it. */
+       if (msg == MAL_SUCCEED)
+               newReplica(stk,pci,TRUE);
+       return msg;
+}
+
+str
+REPsemijoin(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       /* Replicator version of algebra.semijoin: run ALGsemijoin and, on success, record the */
+       /* instruction in the replica table with its result marked as stored (keep=TRUE). */
+       int *ret, *lid, *rid;
+       str msg;
+
+       (void) mb;
+       ret = (int*) getArgReference(stk, pci,0);
+       lid = (int*) getArgReference(stk, pci,1);
+       rid = (int*) getArgReference(stk, pci,2);
+
+       msg = ALGsemijoin(ret, lid, rid);
+       /* After a failed semijoin *ret presumably holds no valid BAT id; recording it with */
+       /* keep=TRUE would mark it stored and take a BBP reference on that id, so skip it. */
+       if (msg == MAL_SUCCEED)
+               newReplica(stk,pci,TRUE);
+       return msg;
+}
+
+
 @}


-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Monetdb-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-checkins

Reply via email to