Changeset: 535fc010773c for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=535fc010773c Modified Files: sql/backends/monet5/wlr.c sql/test/wlcr/Tests/wlc01.py Branch: Nov2019 Log Message:
Mostly documentation and sending errors proper way. diffs (truncated from 378 to 300 lines): diff --git a/sql/backends/monet5/wlr.c b/sql/backends/monet5/wlr.c --- a/sql/backends/monet5/wlr.c +++ b/sql/backends/monet5/wlr.c @@ -7,13 +7,12 @@ */ /* - * A master can be replicated by taking a binary copy of the 'bat' directory. - * This should be done under control of the program monetdb, e.g. - * monetdb replica <masterlocation> <dbname>+ + * A master can be replicated by taking a binary copy of the 'bat' directory + * when in quiescent mode or a more formal snapshot.. * Alternatively you start with an empty database. * * After restart of a mserver against the newly created image, - * the log files from the master are processed. + * the log files from the master are processed by calling.... * * In replay mode also all queries are executed if they surpass * the latest threshold set for by the master. @@ -39,21 +38,24 @@ #define WLC_ERROR 60 #define _WLR_DEBUG_ + +MT_Lock wlr_lock = MT_LOCK_INITIALIZER("wlr_lock"); /* The current status of the replica processing */ static char wlr_master[IDLENGTH]; -static char wlr_error[FILENAME_MAX]; // errors should stop the process -static int wlr_batches; // the next file to be processed -static lng wlr_tag; // the next transaction id to be processed -static lng wlr_limit = -1; // stop re-processing transactions when limit is reached -static char wlr_timelimit[26]; // stop re-processing transactions when time limit is reached -static char wlr_read[26]; // stop re-processing transactions when time limit is reached -static int wlr_state; // which state RUN/PAUSE -static int wlr_beat; // period between successive synchronisations with master +static char wlr_error[FILENAME_MAX]; // errors should stop the process +static int wlr_batches; // the next file to be processed +static lng wlr_tag; // the next transaction id to be processed +static lng wlr_limit = -1; // stop re-processing transactions when limit is reached +static char wlr_timelimit[26]; // stop re-processing transactions when time limit is reached +static char wlr_read[26]; // stop re-processing transactions when time limit is reached +static int wlr_state; // which state RUN/PAUSE +static int wlr_beat; // period between successive synchronisations with master static MT_Id wlr_thread; #define MAXLINE 2048 +/* Simple read the configuration file */ static str WLRgetConfig(void){ char *path; @@ -62,8 +64,10 @@ WLRgetConfig(void){ int len; str msg = MAL_SUCCEED; - if((path = GDKfilepath(0,0,"wlr.config",0)) == NULL) - throw(MAL,"wlr.getConfig","Could not access wlr.config file\n"); + if((path = GDKfilepath(0, 0, "wlr.config", 0)) == NULL){ + msg = createException(MAL, "wlr.getConfig","Could not access wlr.config file\n"); + return msg; + } fd = fopen(path,"r"); GDKfree(path); if( fd == NULL) @@ -95,11 +99,14 @@ WLRgetConfig(void){ } } } + if (msg == MAL_SUCCEED && wlr_limit < wlr_tag) + msg = createException(MAL, "wlr.getConfig", "inconsistent config record"); bailout: fclose(fd); return msg; } +/* Keep the current status in the config file */ static str WLRsetConfig(void){ char *path; @@ -109,9 +116,9 @@ WLRsetConfig(void){ throw(MAL,"wlr.setMaster","Could not access wlr.config file\n"); fd = open_wastream(path); GDKfree(path); - if( fd == NULL){ - return MAL_SUCCEED; - } + if( fd == NULL) + throw(MAL,"wlr.setMaster","Could not create wlr.config file\n"); + mnstr_printf(fd,"master=%s\n", wlr_master); mnstr_printf(fd,"batches=%d\n", wlr_batches); mnstr_printf(fd,"tag="LLFMT"\n", wlr_tag); @@ -129,14 +136,14 @@ WLRsetConfig(void){ * When the master database exist, we should set the replica administration. * But only once. * - * The log files are identified by a range. It starts with 0 when an empty - * database was used to bootstrap. Otherwise it is the range of the dbmaster. + * The log files are identified by a range. It starts with 0 when an empty databas + * was used to bootstrap. Otherwise it is the range received from the dbmaster. * At any time we should be able to restart the synchronization * process by grabbing a new set of log files. * This calls for keeping track in the replica what log files have been applied. * * Given that the replication thread runs independently, all errors encountered - * should be sent to the system log. + * should be sent to the system logging. */ static str WLRgetMaster(void) @@ -150,7 +157,7 @@ WLRgetMaster(void) return MAL_SUCCEED; /* collect master properties */ - len = snprintf(path,FILENAME_MAX,"..%c%s",DIR_SEP,wlr_master); + len = snprintf(path, FILENAME_MAX, "..%c%s", DIR_SEP, wlr_master); if (len == -1 || len >= FILENAME_MAX) throw(MAL, "wlr.getMaster", "wlc.config filename path is too large"); if((dir = GDKfilepath(0,path,"wlc.config",0)) == NULL) @@ -172,6 +179,15 @@ WLRgetMaster(void) */ static int wlrprocessrunning; +/* each WLR block is turned into a separate MAL block and executed + * This block is re-used as we consider the complete file. + */ + +#define cleanup(){\ + resetMalBlkAndFreeInstructions(mb, 1);\ + trimMalVariables(mb, NULL);\ + pc = 0;} + static void WLRprocess(void *arg) { @@ -203,6 +219,8 @@ WLRprocess(void *arg) fprintf(stderr, "#Could not create user for WLR process\n"); return; } + + /* Cook a log file into a concreate MAL function for multiple transactions */ prev = newFunction(putName("user"), putName("wlr"), FUNCTIONsymbol); if(prev == NULL) { wlrprocessrunning =0; @@ -237,7 +255,6 @@ WLRprocess(void *arg) if( fd == NULL){ fprintf(stderr,"#wlr.process:'%s' can not be accessed \n",path); // Be careful not to miss log files. - // In the future wait for more files becoming available. continue; } sz = getFileSize(fd); @@ -273,7 +290,7 @@ WLRprocess(void *arg) } q= getInstrPtr(mb, mb->stop-1); if( getModuleId(q) == wlrRef && getFunctionId(q) == transactionRef && (currid = getVarConstant(mb, getArg(q,1)).val.lval) < wlr_tag){ - /* skip already executed transactions */ + /* skip already executed transaction log */ } else if( getModuleId(q) == wlrRef && getFunctionId(q) == transactionRef && ( ( (currid = getVarConstant(mb, getArg(q,1)).val.lval) >= wlr_limit && wlr_limit != -1) || @@ -310,24 +327,28 @@ WLRprocess(void *arg) if(mvc_trans(sql) < 0) { fprintf(stderr,"#Allocation failure while starting the transaction \n"); } else { +#ifdef _WLR_DEBUG_ + fprintf(stderr,"#process a transaction\n"); + printFunction(GDKerr, mb, 0, LIST_MAL_DEBUG ); +#endif msg= runMAL(c,mb,0,0); wlr_tag++; if( msg == MAL_SUCCEED) + /* at this point we have updated the replica, but the configuration has not been changed. + * If at this point an error occurs, we could redo the same transaction twice later on. + * The solution is to make sure that we recognize that a transaction has started and is completed successfully + */ msg = WLRsetConfig( ); // ignore warnings if (msg && strstr(msg,"WARNING")) msg = MAL_SUCCEED; if( msg != MAL_SUCCEED){ // they should always succeed - fprintf(stderr,"ERROR in processing batch %d :%s\n", i, msg); + fprintf(stderr,"#ERROR in wlr processing batch %d :%s\n", i, msg); printFunction(GDKerr, mb, 0, LIST_MAL_DEBUG ); if((other = mvc_rollback(sql,0,NULL, false)) != MAL_SUCCEED) //an error was already established GDKfree(other); - // cleanup - printFunction(GDKerr,mb,0,63); - resetMalBlkAndFreeInstructions(mb, 1); - trimMalVariables(mb, NULL); - pc = 0; + cleanup(); } else if((msg = mvc_commit(sql, 0, 0, false)) != MAL_SUCCEED) { fprintf(stderr,"#wlr.process transaction commit failed: %s\n", msg); @@ -341,16 +362,10 @@ WLRprocess(void *arg) fprintf(stderr,"%s",line); printFunction(GDKerr, mb, 0, LIST_MAL_DEBUG ); } - // cleanup - resetMalBlkAndFreeInstructions(mb, 1); - trimMalVariables(mb, NULL); - pc = 0; + cleanup(); } else if ( getModuleId(q) == wlrRef && getFunctionId(q) == rollbackRef ){ - // cleanup - resetMalBlkAndFreeInstructions(mb, 1); - trimMalVariables(mb, NULL); - pc = 0; + cleanup(); } } while( mb->errors == 0 && pc != mb->stop); #ifdef _WLR_DEBUG_ @@ -380,8 +395,9 @@ wrapup: } /* - * Single WLR thread is allowed to run in the background. + * A single WLR thread is allowed to run in the background. * If it happens to crash then replication roll forward is suspended. + * Moreover, the background job can only leave error messages in the merovingian log. * * A timing issue. * The WLRprocess can only start after an SQL environment has been initialized. @@ -427,8 +443,8 @@ WLRprocessScheduler(void *arg) if(strncmp(clktxt, wlr_timelimit,sizeof(wlr_timelimit)) >= 0) MT_sleep_ms(duration); } else - for( ; duration > 0 && wlr_state == WLR_PAUSE; duration -= 100){ - MT_sleep_ms( 100); + for( ; duration > 0 && wlr_state == WLR_PAUSE; duration -= 20){ + MT_sleep_ms( 20); } if( wlr_master[0] && wlr_state != WLR_PAUSE){ if((msg = WLRgetMaster()) != MAL_SUCCEED) { @@ -453,21 +469,22 @@ WLRinit(void) { str msg; - MT_lock_set(&wlc_lock); + MT_lock_set(&wlr_lock); if( wlrprocessrunning){ - MT_lock_unset(&wlc_lock); + MT_lock_unset(&wlr_lock); return MAL_SUCCEED; } if((msg = WLRgetConfig()) != MAL_SUCCEED){ - MT_lock_unset(&wlc_lock); + fprintf(stderr,"%s\n",msg); + MT_lock_unset(&wlr_lock); return msg; } if( wlr_master[0] == 0){ - MT_lock_unset(&wlc_lock); + MT_lock_unset(&wlr_lock); return MAL_SUCCEED; } if( wlr_state != WLR_START){ - MT_lock_unset(&wlc_lock); + MT_lock_unset(&wlr_lock); return MAL_SUCCEED; } @@ -477,10 +494,12 @@ WLRinit(void) throw(SQL,"wlr.init",SQLSTATE(42000) "Starting wlr manager failed"); } wlrprocessrunning ++; - MT_lock_unset(&wlc_lock); + MT_lock_unset(&wlr_lock); return MAL_SUCCEED; } +// The replicate() command can be issued at the SQL console +// which can accept exceptions str WLRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { str timelimit = wlr_timelimit; @@ -489,8 +508,10 @@ WLRreplicate(Client cntxt, MalBlkPtr mb, (void) mb; // first stop the background process - if((msg = WLRgetConfig()) != MAL_SUCCEED) + if((msg = WLRgetConfig()) != MAL_SUCCEED){ + fprintf(stderr,"%s\n",msg); return msg; + } if( wlr_state != WLR_START){ wlr_state = WLR_PAUSE; while(wlr_state != WLR_START){ @@ -503,8 +524,8 @@ WLRreplicate(Client cntxt, MalBlkPtr mb, if( getArgType(mb, pci, 1) == TYPE_str){ int len; wlr_limit = -1; - if( strcmp(GDKgetenv("gdk_dbname"),*getArgReference_str(stk,pci,1)) == 0) - throw(SQL,"wlr.replicate",SQLSTATE(42000) "Master and replicate should be different"); + if( strcmp(GDKgetenv("gdk_dbname"), *getArgReference_str(stk,pci,1)) == 0) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list