Changeset: 535fc010773c for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=535fc010773c
Modified Files:
        sql/backends/monet5/wlr.c
        sql/test/wlcr/Tests/wlc01.py
Branch: Nov2019
Log Message:

Mostly documentation and sending errors the proper way.


diffs (truncated from 378 to 300 lines):

diff --git a/sql/backends/monet5/wlr.c b/sql/backends/monet5/wlr.c
--- a/sql/backends/monet5/wlr.c
+++ b/sql/backends/monet5/wlr.c
@@ -7,13 +7,12 @@
  */
 
 /*
- * A master can be replicated by taking a binary copy of the 'bat' directory.
- * This should be done under control of the program monetdb, e.g.
- * monetdb replica <masterlocation> <dbname>+
+ * A master can be replicated by taking a binary copy of the 'bat' directory
+ * when in quiescent mode, or by taking a more formal snapshot.
  * Alternatively you start with an empty database.
  * 
  * After restart of a mserver against the newly created image,
- * the log files from the master are processed.
+ * the log files from the master are processed by calling....
  *
  * In replay mode also all queries are executed if they surpass
  * the latest threshold set for by the master.
@@ -39,21 +38,24 @@
 #define WLC_ERROR 60
 
 #define _WLR_DEBUG_
+  
+MT_Lock     wlr_lock = MT_LOCK_INITIALIZER("wlr_lock");
 
 /* The current status of the replica  processing */
 static char wlr_master[IDLENGTH];
-static char wlr_error[FILENAME_MAX];           // errors should stop the 
process
-static int wlr_batches;        // the next file to be processed
-static lng wlr_tag;                    // the next transaction id to be 
processed
-static lng wlr_limit = -1;             // stop re-processing transactions when 
limit is reached
-static char wlr_timelimit[26];         // stop re-processing transactions when 
time limit is reached
-static char wlr_read[26];              // stop re-processing transactions when 
time limit is reached
-static int wlr_state;          // which state RUN/PAUSE
-static int wlr_beat;           // period between successive synchronisations 
with master
+static char wlr_error[FILENAME_MAX];   // errors should stop the process
+static int     wlr_batches;                            // the next file to be 
processed
+static lng     wlr_tag;                                        // the next 
transaction id to be processed
+static lng     wlr_limit = -1;                         // stop re-processing 
transactions when limit is reached
+static char wlr_timelimit[26];                 // stop re-processing 
transactions when time limit is reached
+static char wlr_read[26];                              // stop re-processing 
transactions when time limit is reached
+static int     wlr_state;                                      // which state 
RUN/PAUSE
+static int     wlr_beat;                                       // period 
between successive synchronisations with master
 static MT_Id wlr_thread;
 
 #define MAXLINE 2048
 
+/* Simply read the configuration file */
 static str
 WLRgetConfig(void){
        char *path;
@@ -62,8 +64,10 @@ WLRgetConfig(void){
        int len;
        str msg = MAL_SUCCEED;
 
-       if((path = GDKfilepath(0,0,"wlr.config",0)) == NULL)
-               throw(MAL,"wlr.getConfig","Could not access wlr.config file\n");
+       if((path = GDKfilepath(0, 0, "wlr.config", 0)) == NULL){
+               msg = createException(MAL, "wlr.getConfig","Could not access 
wlr.config file\n");
+               return msg;
+       }
        fd = fopen(path,"r");
        GDKfree(path);
        if( fd == NULL)
@@ -95,11 +99,14 @@ WLRgetConfig(void){
                        }
                }
        }
+       if (msg == MAL_SUCCEED && wlr_limit < wlr_tag)
+                       msg = createException(MAL, "wlr.getConfig", 
"inconsistent config record");
 bailout:
        fclose(fd);
        return msg;
 }
 
