Changeset: 7eeaf552e950 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7eeaf552e950
Modified Files:
        sql/backends/monet5/sql_cquery.c
        sql/server/sql_parser.y
Branch: trails
Log Message:

Fixed SQL grammar for start and resume continuous queries. Fixed the internal 
behavior of STOP ALL and PAUSE ALL


diffs (truncated from 488 to 300 lines):

diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -286,20 +286,19 @@ CQlocateMb(MalBlkPtr mb, int* idx, str* 
                        break;
        }
        assert(i != mb->stop);
-       mb2str = instruction2str(mb, NULL, sig, LIST_MAL_CALL);
-       if(mb2str == NULL) {
-               throw(MAL,call,MAL_MALLOC_FAIL);
+       if((mb2str = instruction2str(mb, NULL, sig, LIST_MAL_CALL)) == NULL) {
+               throw(SQL,call,MAL_MALLOC_FAIL);
        }
 
        for (i = 0; i < pnettop; i++){
                if (strcmp(pnet[i].stmt, mb2str) == 0) {
-                       GDKfree(mb2str);
                        *idx = i;
+                       *res = mb2str;
                        return MAL_SUCCEED;
                }
        }
+       *idx = i;
        *res = mb2str;
-       *idx = i;
        return MAL_SUCCEED;
 }
 
