Changeset: 7eeaf552e950 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7eeaf552e950
Modified Files:
	sql/backends/monet5/sql_cquery.c
	sql/server/sql_parser.y
Branch: trails
Log Message:
Fixed SQL grammar for start and resume continuous queries. Fixed the internal behavior of STOP ALL and PAUSE ALL diffs (truncated from 488 to 300 lines): diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c --- a/sql/backends/monet5/sql_cquery.c +++ b/sql/backends/monet5/sql_cquery.c @@ -286,20 +286,19 @@ CQlocateMb(MalBlkPtr mb, int* idx, str* break; } assert(i != mb->stop); - mb2str = instruction2str(mb, NULL, sig, LIST_MAL_CALL); - if(mb2str == NULL) { - throw(MAL,call,MAL_MALLOC_FAIL); + if((mb2str = instruction2str(mb, NULL, sig, LIST_MAL_CALL)) == NULL) { + throw(SQL,call,MAL_MALLOC_FAIL); } for (i = 0; i < pnettop; i++){ if (strcmp(pnet[i].stmt, mb2str) == 0) { - GDKfree(mb2str); *idx = i; + *res = mb2str; return MAL_SUCCEED; } } + *idx = i; *res = mb2str; - *idx = i; return MAL_SUCCEED; } @@ -317,7 +316,7 @@ CQerror(Client cntxt, MalBlkPtr mb, MalS idx = CQlocate(sch, fcn); if( idx == pnettop) - throw(SQL,"cquery.error","Continuous procedure %s.%s not accessible",sch,fcn); + throw(SQL,"cquery.error","The continuous procedure %s.%s is not accessible\n",sch,fcn); pnet[idx].error = GDKstrdup(error); return MAL_SUCCEED; @@ -336,7 +335,7 @@ CQshow(Client cntxt, MalBlkPtr mb, MalSt idx = CQlocate(sch, fcn); if( idx == pnettop) - throw(SQL,"cquery.show","continuous procedure %s.%s not accessible",sch,fcn); + throw(SQL,"cquery.show","The continuous procedure %s.%s is not accessible\n",sch,fcn); printFunction(cntxt->fdout, pnet[idx].mb, 0, LIST_MAL_NAME | LIST_MAL_VALUE | LIST_MAL_MAPI); return MAL_SUCCEED; @@ -535,7 +534,7 @@ CQprocedure(Client cntxt, MalBlkPtr mb, return msg; s = findSymbolInModule(cntxt->nspace, putName(nme)); if (s == NULL) - throw(SQL, "cqeury.procedure", "Definition of procedure missing"); + throw(SQL, "cquery.procedure", "Definition of procedure missing"); qry = s->def; chkProgram(cntxt->fdout,cntxt->nspace,qry); @@ -596,12 +595,12 @@ CQregister(Client cntxt, MalBlkPtr mb, M (void) pci; - if(cycles <= 0 && cycles 
!= int_nil){ - msg = createException(SQL,"cquery.register","The cycles value must be positive"); + if(cycles < 0 && cycles != int_nil){ + msg = createException(SQL,"cquery.register","The cycles value must be non negative\n"); goto finish; } - if(heartbeats <= 0){ - msg = createException(SQL,"cquery.register","The heartbeats value must be positive"); + if(heartbeats < 0){ + msg = createException(SQL,"cquery.register","The heartbeats value must be non negative\n"); goto finish; } @@ -700,18 +699,22 @@ static str CQresumeInternal(Client cntxt, MalBlkPtr mb, int with_alter) { mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc; - str msg = MAL_SUCCEED, mb2str; + str msg = MAL_SUCCEED, mb2str = NULL; int idx, cycles, heartbeats; +#ifdef DEBUG_CQUERY + fprintf(stderr, "#resume scheduler\n"); +#endif + if(with_alter) { cycles = sqlcontext ? sqlcontext->cycles : int_nil; heartbeats = sqlcontext ? sqlcontext->heartbeats : 1; - if(cycles <= 0 && cycles != int_nil){ - msg = createException(SQL,"cquery.resume","The cycles value must be positive"); + if(cycles < 0 && cycles != int_nil){ + msg = createException(SQL,"cquery.resume","The cycles value must be non negative\n"); goto finish; } - if(heartbeats <= 0){ - msg = createException(SQL,"cquery.resume","The heartbeats value must be positive"); + if(heartbeats < 0){ + msg = createException(SQL,"cquery.resume","The heartbeats value must be non negative\n"); goto finish; } } @@ -722,16 +725,14 @@ CQresumeInternal(Client cntxt, MalBlkPtr goto unlock; } if( idx == pnettop) { - msg = createException(SQL, "cquery.resume", "continuous procedure %s has not yet started\n", mb2str); - GDKfree(mb2str); + msg = createException(SQL, "cquery.resume", "The continuous procedure %s has not yet started\n", mb2str); goto unlock; } - if( pnet[idx].status != CQPAUSE) + if( pnet[idx].status != CQPAUSE) { + msg = createException(SQL, "cquery.resume", "The continuous procedure %s is already running\n", mb2str); goto unlock; + } -#ifdef DEBUG_CQUERY 
- fprintf(stderr, "#resume scheduler\n"); -#endif pnet[idx].status = CQWAIT; if(with_alter) { pnet[idx].cycles = cycles; @@ -744,43 +745,13 @@ CQresumeInternal(Client cntxt, MalBlkPtr } unlock: + if(mb2str) + GDKfree(mb2str); MT_lock_unset(&ttrLock); finish: return msg; } -static str -CQresumeInternalRanges(int first, int last) -{ - str msg = MAL_SUCCEED; - -#ifdef DEBUG_CQUERY - fprintf(stderr, "#resume scheduler\n"); -#endif - for( ; first < last; first++) - pnet[first].status = CQWAIT; - - /* start the scheduler if needed */ - if(CQinit == 0) { - msg = CQstartScheduler(); - } - return msg; -} - -str -CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) -{ - str msg; - (void) cntxt; - (void) mb; - (void) stk; - (void) pci; - MT_lock_set(&ttrLock); - msg = CQresumeInternalRanges(0, pnettop); - MT_lock_unset(&ttrLock); - return msg; -} - str CQresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { @@ -801,7 +772,7 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal } if( k >= 0 ) return CQresumeInternal(cntxt, mb, 1); - throw(SQL,"cquery.resume","continuous procedure %s.%s not found", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k))); + throw(SQL,"cquery.resume","The continuous procedure %s.%s was not found\n", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k))); } str @@ -824,33 +795,52 @@ CQresumeNoAlter(Client cntxt, MalBlkPtr } if( k >= 0 ) return CQresumeInternal(cntxt, mb, 0); - throw(SQL,"cquery.resume","continuous procedure %s.%s not found", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k))); + throw(SQL,"cquery.resume","The continuous procedure %s.%s was not found\n", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k))); } -static str -CQpauseInternalRanges(int first, int last) +str +CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { + str msg = MAL_SUCCEED; + int i; + + (void) cntxt; + (void) mb; + (void) stk; + (void) pci; + #ifdef DEBUG_CQUERY - 
fprintf(stderr, "#pause cqueries\n"); + fprintf(stderr, "#resume scheduler\n"); #endif - for( ; first < last; first++) - pnet[first].status = CQPAUSE; - return MAL_SUCCEED; + + MT_lock_set(&ttrLock); + for(i = 0 ; i < pnettop; i++) + pnet[i].status = CQWAIT; + + /* start the scheduler if needed */ + if(CQinit == 0) { + msg = CQstartScheduler(); + } + MT_lock_unset(&ttrLock); + return msg; } static str CQpauseInternal(MalBlkPtr mb) { int idx; - str msg = MAL_SUCCEED, mb2str; + str msg = MAL_SUCCEED, mb2str = NULL; MT_lock_set(&ttrLock); if(CQlocateMb(mb, &idx, &mb2str, "cquery.pause") != MAL_SUCCEED) { goto finish; } if( idx == pnettop) { - msg = createException(SQL, "cquery.pause", "continuous procedure %s has not yet started\n", mb2str); - GDKfree(mb2str); + msg = createException(SQL, "cquery.pause", "The continuous procedure %s has not yet started\n", mb2str); + goto finish; + } + if( pnet[idx].status == CQPAUSE) { + msg = createException(SQL, "cquery.pause", "The continuous procedure %s is already paused\n", mb2str); goto finish; } // actually wait if the query was running @@ -861,23 +851,11 @@ CQpauseInternal(MalBlkPtr mb) if( pnet[idx].status == CQWAIT) break; } - msg = CQpauseInternalRanges(idx, idx+1); -finish: - MT_lock_unset(&ttrLock); - return msg; -} + pnet[idx].status = CQPAUSE; -str -CQpauseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) -{ - str msg; - (void) cntxt; - (void) mb; - (void) stk; - (void) pci; - //pause all - MT_lock_set(&ttrLock); - msg = CQpauseInternalRanges(0, pnettop); +finish: + if(mb2str) + GDKfree(mb2str); MT_lock_unset(&ttrLock); return msg; } @@ -902,7 +880,37 @@ CQpause(Client cntxt, MalBlkPtr mb, MalS } if( k >= 0) return CQpauseInternal(mb); - throw(SQL,"cquery.pause","continuous procedure %s.%s not found", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k))); + throw(SQL,"cquery.pause","The continuous procedure %s.%s was not found\n", getModuleId(getInstrPtr(mb,k)), 
getFunctionId(getInstrPtr(mb,k))); +} + +str +CQpauseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + str msg = MAL_SUCCEED; + int i; + + (void) cntxt; + (void) mb; + (void) stk; + (void) pci; + +#ifdef DEBUG_CQUERY + fprintf(stderr, "#pause cqueries\n"); +#endif + + MT_lock_set(&ttrLock); + for(i = 0 ; i < pnettop; i++) { + while( pnet[i].status == CQRUNNING ){ + MT_lock_unset(&ttrLock); + MT_sleep_ms(5); + MT_lock_set(&ttrLock); + if( pnet[i].status == CQWAIT) + break; + } + pnet[i].status = CQPAUSE; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list