+/* Keep the current status in the config file */
 static str
 WLRsetConfig(void){
        char *path;
@@ -109,9 +116,9 @@ WLRsetConfig(void){
                throw(MAL,"wlr.setMaster","Could not access wlr.config file\n");
        fd = open_wastream(path);
        GDKfree(path);
-       if( fd == NULL){
-               return MAL_SUCCEED;
-       }
+       if( fd == NULL)
+               throw(MAL,"wlr.setMaster","Could not create wlr.config file\n");
+
        mnstr_printf(fd,"master=%s\n", wlr_master);
        mnstr_printf(fd,"batches=%d\n", wlr_batches);
        mnstr_printf(fd,"tag="LLFMT"\n", wlr_tag);
@@ -129,14 +136,14 @@ WLRsetConfig(void){
  * When the master database exist, we should set the replica administration.
  * But only once.
  *
- * The log files are identified by a range. It starts with 0 when an empty
- * database was used to bootstrap. Otherwise it is the range of the dbmaster.
+ * The log files are identified by a range. It starts with 0 when an empty 
database
+ * was used to bootstrap. Otherwise it is the range received from the dbmaster.
  * At any time we should be able to restart the synchronization
  * process by grabbing a new set of log files.
  * This calls for keeping track in the replica what log files have been 
applied.
  *
  * Given that the replication thread runs independently, all errors encountered
- * should be sent to the system log.
+ * should be sent to the system logging.
  */
 static str
 WLRgetMaster(void)
@@ -150,7 +157,7 @@ WLRgetMaster(void)
                return MAL_SUCCEED;
 
        /* collect master properties */
-       len = snprintf(path,FILENAME_MAX,"..%c%s",DIR_SEP,wlr_master);
+       len = snprintf(path, FILENAME_MAX, "..%c%s", DIR_SEP, wlr_master);
        if (len == -1 || len >= FILENAME_MAX)
                throw(MAL, "wlr.getMaster", "wlc.config filename path is too 
large");
        if((dir = GDKfilepath(0,path,"wlc.config",0)) == NULL)
@@ -172,6 +179,15 @@ WLRgetMaster(void)
  */
 static int wlrprocessrunning;
 
+/* each WLR block is turned into a separate MAL block and executed
+ * This block is re-used as we consider the complete file.
+ */
+
+#define cleanup(){\
+       resetMalBlkAndFreeInstructions(mb, 1);\
+       trimMalVariables(mb, NULL);\
+       pc = 0;}
+
 static void
 WLRprocess(void *arg)
 {
@@ -203,6 +219,8 @@ WLRprocess(void *arg)
                fprintf(stderr, "#Could not create user for WLR process\n");
                return;
        }
+
+       /* Cook a log file into a concrete MAL function for multiple 
transactions */
        prev = newFunction(putName("user"), putName("wlr"), FUNCTIONsymbol);
        if(prev == NULL) {
                wlrprocessrunning =0;
@@ -237,7 +255,6 @@ WLRprocess(void *arg)
                if( fd == NULL){
                        fprintf(stderr,"#wlr.process:'%s' can not be accessed 
\n",path);
                        // Be careful not to miss log files.
-                       // In the future wait for more files becoming available.
                        continue;
                }
                sz = getFileSize(fd);
@@ -273,7 +290,7 @@ WLRprocess(void *arg)
                        }
                        q= getInstrPtr(mb, mb->stop-1);
                        if( getModuleId(q) == wlrRef && getFunctionId(q) == 
transactionRef && (currid = getVarConstant(mb, getArg(q,1)).val.lval) < 
wlr_tag){
-                               /* skip already executed transactions */
+                               /* skip already executed transaction log */
                        } else
                        if( getModuleId(q) == wlrRef && getFunctionId(q) == 
transactionRef &&
                                ( ( (currid = getVarConstant(mb, 
getArg(q,1)).val.lval) >= wlr_limit && wlr_limit != -1) ||
@@ -310,24 +327,28 @@ WLRprocess(void *arg)
                                        if(mvc_trans(sql) < 0) {
                                                fprintf(stderr,"#Allocation 
failure while starting the transaction \n");
                                        } else {
+#ifdef _WLR_DEBUG_
+                                               fprintf(stderr,"#process a 
transaction\n");
+                                               printFunction(GDKerr, mb, 0, 
LIST_MAL_DEBUG );
+#endif
                                                msg= runMAL(c,mb,0,0);
                                                wlr_tag++;
                                                if( msg == MAL_SUCCEED)
+                                                       /* at this point we 
have updated the replica, but the configuration has not been changed.
+                                                        * If at this point an 
error occurs, we could redo the same transaction twice later on.
+                                                        * The solution is to 
make sure that we recognize that a transaction has started and is completed 
successfully
+                                                        */
                                                        msg = WLRsetConfig( );
                                                // ignore warnings
                                                if (msg && 
strstr(msg,"WARNING"))
                                                        msg = MAL_SUCCEED;
                                                if( msg != MAL_SUCCEED){
                                                        // they should always 
succeed
-                                                       fprintf(stderr,"ERROR 
in processing batch %d :%s\n", i, msg);
+                                                       fprintf(stderr,"#ERROR 
in wlr processing batch %d :%s\n", i, msg);
                                                        printFunction(GDKerr, 
mb, 0, LIST_MAL_DEBUG );
                                                        if((other = 
mvc_rollback(sql,0,NULL, false)) != MAL_SUCCEED) //an error was already 
established
                                                                GDKfree(other);
-                                                       // cleanup
-                                                       
printFunction(GDKerr,mb,0,63);
-                                                       
resetMalBlkAndFreeInstructions(mb, 1);
-                                                       trimMalVariables(mb, 
NULL);
-                                                       pc = 0;
+                                                       cleanup();
                                                } else
                                                if((msg = mvc_commit(sql, 0, 0, 
false)) != MAL_SUCCEED) {
                                                        
fprintf(stderr,"#wlr.process transaction commit failed: %s\n", msg);
@@ -341,16 +362,10 @@ WLRprocess(void *arg)
                                        fprintf(stderr,"%s",line);
                                        printFunction(GDKerr, mb, 0, 
LIST_MAL_DEBUG );
                                }
-                               // cleanup
-                               resetMalBlkAndFreeInstructions(mb, 1);
-                               trimMalVariables(mb, NULL);
-                               pc = 0;
+                               cleanup();
                        } else
                        if ( getModuleId(q) == wlrRef && getFunctionId(q) == 
rollbackRef ){
-                               // cleanup
-                               resetMalBlkAndFreeInstructions(mb, 1);
-                               trimMalVariables(mb, NULL);
-                               pc = 0;
+                               cleanup();
                        }
                } while( mb->errors == 0 && pc != mb->stop);
 #ifdef _WLR_DEBUG_
@@ -380,8 +395,9 @@ wrapup:
 }
 
 /*
- *  Single WLR thread is allowed to run in the background.
+ *  A single WLR thread is allowed to run in the background.
  *  If it happens to crash then replication roll forward is suspended.
+ *  Moreover, the background job can only leave error messages in the 
merovingian log.
  *
  * A timing issue.
  * The WLRprocess can only start after an SQL environment has been initialized.
@@ -427,8 +443,8 @@ WLRprocessScheduler(void *arg)
                        if(strncmp(clktxt, wlr_timelimit,sizeof(wlr_timelimit)) 
>= 0) 
                                MT_sleep_ms(duration);
                } else
-                       for( ; duration > 0  && wlr_state == WLR_PAUSE; 
duration -= 100){
-                               MT_sleep_ms( 100);
+                       for( ; duration > 0  && wlr_state == WLR_PAUSE; 
duration -= 20){
+                               MT_sleep_ms( 20);
                        }
                if( wlr_master[0] && wlr_state != WLR_PAUSE){
                        if((msg = WLRgetMaster()) != MAL_SUCCEED) {
@@ -453,21 +469,22 @@ WLRinit(void)
 {
        str msg;
 
-       MT_lock_set(&wlc_lock);
+       MT_lock_set(&wlr_lock);
        if( wlrprocessrunning){
-               MT_lock_unset(&wlc_lock);
+               MT_lock_unset(&wlr_lock);
                return MAL_SUCCEED;
        }
        if((msg = WLRgetConfig()) != MAL_SUCCEED){
-               MT_lock_unset(&wlc_lock);
+               fprintf(stderr,"%s\n",msg);
+               MT_lock_unset(&wlr_lock);
                return msg;
        }
        if( wlr_master[0] == 0){
-               MT_lock_unset(&wlc_lock);
+               MT_lock_unset(&wlr_lock);
                return MAL_SUCCEED;
        }
        if( wlr_state != WLR_START){
-               MT_lock_unset(&wlc_lock);
+               MT_lock_unset(&wlr_lock);
                return MAL_SUCCEED;
        }
        
@@ -477,10 +494,12 @@ WLRinit(void)
                        throw(SQL,"wlr.init",SQLSTATE(42000) "Starting wlr 
manager failed");
        }
        wlrprocessrunning ++;
-       MT_lock_unset(&wlc_lock);
+       MT_lock_unset(&wlr_lock);
        return MAL_SUCCEED;
 }
 
+// The replicate() command can be issued at the SQL console
+// which can accept exceptions
 str
 WLRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {      str timelimit  = wlr_timelimit;
@@ -489,8 +508,10 @@ WLRreplicate(Client cntxt, MalBlkPtr mb,
        (void) mb;
 
        // first stop the background process
-       if((msg = WLRgetConfig()) != MAL_SUCCEED)
+       if((msg = WLRgetConfig()) != MAL_SUCCEED){
+               fprintf(stderr,"%s\n",msg);
                return msg;
+       }
        if( wlr_state != WLR_START){
                wlr_state = WLR_PAUSE;
                while(wlr_state != WLR_START){
@@ -503,8 +524,8 @@ WLRreplicate(Client cntxt, MalBlkPtr mb,
                if( getArgType(mb, pci, 1) == TYPE_str){
                        int len;
                        wlr_limit = -1;
-                       if( 
strcmp(GDKgetenv("gdk_dbname"),*getArgReference_str(stk,pci,1)) == 0)
-                               throw(SQL,"wlr.replicate",SQLSTATE(42000) 
"Master and replicate should be different");
+                       if( strcmp(GDKgetenv("gdk_dbname"), 
*getArgReference_str(stk,pci,1)) == 0)
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to