@@ -317,7 +316,7 @@ CQerror(Client cntxt, MalBlkPtr mb, MalS
 
        idx = CQlocate(sch, fcn);
        if( idx == pnettop)
-               throw(SQL,"cquery.error","Continuous procedure %s.%s not 
accessible",sch,fcn);
+               throw(SQL,"cquery.error","The continuous procedure %s.%s is not 
accessible\n",sch,fcn);
 
        pnet[idx].error = GDKstrdup(error);
        return MAL_SUCCEED;
@@ -336,7 +335,7 @@ CQshow(Client cntxt, MalBlkPtr mb, MalSt
 
        idx = CQlocate(sch, fcn);
        if( idx == pnettop)
-               throw(SQL,"cquery.show","continuous procedure %s.%s not 
accessible",sch,fcn);
+               throw(SQL,"cquery.show","The continuous procedure %s.%s is not 
accessible\n",sch,fcn);
 
        printFunction(cntxt->fdout, pnet[idx].mb, 0, LIST_MAL_NAME | 
LIST_MAL_VALUE  | LIST_MAL_MAPI);
        return MAL_SUCCEED;
@@ -535,7 +534,7 @@ CQprocedure(Client cntxt, MalBlkPtr mb, 
                return msg;
        s = findSymbolInModule(cntxt->nspace, putName(nme));
        if (s == NULL)
-               throw(SQL, "cqeury.procedure", "Definition of procedure 
missing");
+               throw(SQL, "cquery.procedure", "Definition of procedure 
missing");
        qry = s->def;
 
        chkProgram(cntxt->fdout,cntxt->nspace,qry);
@@ -596,12 +595,12 @@ CQregister(Client cntxt, MalBlkPtr mb, M
 
        (void) pci;
 
-       if(cycles <= 0 && cycles != int_nil){
-               msg = createException(SQL,"cquery.register","The cycles value 
must be positive");
+       if(cycles < 0 && cycles != int_nil){
+               msg = createException(SQL,"cquery.register","The cycles value 
must be non negative\n");
                goto finish;
        }
-       if(heartbeats <= 0){
-               msg = createException(SQL,"cquery.register","The heartbeats 
value must be positive");
+       if(heartbeats < 0){
+               msg = createException(SQL,"cquery.register","The heartbeats 
value must be non negative\n");
                goto finish;
        }
 
@@ -700,18 +699,22 @@ static str
 CQresumeInternal(Client cntxt, MalBlkPtr mb, int with_alter)
 {
        mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc;
-       str msg = MAL_SUCCEED, mb2str;
+       str msg = MAL_SUCCEED, mb2str = NULL;
        int idx, cycles, heartbeats;
 
+#ifdef DEBUG_CQUERY
+       fprintf(stderr, "#resume scheduler\n");
+#endif
+
        if(with_alter) {
                cycles = sqlcontext ? sqlcontext->cycles : int_nil;
                heartbeats = sqlcontext ? sqlcontext->heartbeats : 1;
-               if(cycles <= 0 && cycles != int_nil){
-                       msg = createException(SQL,"cquery.resume","The cycles 
value must be positive");
+               if(cycles < 0 && cycles != int_nil){
+                       msg = createException(SQL,"cquery.resume","The cycles 
value must be non negative\n");
                        goto finish;
                }
-               if(heartbeats <= 0){
-                       msg = createException(SQL,"cquery.resume","The 
heartbeats value must be positive");
+               if(heartbeats < 0){
+                       msg = createException(SQL,"cquery.resume","The 
heartbeats value must be non negative\n");
                        goto finish;
                }
        }
@@ -722,16 +725,14 @@ CQresumeInternal(Client cntxt, MalBlkPtr
                goto unlock;
        }
        if( idx == pnettop) {
-               msg = createException(SQL, "cquery.resume", "continuous 
procedure %s has not yet started\n", mb2str);
-               GDKfree(mb2str);
+               msg = createException(SQL, "cquery.resume", "The continuous 
procedure %s has not yet started\n", mb2str);
                goto unlock;
        }
-       if( pnet[idx].status != CQPAUSE)
+       if( pnet[idx].status != CQPAUSE) {
+               msg = createException(SQL, "cquery.resume", "The continuous 
procedure %s is already running\n", mb2str);
                goto unlock;
+       }
 
-#ifdef DEBUG_CQUERY
-       fprintf(stderr, "#resume scheduler\n");
-#endif
        pnet[idx].status = CQWAIT;
        if(with_alter) {
                pnet[idx].cycles = cycles;
@@ -744,43 +745,13 @@ CQresumeInternal(Client cntxt, MalBlkPtr
        }
 
 unlock:
+       if(mb2str)
+               GDKfree(mb2str);
        MT_lock_unset(&ttrLock);
 finish:
        return msg;
 }
 
-static str
-CQresumeInternalRanges(int first, int last)
-{
-       str msg = MAL_SUCCEED;
-
-#ifdef DEBUG_CQUERY
-       fprintf(stderr, "#resume scheduler\n");
-#endif
-       for( ; first < last; first++)
-               pnet[first].status = CQWAIT;
-
-       /* start the scheduler if needed */
-       if(CQinit == 0) {
-               msg = CQstartScheduler();
-       }
-       return msg;
-}
-
-str
-CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
-       str msg;
-       (void) cntxt;
-       (void) mb;
-       (void) stk;
-       (void) pci;
-       MT_lock_set(&ttrLock);
-       msg = CQresumeInternalRanges(0, pnettop);
-       MT_lock_unset(&ttrLock);
-       return msg;
-}
-
 str
 CQresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -801,7 +772,7 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal
        }
        if( k >= 0 )
                return CQresumeInternal(cntxt, mb, 1);
-       throw(SQL,"cquery.resume","continuous procedure %s.%s not found", 
getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
+       throw(SQL,"cquery.resume","The continuous procedure %s.%s was not 
found\n", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
 }
 
 str
@@ -824,33 +795,52 @@ CQresumeNoAlter(Client cntxt, MalBlkPtr 
        }
        if( k >= 0 )
                return CQresumeInternal(cntxt, mb, 0);
-       throw(SQL,"cquery.resume","continuous procedure %s.%s not found", 
getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
+       throw(SQL,"cquery.resume","The continuous procedure %s.%s was not 
found\n", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
 }
 
-static str
-CQpauseInternalRanges(int first, int last)
+str
+CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
+       str msg = MAL_SUCCEED;
+       int i;
+
+       (void) cntxt;
+       (void) mb;
+       (void) stk;
+       (void) pci;
+
 #ifdef DEBUG_CQUERY
-       fprintf(stderr, "#pause cqueries\n");
+       fprintf(stderr, "#resume scheduler\n");
 #endif
-       for( ; first < last; first++)
-               pnet[first].status = CQPAUSE;
-       return MAL_SUCCEED;
+
+       MT_lock_set(&ttrLock);
+       for(i = 0 ; i < pnettop; i++)
+               pnet[i].status = CQWAIT;
+
+       /* start the scheduler if needed */
+       if(CQinit == 0) {
+               msg = CQstartScheduler();
+       }
+       MT_lock_unset(&ttrLock);
+       return msg;
 }
 
 static str
 CQpauseInternal(MalBlkPtr mb)
 {
        int idx;
-       str msg = MAL_SUCCEED, mb2str;
+       str msg = MAL_SUCCEED, mb2str = NULL;
 
        MT_lock_set(&ttrLock);
        if(CQlocateMb(mb, &idx, &mb2str, "cquery.pause") != MAL_SUCCEED) {
                goto finish;
        }
        if( idx == pnettop) {
-               msg = createException(SQL, "cquery.pause", "continuous 
procedure %s has not yet started\n", mb2str);
-               GDKfree(mb2str);
+               msg = createException(SQL, "cquery.pause", "The continuous 
procedure %s has not yet started\n", mb2str);
+               goto finish;
+       }
+       if( pnet[idx].status == CQPAUSE) {
+               msg = createException(SQL, "cquery.pause", "The continuous 
procedure %s is already paused\n", mb2str);
                goto finish;
        }
        // actually wait if the query was running
@@ -861,23 +851,11 @@ CQpauseInternal(MalBlkPtr mb)
                if( pnet[idx].status == CQWAIT)
                        break;
        }
-       msg = CQpauseInternalRanges(idx, idx+1);
-finish:
-       MT_lock_unset(&ttrLock);
-       return msg;
-}
+       pnet[idx].status = CQPAUSE;
 
-str
-CQpauseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
-       str msg;
-       (void) cntxt;
-       (void) mb;
-       (void) stk;
-       (void) pci;
-       //pause all
-       MT_lock_set(&ttrLock);
-       msg = CQpauseInternalRanges(0, pnettop);
+finish:
+       if(mb2str)
+               GDKfree(mb2str);
        MT_lock_unset(&ttrLock);
        return msg;
 }
@@ -902,7 +880,37 @@ CQpause(Client cntxt, MalBlkPtr mb, MalS
        }
        if( k >= 0)
                return CQpauseInternal(mb);
-       throw(SQL,"cquery.pause","continuous procedure %s.%s not found", 
getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
+       throw(SQL,"cquery.pause","The continuous procedure %s.%s was not 
found\n", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
+}
+
+str
+CQpauseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str msg = MAL_SUCCEED;
+       int i;
+
+       (void) cntxt;
+       (void) mb;
+       (void) stk;
+       (void) pci;
+
+#ifdef DEBUG_CQUERY
+       fprintf(stderr, "#pause cqueries\n");
+#endif
+
+       MT_lock_set(&ttrLock);
+       for(i = 0 ; i < pnettop; i++) {
+               while( pnet[i].status == CQRUNNING ){
+                       MT_lock_unset(&ttrLock);
+                       MT_sleep_ms(5);
+                       MT_lock_set(&ttrLock);
+                       if( pnet[i].status == CQWAIT)
+                               break;
+               }
+               pnet[i].status = CQPAUSE;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to