It was this check-in that broke the compilation in SQL.
Martin forgot to remove the calls to
OCTOPUSdrop
from sql/src/backend/monet5/sql.mx.
Romulo
Martin Kersten wrote:
> Update of /cvsroot/monetdb/MonetDB5/src/scheduler
> In directory 23jxhf1.ch3.sourceforge.com:/tmp/cvs-serv21275
>
> Modified Files:
> run_octopus.mx
> Log Message:
> A new round for the octopus scheduler. Code still in testing phase.
>
>
> U run_octopus.mx
> Index: run_octopus.mx
> ===================================================================
> RCS file: /cvsroot/monetdb/MonetDB5/src/scheduler/run_octopus.mx,v
> retrieving revision 1.17
> retrieving revision 1.18
> diff -u -d -r1.17 -r1.18
> --- run_octopus.mx 29 Jan 2009 18:17:10 -0000 1.17
> +++ run_octopus.mx 13 Apr 2009 14:23:01 -0000 1.18
> @@ -30,17 +30,17 @@
> re-directing requests to multiple sites. If there are no sites known,
> then the code is executed linearly as is.
>
> -The scheduler runs all tentacles asynchronously.
> +The scheduler runs all tentacles asynchronously if possible.
> To make our live easier, we assume that all tentacles are
> grouped together in a guarded block as follows:
>
> @verbatim
> -barrier (parallel,a):= scheduler.octopus(timeout);
> -a:= octopus.tentacle_1();
> +barrier (parallel,version):= scheduler.octopus(timeout);
> +a:= octopus.tentacle_1(sitename,fcnname,version);
> ...
> -b:= octopus.tentacle_n();
> -a:= mat.pack(a,...,b);
> -exit (parallel,a);
> +b:= octopus.tentacle_n(sitename,fcnname,version);
> +exit (parallel,version);
> +z:= mat.pack(a,...,b);
> @end verbatim
>
> This way the MAL flow of control simplifies skipping to the end
> @@ -50,27 +50,17 @@
> Allowing MAL instructions inbetween complicates our work,
> because it would mean that we have to do a flow analysis.
>
> -To make this work the scheduler needs a list of database worker.
> -For the time being, this is an explicitly administered list here.
> -When the octopus scheduling is called, we check the connection with
> -the remote site. If it is down, it is re-activated using Merovingian.
> -
> +To make this work the scheduler needs a list of databases to play with.
> +For the time being this consists of all the database known
> +and ending with the phrase 'sea'.
> +This list is obtained through the remote module using the
> +support of Merovingian. The default is to use the local
> +database as a target.
> @{
> @mal
> -pattern scheduler.octopus(timeout:int)(:bit, :bat[:any_1,:any_2])
> +pattern scheduler.octopus(t:int)(:bit,version:int)
> address OCTOPUSrun
> -comment "Run the program block in parallel, but don't wait longer then t
> seconds";
> -
> -pattern scheduler.worker(dbnme:str, usr:str, pw:str)
> -address OCTOPUSworker
> -comment "Add a new worker to the known list ";
> -pattern scheduler.worker(dbnme:str, usr:str, pw:str, host:str, port:int)
> -address OCTOPUSworker
> -comment "Add a worker site to the known list ";
> -
> -pattern scheduler.drop(dbnme:str)
> -address OCTOPUSdrop
> -comment "Remove a worker from the list";
> +comment "Run the program block in parallel, but don't wait longer then t
> seconds. Also fix a consistent database version.";
> @h
> #ifndef _RUN_OCTOPUS
> #define _RUN_OCTOPUS
> @@ -78,7 +68,7 @@
> #include "mal_instruction.h"
> #include "mal_client.h"
>
> -/*#define DEBUG_RUN_OCTOPUS to trace processing */
> +#define DEBUG_RUN_OCTOPUS /* to trace processing */
>
> #ifdef WIN32
> #ifndef LIBRUN_OCTOPUS
> @@ -91,8 +81,6 @@
> #endif
>
> octopus_export str OCTOPUSrun(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
> InstrPtr p);
> -octopus_export str OCTOPUSworker(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
> InstrPtr p);
> -octopus_export str OCTOPUSdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
> InstrPtr p);
> #endif /* MAL_RUN_OCTOPUS */
>
> @+ Octopus scheduling implementation
> @@ -108,155 +96,156 @@
> #include "remote.h"
> #include "alarm.h"
>
> -#define SITEasleep 0
> -#define SITElocal 1
> -#define SITEremote 2
> +typedef struct REGMAL{
> + str fcn;
> + struct REGMAL *nxt;
> +} *Registry;
>
> typedef struct {
> - str alias;
> - str db; /* connection parameters */
> + str name;
> str usr;
> - str pw;
> - str host; /* used when merovigian is not running */
> - int port;
> - int status; /* asleep, local, remote */
> -} Site;
> + str pwd;
> + Registry nxt; /* list of registered queries */
> +} Sea;
>
> #define MAXSITES 2048 /* should become dynamic at some point */
> -static Site *sites;
> -static int nrsites = 0;
> +static Sea sea[MAXSITES];
> +static int nrsea = 0;
>
> static str
> OCTOPUSdiscover(Client cntxt){
> bat b1 = 0, b2 = 0;
> - BAT *b;
> + BAT *l1, *l2;
> + BUN p,q;
> str msg = MAL_SUCCEED;
> + BATiter bi;
>
> - (void) cntxt;
> - (void) b2;
> + sea[nrsea].usr = GDKstrdup("monetdb");
> + sea[nrsea].pwd = GDKstrdup("monetdb");
> + sea[nrsea++].name= GDKstrdup(GDKgetenv("gdk_dbname"));
> /* determine if sites are reachable */
> msg = RMTgetList(&b1,&b2);
> if ( msg != MAL_SUCCEED)
> return msg;
> - b = BATdescriptor(b1);
> - if ( b == NULL)
> + l1 = BATdescriptor(b1);
> + if ( l1 == NULL)
> throw(MAL,"octopus.discover","No database list available");
> - BBPunfix(b1);
> + l2 = BATdescriptor(b2);
> + if ( l2 == NULL){
> + BBPreleaseref(b1);
> + throw(MAL,"octopus.discover","No database list available");
> + }
> + /* add the databases to the working set */
> + bi= bat_iterator(l1);
> + BATloop(l1,p,q){
> + str t= (str) BUNtail(bi,p);
> +
> + if (nrsea ==MAXSITES) break;
> + if (strlen(t) >= 3 && strcmp("sea", t+strlen(t)-3) == 0){
> + sea[nrsea].usr = GDKstrdup("monetdb");
> + sea[nrsea].pwd = GDKstrdup("monetdb");
> + sea[nrsea++].name= GDKstrdup(t);
> +#ifdef DEBUG_RUN_OCTOPUS
> + stream_printf(cntxt->fdout,"Found site %s\n",t);
> +#else
> + (void) cntxt;
> +#endif
> + }
> + }
> +#ifdef DEBUG_RUN_OCTOPUS
> + stream_printf(cntxt->fdout,"Seas %d\n",nrsea);
> +#endif
> + BBPreleaseref(b1);
> + BBPreleaseref(b2);
> return MAL_SUCCEED;
> }
>
> @-
> -The replica is identified by database name. The host and port
> -should address a merovingian to ensure the database instance is
> -started. The default is to contact the local merovingian at
> -default port 50000.
> +We first register the tentacle at all sites and keep
> +a list of those already sent.
> @c
> -str
> -OCTOPUSworker(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> +static int
> +OCTOPUSfind(Sea s, str qry){
> + Registry r;
> + for ( r= s.nxt; r; r= r->nxt)
> + if ( strcmp(qry, r->fcn)==0)
> + return 1;
> + return 0;
> +}
> +
> +...@-
> +The functions called by the octopus.exec_qry are to be
> +registered at all sites.
> +...@c
> +static str
> +OCTOPUSregister(Client cntxt, MalBlkPtr mb, InstrPtr p)
> {
> - int idx;
> + int i;
> + str conn, fname, msg = MAL_SUCCEED;
> +
> + fname = getVarConstant(mb,getArg(p,2)).val.sval;
> + for ( i= 0; i< nrsea; i++){
> + msg= RMTconnect(&conn, &sea[i].name, &sea[i].usr, &sea[i].pwd);
> + if (msg ){
> + stream_printf(cntxt->fdout,"!%s\n",msg);
> + GDKfree(msg);
> + msg = NULL;
> + continue;
> + }
> + if( !OCTOPUSfind(sea[i], fname) ){
> + msg = RMTregisterInternal(cntxt, conn, octopusRef,
> fname);
>
> - (void) mb;
> - if (nrsites == MAXSITES)
> - throw(MAL,"scheduler.worker","Too many worker");
> - mal_set_lock(mal_contextLock,"scheduler.worker");
> - if (nrsites == 0)
> - sites = (Site *) GDKzalloc(sizeof(Site) * MAXSITES);
> - idx = nrsites++;
> - sites[idx].alias = NULL;
> - sites[idx].db = GDKstrdup(*(str*) getArgReference(stk,pci,1));
> - sites[idx].usr = GDKstrdup(*(str*) getArgReference(stk,pci,2));
> - sites[idx].pw = GDKstrdup(*(str*) getArgReference(stk,pci,3));
> - if (pci->argc > 4){
> - sites[idx].host = GDKstrdup(*(str*) getArgReference(stk,pci,4));
> - sites[idx].port = *(int*) getArgReference(stk,pci,5);
> - } else {
> - sites[idx].host = GDKstrdup("localhost");
> - sites[idx].port = 50000;
> - }
> - mal_unset_lock(mal_contextLock,"scheduler.worker");
> #ifdef DEBUG_RUN_OCTOPUS
> - stream_printf(cntxt->fdout,"# added worker %s %s %s %s\n",
> - sites[idx].alias, sites[idx].usr, sites[idx].pw);
> - sites[idx].db, sites[idx].usr, sites[idx].pw);
> + stream_printf(cntxt->fdout,"octopus.%s registered at
> site %s\n",
> + fname,sea[i].name);
> + stream_printf(cntxt->fdout,"reply: %s\n",msg?msg:"ok");
> #else
> - (void) cntxt;
> + (void) cntxt;
> #endif
> - return MAL_SUCCEED;
> -}
> -str
> -OCTOPUSdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> -{
> - int i,j;
> - str alias = *(str*) getArgReference(stk,pci,1);
> -
> - (void) cntxt;
> - (void) mb;
> - mal_set_lock(mal_contextLock,"scheduler.drop");
> - for (i=j=0; i<nrsites; i++){
> - if( strcmp(sites[i].alias, alias) ==0) {
> - GDKfree(sites[i].alias);
> - GDKfree(sites[i].db);
> - GDKfree(sites[i].usr);
> - GDKfree(sites[i].pw);
> - GDKfree(sites[i].host);
> - continue;
> + if ( msg == MAL_SUCCEED){
> + Registry r= (Registry) GDKzalloc(sizeof(struct
> REGMAL));
> + r->fcn = GDKstrdup(getFunctionId(p));
> + r->nxt = sea[i].nxt;
> + sea[i].nxt = r;
> + }
> }
> - sites[j++] = sites[i];
> }
> - nrsites = j;
> - mal_unset_lock(mal_contextLock,"scheduler.drop");
> - if ( i == j )
> - throw(MAL,"scheduler.drop","Site not found");
> - return MAL_SUCCEED;
> + GDKfree(conn);
> + return msg;
> }
> @-
> -The policy to check for sites is a multiphase phase process.
> -First, we try to re-use a site where the operation was ran before.
> -If not available, we select a non-used worker.
> -If all this fails, we pick a random site to execute the plan.
> +The work division looks at the system opportunities and
> +replaces the target site in all instructions.
> +The first policy is to simply perform round robin.
> +The more advanced way is to negotiat with the remote sites.
> @c
> static str
> -OCTOPUSexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> +OCTOPUSworkdivision(Client cntxt, MalBlkPtr mb, int pc)
> {
> - int i=0, tries= nrsites * 2;
> + static int rr=0;
> str msg = MAL_SUCCEED;
> + InstrPtr p;
> + ValPtr cst;
>
> -redo:
> + for (; pc< mb->stop; pc++){
> + if ( nrsea >1 && rr == 0) rr++; /* ignore default */
> + p= getInstrPtr(mb,pc);
> + if ( p->barrier == EXITsymbol)
> + break;
> + assert( isVarConstant(mb, getArg(p,1)) );
> + cst = &getVarConstant(mb, getArg(p,1));
> + if( cst->val.sval)
> + GDKfree(cst->val.sval);
> + cst->val.sval= GDKstrdup(sea[rr].name);
> + cst->len = strlen(cst->val.sval);
> #ifdef DEBUG_RUN_OCTOPUS
> - stream_printf(cntxt->fdout,"octopus.exec site selected %d\n",i);
> + stream_printf(cntxt->fdout,"octopus site selected
> %s\n",sea[rr].name);
> + printInstruction(cntxt->fdout,mb,0,p,LIST_MAL_STMT);
> #else
> - (void) cntxt;
> -#endif
> -
> - /* register the plan remotely */
> - msg = RMTregisterInternal(cntxt, sites[i].alias,
> - getModuleId(pci), getFunctionId(pci));
> -
> - /* ignore a duplicate definition */
> - if (msg != MAL_SUCCEED && !strstr(msg,"Function already defined")){
> -#ifdef DEBUG_RUN_OCTOPUS
> - stream_printf(cntxt->fdout,"octopus.exec failed to register
> plan %s.%s at site %s\n",getModuleId(pci),getFunctionId(pci),sites[i].alias);
> - stream_printf(cntxt->fdout,"reply: %s\n",msg);
> -#endif
> - if (--tries <= 0)
> - return msg;
> - goto redo;
> - }
> -
> - /* execute the plan as an independent process thread if it is local*/
> - /* otherwise activate it on the remote site passing parameters as well
> */
> - msg =runMALprocess(cntxt,mb,stk, getPC(mb,pci), getPC(mb,pci)+1);
> - if ( msg != MAL_SUCCEED){
> -#ifdef DEBUG_RUN_OCTOPUS
> - stream_printf(cntxt->fdout,"octopus.exec failed to run remote
> plan\n");
> + (void) cntxt;
> #endif
> - if (--tries <= 0)
> - return msg;
> - goto redo;
> + rr= (rr+1) % nrsea;
> }
> -
> - /* if it fails, we need to find another site */
> return msg;
> }
> @-
> @@ -270,37 +259,52 @@
> We should be careful in accessing a site that runs out
> of clients or any failure. It may cause the scheduler to
> wait forever.
> +
> +The database version should indicate to the tentacles
> +if it is time to refresh their caches.
> +It should be obtained from the recycler where we
> +know when updates have been taken place.
> @c
> str
> OCTOPUSrun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p)
> {
> int *res = (int*) getArgReference(stk,p,0);
> + int *version = (int*) getArgReference(stk,p,1);
> int timeout = *(int*) getArgReference(stk,p,2);
> - int j,fnd, i = getPC(mb,p);
> + int j,fnd, i = getPC(mb,p), threadcnt=0;
> str msg = MAL_SUCCEED;
> *res = 0; /* skip the block */
>
> - if ( OCTOPUSdiscover(cntxt) == 0 ){
> + *version = 0;
> +
> + if ( (msg= OCTOPUSdiscover(cntxt)) ){
> #ifdef DEBUG_RUN_OCTOPUS
> stream_printf(cntxt->fdout,"#Run in local serial mode\n");
> #endif
> *res = 1;
> - return MAL_SUCCEED;
> + return msg;
> + }
> +...@-
> +Register the tentacle functions at all sites.
> +...@c
> + if (nrsea > 1) {
> + for (j= i+1; j<mb->stop ; j++){
> + p= getInstrPtr(mb,j);
> + if ( p->barrier == EXITsymbol )
> + break;
> + msg= OCTOPUSregister(cntxt,mb,p);
> + if ( msg )
> + return msg;
> + }
> }
> + msg= OCTOPUSworkdivision(cntxt,mb,i+1);
> + if ( msg )
> + return msg;
> +
> /* do the actual parallel work */
> for (i++; i<mb->stop && msg == MAL_SUCCEED; i++){
> p= getInstrPtr(mb,i);
> - /* don't do it remote if we need arguments */
> - if ( p->retc != p->argc){
> -#ifdef DEBUG_RUN_OCTOPUS
> - stream_printf(cntxt->fdout,"#Run in local serial mode due to
> arguments\n");
> -#endif
> - *res = 1;
> - return MAL_SUCCEED;
> - }
> - if ( p->barrier == EXITsymbol )
> - break;
> - if ( getModuleId(p) == matRef && getFunctionId(p) == packRef){
> + if ( p->barrier == EXITsymbol ){
> /* collect the results */
> do{
> fnd = 0;
> @@ -311,10 +315,10 @@
> #endif
> MT_sleep_ms(1000);
> timeout--;
> - } while ( fnd < p->argc-3 && timeout > 0 );
> + } while ( fnd < threadcnt && timeout > 0 );
> if (timeout <= 0)
> - throw(MAL,"scheduler.pack","Execution time
> out");
> - return MATpackInternal(stk,p,1);
> + throw(MAL,"scheduler.octopus","Execution time
> out");
> + break;
> }
> if ( getModuleId(p) != octopusRef)
> throw(MAL,"scheduler.octopus","tentacle expected");
> @@ -323,7 +327,8 @@
> #else
> (void) cntxt;
> #endif
> - msg = OCTOPUSexec(cntxt,mb,stk,p);
> + msg =runMALprocess(cntxt,mb,stk, getPC(mb,p), getPC(mb,p)+1);
> + threadcnt++;
> }
> return msg;
> }
>
>
> ------------------------------------------------------------------------------
> This SF.net email is sponsored by:
> High Quality Requirements in a Collaborative Environment.
> Download a free trial of Rational Requirements Composer Now!
> http://p.sf.net/sfu/www-ibm-com
> _______________________________________________
> Monetdb-checkins mailing list
> [email protected]
> https://lists.sourceforge.net/lists/listinfo/monetdb-checkins
------------------------------------------------------------------------------
This SF.net email is sponsored by:
High Quality Requirements in a Collaborative Environment.
Download a free trial of Rational Requirements Composer Now!
http://p.sf.net/sfu/www-ibm-com
_______________________________________________
Monetdb-developers mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-developers