On Sun, 17 May 2009 21:34:58 -0700 (PDT), Allen Fowler <allen.fow...@yahoo.com> 
wrote:

>
>Hello,
>
>I have several CGI and cron scripts that I would like to coordinate via a 
>"First In
>/ First Out" style buffer.    That is, some processes are adding work
>units, and some take the oldest and start work on them.
>
>Could SQLite be used for this?  
>
>It would seem very complex to use SQL for just a FIFO, but then again, SQLite 
>would take care of all ACID / concurrency issues.
>
>Has this been done before?
>
>Thanks,
>:)

For what it's worth, here you go.
Perhaps you can borrow a few ideas from it.


================= mkschema.sql =====================
--
-- schema for database job.db3
--
-- Database page size in bytes.
-- NOTE(review): presumably this has to run before the first table is
-- created to take effect - confirm against the SQLite PRAGMA docs.
PRAGMA page_size=8192;
-- Default page-cache size, counted in pages.
-- NOTE(review): current SQLite documentation lists default_cache_size as
-- deprecated - confirm before relying on it.
PRAGMA default_cache_size=512;

-- Lookup table: one-letter job status code -> text shown to users.
-- The views show_job_status and show_tsn join jobs to it USING (status).
CREATE TABLE statustext (
        status CHAR(1) PRIMARY KEY DEFAULT NULL
                CONSTRAINT sttxt_valid_status
                CHECK (status IN ('W','I','R','T','A','C')),
        sttext VARCHAR(16)
);

-- Seed all six status codes with one compound INSERT.
INSERT INTO statustext (status, sttext)
          SELECT 'W', 'Wait'
UNION ALL SELECT 'I', 'Initializing'
UNION ALL SELECT 'R', 'Running'
UNION ALL SELECT 'T', 'Terminated'
UNION ALL SELECT 'A', 'Abend'
UNION ALL SELECT 'C', 'Cancelled'; -- cancelled before dispatched

-- Job queue: one row per submitted work unit.
--   jobid    monotonically increasing key (AUTOINCREMENT)
--   TSN      4-character serial derived from jobid by trigger jobs_ins
--   jobprio  1..9, default 9
--   status   W/I/R/T/A/C, see statustext for the display text
--   dt*      timestamps; dtinit, dtstart, dtstop and dtmodify are
--            maintained by trigger jobs_upd on status transitions
CREATE TABLE jobs (
        jobid  INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
        TSN       CHAR(4),
        jobprio   INTEGER  DEFAULT 9
 CONSTRAINT jobs_valid_prio
 CHECK (jobprio > 0 AND jobprio < 10),
        status    CHAR(1)  DEFAULT 'W'
 CONSTRAINT jobs_valid_status
 CHECK (status IN ('W','I','R','T','A','C')),
        userid    VARCHAR(8) NOT NULL,
        dtcreate  DATETIME DEFAULT CURRENT_TIMESTAMP,
        dtinit    DATETIME DEFAULT NULL,
        dtstart   DATETIME DEFAULT NULL,
        dtstop    DATETIME DEFAULT NULL,
        dtmodify  DATETIME DEFAULT CURRENT_TIMESTAMP,
        dtdnload  DATETIME DEFAULT NULL,
        cmnd      VARCHAR(254),  -- cmnd\*.cmd to execute
        pars      VARCHAR(254),  -- parameters for procedure
                                 -- (host,userid,filename[,type,elementname])
        rc        INTEGER,       -- ERRORLEVEL
        endmsg    VARCHAR(254),  -- message from dispatcher
        sysout    VARCHAR(254),  -- logfile
        dnload    VARCHAR(254)   -- file to download
);
-- Jobs are looked up by TSN from the dispatcher script.
CREATE INDEX idx_jobs_tsn ON jobs(TSN);

-- Parameter lines belonging to a job: one row per parameter value.
CREATE TABLE pars (
        parid INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
        jobid INTEGER,
        partx TEXT,
        CONSTRAINT fk_pars_jobs FOREIGN KEY (jobid)
                REFERENCES jobs (jobid) ON DELETE CASCADE
);

-- After each insert into jobs: assign the 4-character TSN derived from the
-- new jobid, then purge rows that are at least 10000 jobids older and whose
-- TSN does not sort after the new one.
CREATE TRIGGER jobs_ins AFTER INSERT ON jobs
FOR EACH ROW BEGIN
        -- characters 5..8 of (10000000 + jobid), e.g. jobid 42 -> '0042'
        UPDATE jobs
                SET TSN = substr(10000000 + NEW.jobid,5,4)
                WHERE jobid = NEW.jobid;
        -- NEW.TSN still holds the value supplied by the INSERT itself
        -- (normally NULL), not the TSN assigned just above, so comparing
        -- against it never matches. Compare against the computed TSN instead.
        DELETE FROM jobs
                WHERE jobid < (NEW.jobid - 9999)
                AND TSN <= substr(10000000 + NEW.jobid,5,4);
END;

-- Bookkeeping after every update of a jobs row, keyed on the status
-- transition OLD.status -> NEW.status:
--   W -> I       clear rc and endmsg, stamp dtinit
--   I -> R       stamp dtstart
--   R -> A or T  stamp dtstop
-- In every other case dtinit, dtstart and dtstop are put back to their OLD
-- values. dtmodify is stamped unless NEW.status equals OLD.status.
CREATE TRIGGER jobs_upd
AFTER UPDATE ON jobs
FOR EACH ROW
BEGIN
        UPDATE jobs
           SET rc       = CASE WHEN OLD.status = 'W' AND NEW.status = 'I'
                               THEN NULL
                               ELSE NEW.rc END,
               endmsg   = CASE WHEN OLD.status = 'W' AND NEW.status = 'I'
                               THEN NULL
                               ELSE NEW.endmsg END,
               dtinit   = CASE WHEN OLD.status = 'W' AND NEW.status = 'I'
                               THEN CURRENT_TIMESTAMP
                               ELSE OLD.dtinit END,
               dtstart  = CASE WHEN OLD.status = 'I' AND NEW.status = 'R'
                               THEN CURRENT_TIMESTAMP
                               ELSE OLD.dtstart END,
               dtstop   = CASE WHEN OLD.status = 'R' AND NEW.status IN ('A','T')
                               THEN CURRENT_TIMESTAMP
                               ELSE OLD.dtstop END,
               dtmodify = CASE WHEN NEW.status = OLD.status
                               THEN dtmodify
                               ELSE CURRENT_TIMESTAMP END
         WHERE jobid = NEW.jobid;
END;

-- Trigger version of ON DELETE CASCADE: deleting a job also deletes the
-- parameter rows that point at it.
CREATE TRIGGER jobs_del
AFTER DELETE ON jobs
FOR EACH ROW
BEGIN
        DELETE FROM pars
         WHERE jobid = OLD.jobid;
END;

-- references
-- Trigger-based foreign key check for inserts into pars: a non-NULL jobid
-- must exist in jobs, otherwise the transaction is rolled back.
-- The message is kept on one line so it does not contain a line break.
CREATE TRIGGER fki_pars_jobs
BEFORE INSERT ON pars
FOR EACH ROW BEGIN
        SELECT RAISE(ROLLBACK, 'insert on table "pars" violates foreign key constraint "fk_pars_jobs(i)"')
        WHERE NEW.jobid IS NOT NULL
          AND NOT EXISTS (SELECT 1 FROM jobs WHERE jobid = NEW.jobid);
END;

-- references
-- Trigger-based foreign key check for updates of pars: a non-NULL jobid
-- must exist in jobs, otherwise the transaction is rolled back.
-- The message is kept on one line so it does not contain a line break.
CREATE TRIGGER fku_pars_jobs
BEFORE UPDATE ON pars
FOR EACH ROW BEGIN
        SELECT RAISE(ROLLBACK, 'update on table "pars" violates foreign key constraint "fk_pars_jobs(u)"')
        WHERE NEW.jobid IS NOT NULL
          AND NOT EXISTS (SELECT 1 FROM jobs WHERE jobid = NEW.jobid);
END;

-- Job overview for use in .cmd scripts: every job, highest jobid first,
-- with the stored timestamps rendered through the 'localtime' modifier.
CREATE VIEW shjobsta AS
SELECT
        TSN,
        status,
        userid,
        datetime(dtcreate, 'localtime') AS spoolin,
        datetime(dtinit,   'localtime') AS dispatch,
        datetime(dtstart,  'localtime') AS logon,
        datetime(dtstop,   'localtime') AS logoff,
        cmnd,
        pars,
        rc,
        endmsg
FROM jobs
ORDER BY jobid DESC;


-- Status page for users: the 50 jobs with the highest jobid, with the TSN
-- rendered as a link to shtsn.php and the status code replaced by its text.
CREATE VIEW show_job_status AS
SELECT
        '<a href="shtsn.php?TSN='|| TSN ||'">'|| TSN ||'</a>' AS TSN,
        sttext AS status,
        userid,
        datetime(dtcreate, 'localtime') AS spoolin,
        time(dtstart,      'localtime') AS logon,
        time(dtstop,       'localtime') AS logoff,
        cmnd,
        rc,
        endmsg,
        datetime(dtdnload, 'localtime') AS downloaded
FROM jobs
INNER JOIN statustext USING (status)
ORDER BY jobid DESC
LIMIT 50;

-- Information for users, with download links after termination.
-- The sysout link is shown for status 'A' or 'T', the download link only
-- for status 'T'; otherwise those columns are empty strings.
CREATE VIEW show_tsn AS
 SELECT TSN
,sttext AS status
,userid
,datetime(dtcreate,'localtime') AS spoolin
,datetime(dtinit  ,'localtime') AS dispatch
,datetime(dtstart ,'localtime') AS logon
,datetime(dtstop  ,'localtime') AS logoff
,datetime(dtdnload,'localtime') AS download
,cmnd
,'<a href="shpar.php?TSN='|| TSN ||'">'|| pars ||'</a>' as parameters
,rc
,endmsg
,CASE WHEN status = 'A' OR status = 'T'
 THEN '<a href="shsysout.php?TSN='|| TSN  ||'">'|| sysout ||'</a>'
 ELSE '' END AS sysout
,CASE WHEN status = 'T'
 THEN '<a href="getresult/'|| TSN || '.zip">' || dnload || '</a>'
 ELSE '' END AS download_file
 FROM jobs INNER JOIN statustext USING (status)
 ORDER BY jobid DESC LIMIT 50;


-- Information for users: the parameter lines of jobs whose jobid is within
-- 50 of the highest jobid, highest jobid first, parameters in insertion order.
CREATE VIEW show_par AS
SELECT
        TSN,
        userid,
        partx AS parameterwaarde
FROM jobs
INNER JOIN pars USING (jobid)
WHERE jobid > (SELECT MAX(jobid) - 50 FROM jobs)
ORDER BY jobid DESC, parid;


-- Download lookup: the download file name is exposed only for jobs in
-- status 'T'; every other job yields an empty string.
CREATE VIEW get_result AS
SELECT
        TSN,
        userid,
        CASE status WHEN 'T' THEN dnload ELSE '' END AS dnload
FROM jobs
ORDER BY jobid DESC;


-- Log file lookup: exposes the sysout file name for jobs that have ended,
-- either normally ('T') or abnormally ('A'). This matches show_tsn, which
-- renders the sysout link for both states, and the dispatcher, which stores
-- sysout on abend as well. Any other status yields an empty string.
CREATE VIEW get_sysout AS
 SELECT TSN
,userid
,CASE WHEN status IN ('A','T') THEN sysout ELSE '' END AS sysout
 FROM jobs
 ORDER BY jobid DESC;

-- The job currently being initialised (status 'I'): best priority first,
-- then oldest first. Ordering uses jobid rather than TSN because TSN is
-- only the low four digits of jobid and wraps around, which would put
-- newer jobs ahead of older ones.
CREATE VIEW get_init_prop AS
 SELECT TSN
,status
,userid
,cmnd
,pars
   FROM jobs WHERE status == 'I' ORDER BY jobprio,jobid LIMIT 1;
PRAGMA user_version=2;

-- The next job to dispatch (status 'W'), selected with the same ordering as
-- get_init_prop so both views pick the same row within one dispatch step.
DROP VIEW IF EXISTS dispatch_job;
CREATE VIEW dispatch_job AS
 SELECT TSN
,status
,userid
,cmnd
,pars
   FROM jobs WHERE status == 'W' ORDER BY jobprio,jobid LIMIT 1;
PRAGMA user_version=2;

-- Makes the dispatch_job view updatable for exactly one purpose: setting
-- status to 'I' on the view row flags the matching jobs row as 'I'.
DROP TRIGGER IF EXISTS upd_init_job;
DROP TRIGGER IF EXISTS upd_dispatch_job;
CREATE TRIGGER upd_dispatch_job
        INSTEAD OF UPDATE ON dispatch_job
        WHEN NEW.status = 'I'
BEGIN
        UPDATE jobs
           SET status = 'I'
         WHERE TSN = NEW.TSN;
END;
-- procedure to dispatch a job:
-- BEGIN;
-- UPDATE dispatch_job set STATUS = 'I';
-- SELECT * FROM get_init_prop;
-- COMMIT;
--
-- calculate refresh interval of status pages
-- SELECT CURRENT_TIMESTAMP,MAX(dtmodify),
-- strftime('%s',CURRENT_TIMESTAMP),
-- strftime('%s',MAX(dtmodify)),
-- strftime('%s',CURRENT_TIMESTAMP) - strftime('%s',MAX(dtmodify)) AS ago
-- FROM jobs;

-- Suggested refresh interval (seconds) for the admin status page, based on
-- how many seconds ago any job was last modified. The idle time is computed
-- once in the derived table and then bucketed.
CREATE VIEW refreshinterval_admin AS
SELECT CASE
         WHEN idle_secs <    60 THEN   15 -- under a minute    : every 15 seconds
         WHEN idle_secs <   180 THEN   30 -- under 3 minutes   : every 30 seconds
         WHEN idle_secs <   360 THEN   60 -- under 6 minutes   : every minute
         WHEN idle_secs <  1800 THEN  600 -- under 30 minutes  : every 10 minutes
         WHEN idle_secs <  3600 THEN 1200 -- under an hour     : every 20 minutes
         WHEN idle_secs < 86400 THEN 3600 -- under a day       : every hour
         ELSE 7200                        -- otherwise         : every two hours
       END AS refresh
FROM (SELECT strftime('%s',CURRENT_TIMESTAMP) - strftime('%s',MAX(dtmodify)) AS idle_secs
        FROM jobs) AS activity;
--
-- Per-user variant of the refresh interval: one row per userid, bucketed on
-- the seconds since that user's most recently modified job.
DROP VIEW IF EXISTS refreshinterval_user;
CREATE VIEW refreshinterval_user AS
SELECT userid,
       CASE
         WHEN idle_secs <    60 THEN   15 -- under a minute    : every 15 seconds
         WHEN idle_secs <   180 THEN   30 -- under 3 minutes   : every 30 seconds
         WHEN idle_secs <   360 THEN   60 -- under 6 minutes   : every minute
         WHEN idle_secs <  1800 THEN  600 -- under 30 minutes  : every 10 minutes
         WHEN idle_secs <  3600 THEN 1200 -- under an hour     : every 20 minutes
         WHEN idle_secs < 86400 THEN 3600 -- under a day       : every hour
         ELSE 7200                        -- otherwise         : every two hours
       END AS refresh
FROM (SELECT userid,
             strftime('%s',CURRENT_TIMESTAMP) - strftime('%s',MAX(dtmodify)) AS idle_secs
        FROM jobs
       GROUP BY userid) AS activity;
PRAGMA user_version=4;


============ dispatch.cmd ==============
@echo off
:: Job dispatcher
:: The subroutines are defined first; jump over them to the MAIN entry point.
goto MAIN

:: --------------------------------------
:: level 3 subroutines called by level 2
:: --------------------------------------

:: --
:: Show job status
:: --
:: Selects the shjobsta view row(s) for %JOB_TSN% and prints them in
:: ".mode line" layout by piping the statement through %GNU_ECHO% into
:: %SQLITE% on database %PAR_SDB%.
:: Sets SQL; jumps to SQLERR when the pipeline ends with errorlevel >= 1.
:: Note: use  quoted string for selects/updates WHERE TSN==
:SHJOBSTA
set SQL=SELECT * FROM shjobsta WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%.mode line\n%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
goto Z

:: --------------------------------------
:: level 2 subroutines called by DISPATCH
:: --------------------------------------

:: --
:: set date and time for display and log
:: --
:: Sets MYDATE to %DATE% and MYTIME to the first 8 characters of %TIME%,
:: with any blank replaced by 0.
:GETDT
set MYDATE=%DATE%
set MYTIME=%TIME:~0,8%
set MYTIME=%MYTIME: =0%
goto Z

:: --
:: retrieve a command and its params from the scheduling database
:: --
:: Steps:
::  1. clear every JOB_* property
::  2. in one transaction flag the next waiting job as 'I' (through the
::     dispatch_job view) and read it back from get_init_prop in ".mode line"
::  3. awk rewrites each "name = value" output line to "set JOB_name=value"
::     in a temporary .bat, which is called and then deleted
::  4. write the job's parameter rows to %TMPDIR%/<TSN>.par
:: On return JOB_TSN is empty when no job was fetched (queue empty or the
:: same TSN came back twice). Jumps to SQLERR on errorlevel >= 1.
:: Every command below is kept on a single line; a command split over
:: several lines would not run.
:GETJOB
echo getjob
:: unset job properties
for %%v in (TSN status dtcreate dtstart dtstop dtmodify userid cmnd pars parlst rc endmsg sysout dnload) do set JOB_%%v=
:: create a stream to set new job properties (set JOB_keyword=value)
set SQL=BEGIN;\nUPDATE dispatch_job set STATUS = 'I';\nSELECT * FROM get_init_prop;\nCOMMIT;
%GNU_ECHO% "%SQL_PFX%.mode line\n%SQL%" | %SQLITE% %PAR_SDB% | %GNU_AWK% "{gsub(/^[[:blank:]]+/,\"\");gsub(/[[:blank:]]=[[:blank:]]/,\"=\");print \"set JOB_\" $0}" >%TMPDIR%\SSN#%PAR_SSN%.bat
if errorlevel 1 goto SQLERR
call %TMPDIR%\SSN#%PAR_SSN%.bat
del  %TMPDIR%\SSN#%PAR_SSN%.bat
if "%JOB_TSN%"=="" goto QEMPTY
if "%JOB_TSN%"=="%PRV_TSN%" goto DUPTSN
set PRV_TSN=%JOB_TSN%
set JOB_rc=0
:: Retrieve parameter list from database
set JOB_parlst=%TMPDIR%/%JOB_TSN%.par
set JOB_dnload=%TMPDIR%/%JOB_TSN%.zip
:: remove a stale result archive left under the same TSN
if exist "%TMPDIR%\%JOB_TSN%.zip" del "%TMPDIR%\%JOB_TSN%.zip"
set SQL=SELECT partx FROM pars INNER JOIN jobs USING (jobid) WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%.mode list\n%SQL%" | %SQLITE% %PAR_SDB% >%JOB_parlst%
if errorlevel 1 goto SQLERR
goto Z

:: The same TSN was returned twice in a row: clear JOB_TSN so the caller
:: stops dispatching, then fall through to the queue-empty message.
:DUPTSN
echo Duplicate TSN %JOB_TSN%, stop dispatching
set JOB_TSN=
:QEMPTY
echo No more waiting jobs, last job was %PRV_TSN%.
goto Z

:: --
:: execute a job
:: --
:: Marks job %JOB_TSN% as 'R', runs cmnd\%JOB_cmnd%.cmd with %JOB_pars%
:: (output appended to %JOB_sysout%) and records the outcome:
::   rc 0      -> NORMEND, status 'T'
::   rc 1..9   -> WARNING, status 'T', "(with warnings)" appended to endmsg
::   otherwise -> ABEND,   status 'A'
:: A missing command script goes through CMDNF to ABEND with rc 253.
:: Every command below is kept on a single line; a command split over
:: several lines would not run.
:EXEJOB
@echo off
echo exejob %JOB_TSN%
set SQL=UPDATE jobs SET status = 'R' WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
set JOB_endmsg=Undefined cmnd
if not exist cmnd\%JOB_cmnd%.cmd goto :CMDNF
call :SHJOBSTA >"%JOB_sysout%" 2>&1
set JOB_endmsg=Error in `%JOB_cmnd%.cmd`
@echo on
@call cmnd\%JOB_cmnd%.cmd %JOB_pars% >>"%JOB_sysout%" 2>&1
set JOB_rc=%ERRORLEVEL%
if "%JOB_rc%"=="" set JOB_rc=0
@echo off
if "%JOB_rc%"=="0" goto NORMEND
if "%JOB_rc%"=="1" goto WARNING
if "%JOB_rc%"=="2" goto WARNING
if "%JOB_rc%"=="3" goto WARNING
if "%JOB_rc%"=="4" goto WARNING
if "%JOB_rc%"=="5" goto WARNING
if "%JOB_rc%"=="6" goto WARNING
if "%JOB_rc%"=="7" goto WARNING
if "%JOB_rc%"=="8" goto WARNING
if "%JOB_rc%"=="9" goto WARNING
goto ABEND

:: No error or just a warning (1)
:WARNING
set JOB_endmsg=%JOB_endmsg% (with warnings)
:NORMEND
if "%JOB_endmsg%"=="" set JOB_endmsg=Normal end
echo %JOB_endmsg%
if NOT exist %JOB_dnload% set JOB_dnload=
:: The CASE expressions store NULL when the corresponding variable is empty;
:: the log/ and tmp/ prefixes are stripped from the stored names.
set SQL=UPDATE jobs SET status='T', rc='%JOB_rc%', endmsg='%JOB_endmsg%', sysout = CASE WHEN '%JOB_sysout%'=='' THEN NULL ELSE '%JOB_sysout:log/=%' END, dnload= CASE WHEN '%JOB_dnload%'=='' THEN NULL ELSE '%JOB_dnload:tmp/=%' END WHERE TSN=='%JOB_TSN%' AND status=='R';
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
call :SHJOBSTA >>"%JOB_sysout%" 2>&1
if exist "%TMPDIR%\%JOB_TSN%.par" del "%TMPDIR%\%JOB_TSN%.par"
goto Z

:CMDNF
@echo off
if "%JOB_endmsg%"=="" set JOB_endmsg=No procedure found to handle command '%JOB_cmnd%'
:: set errorlevel without having to use exit /b
%GNU_AWK% "BEGIN{exit 253}"
:: copy it into JOB_rc, which is what ABEND reports and stores
set JOB_rc=%ERRORLEVEL%
:ABEND
@echo off
if "%JOB_endmsg%"=="" set JOB_endmsg=No reason specified
echo %MYDATE% %MYTIME% %JOB_TSN% Abend # %JOB_rc% `%JOB_endmsg%`
:: "set" is required here: without it the statement is never stored in SQL
:: and the pipeline below would resend the previous statement.
set SQL=UPDATE jobs SET status = 'A', rc = '%JOB_rc%', endmsg = '%JOB_endmsg%', sysout = CASE WHEN '%JOB_sysout%'=='' THEN NULL ELSE '%JOB_sysout:log/=%' END WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
call :SHJOBSTA
:: The user proc may have failed, but the dispatcher is healthy
exit /B 0

:: Common exit for a failed SQL pipeline: show the statement, return 3.
:SQLERR
echo Error during SQL processing, can't continue
echo Offending statement:
echo %SQL%
exit /B 3

:: ======================================
:: level 1 subroutines called by MAIN
:: ======================================

:: --
:: Dispatch a job (fetch from queue, execute)
:: --
:: Loops: GETJOB, then EXEJOB, then one log line (code 8) to the console and
:: to log\log.txt, until GETJOB leaves JOB_TSN empty. A non-zero errorlevel
:: from either subroutine leaves through R. DISPDONE writes the closing log
:: line (code 9).
:DISPATCH
:DISPNEXT
call :GETJOB
if errorlevel 1 goto R
call :GETDT
if "%JOB_TSN%"=="" goto DISPDONE
set JOB_sysout=log/%JOB_TSN%.txt
PROMPT $$$S
call :EXEJOB
if errorlevel 1 goto R
PROMPT $P$G
call :GETDT
echo %MYDATE% %MYTIME% 8 %0 %PAR_SSN% %JOB_TSN% %JOB_rc% %JOB_endmsg%
:: same line again, appended to the shared log (kept on one line so the
:: redirect belongs to the echo)
echo %MYDATE% %MYTIME% 8 %0 %PAR_SSN% %JOB_TSN% %JOB_rc% %JOB_endmsg%>>log\log.txt
goto DISPNEXT

:DISPDONE
call :GETDT
echo %MYDATE% %MYTIME% 9 %0 %PAR_SSN%
echo %MYDATE% %MYTIME% 9 %0 %PAR_SSN% >>log\log.txt
goto Z

::
:: Reset a TSN from status A, I, T or R back to W so it is dispatched again.
:: %1 = TSN to reset. Clears PAR_RESTART afterwards.
:: Jumps to SQLERR when the pipeline ends with errorlevel >= 1.
:RESET
set SQL=UPDATE jobs SET status = 'W' WHERE TSN=='%1' AND status IN ('A','I','T','R');
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
set PAR_RESTART=
goto Z

:: ====================================
:: level 0 MAIN entrypoint
:: dispatch [RESTART=TSN#] [SSN=SSN#] [SDB=jobdbpathfile]
::
:: ====================================

:MAIN
:: Load the site environment, then define the sqlite dot-command prefix
:: that is prepended to every statement sent to the database.
call \data\opt\cfg\setenv.cmd
call \data\opt\cfg\setdir.cmd
set SQL_PFX=.echo off\n.bail on\n.timeout 1000\n
:: Work from the directory this script lives in (log\ and cmnd\ are relative).
cd /D %0\..
call \data\opt\cfg\%cfg%\setdrives.cmd >log\drives.txt
if errorlevel 1 goto P01
:: reset all possible parameters
for %%p in (RESTART SDB SSN) do set PAR_%%p=
:: set defaults
:: - schedule serial number (TSN is fetched from the job database)
set PAR_SSN=@@@@
:: - job database
set PAR_SDB=%DRIV6%\data\opt\db\li\job.db3

:: Consume the command-line arguments two at a time as name/value pairs,
:: storing each as PAR_<name>; an odd argument count ends in P02.
:GETPAR
if "%1"=="" goto PROCESS
if "%2"=="" goto P02
set PAR_%1=%2
shift
shift
goto GETPAR

:: Log the effective parameters, optionally reset the job named by RESTART,
:: then run the dispatcher loop with all output appended to the
:: per-schedule log file log\SSN#<SSN>.txt.
:PROCESS
set PAR_ >log\SSN#%PAR_SSN%.txt
set DRIV >>log\SSN#%PAR_SSN%.txt
if DEFINED PAR_RESTART call :RESET %PAR_RESTART% >>log\SSN#%PAR_SSN%.txt
set PRV_TSN=@@@@
:: MYDATE/MYTIME have not been set by this script yet; fetch them so the
:: start-of-run log lines carry a timestamp.
call :GETDT
echo %MYDATE% %MYTIME% 1 %0 %PAR_SSN% %PAR_SDB% >>log\log.txt
echo %MYDATE% %MYTIME% 1 %0 %PAR_SSN% %PAR_SDB% >>log\SSN#%PAR_SSN%.txt

:: dispatcher loop, one task at a time
call :DISPATCH >>log\SSN#%PAR_SSN%.txt 2>&1
:: with the default SSN (@@@@) leave through Z, without clearing the screen
if "%PAR_SSN%"=="@@@@" goto Z
@cls
@exit /b 0

:: MAIN Environment and Parameter errors
:: P01: setdrives.cmd ended with errorlevel >= 1.
:P01
echo Can't get all required driveletters.
goto R

:: P02: a parameter name was given without a value.
:P02
echo Parameters must be specified as pairs 'name value' or 'name=value'
echo dispatch [RESTART=TSN#] [SSN=SSN#] [SDB=jobdbpathfile]
goto R

:: Dispatcher errors
:: R: restore the default prompt and return errorlevel 1.
:R
PROMPT $P$G
exit /B 1
:: Z: common exit; control runs off the end of the file from here.
:Z
@echo off


======= php fragment to create a job with or without parameters ====
======= it's part of a class which extends PDO                  ====
======= I prefer  php_pdo_sqlite_external                       ====


        /**
         * Queue a job, with or without parameters.
         *
         * Inserts one row into `jobs` (the `pars` column is always stored as
         * the literal '@list') and, when $parlist is set, one row per
         * parameter into `pars`. All inserts run in one transaction; if any
         * statement throws, the transaction is rolled back and the exception
         * is passed on to the caller.
         *
         * @param string $userid   stored in jobs.userid
         * @param string $cmnd     stored in jobs.cmnd
         * @param mixed  $parlist  null for no parameters, an array of values,
         *                         or an object with fetch() (such as a query
         *                         result) whose first column is the value
         * @param mixed  $ntuid    not used in this fragment
         * @param mixed  $ntpsw    not used in this fragment
         * @param int    $jobprio  stored in jobs.jobprio, default 8
         * @return string the 4-character TSN derived from the new jobid
         */
        function enter_job($userid,$cmnd,$parlist,$ntuid,$ntpsw,$jobprio = 8){
                /* Execute a prepared statement by passing an array of values */
                $sql = 'INSERT INTO jobs (userid,cmnd,pars,jobprio) VALUES (:userid,:cmnd,:pars,:jobprio)';
                $stjob = $this->prepare($sql);
                $this->beginTransaction();

                try {
                        $stjob->execute(array(':userid' => $userid, ':cmnd' => $cmnd,
                                ':pars' => '@list', ':jobprio' => $jobprio));
                        /*
                         * The TSN is taken from character offsets 4..7 of
                         * (10000000 + jobid):
                         *               01234567
                         * Original note: this will accommodate up to
                         * 99 999 999 requests, then we have to reset by
                         * deleting the database. It will be rebuilt
                         * automatically.
                         */
                        $jobid = $this->lastInsertId();
                        $tsn   = substr(10000000 + $jobid,4,4);

                        if (isset($parlist)){
                                /* there are parameters */
                                $sql = 'INSERT INTO pars (jobid,partx) VALUES (:jobid,:partx)';
                                $stpar = $this->prepare($sql);
                                if (is_array($parlist)){
                                        /* we got a text array with params */
                                        foreach ($parlist as $aval){
                                                $stpar->execute(array(':jobid' => $jobid, ':partx' => $aval));
                                        }
                                } else if (is_object($parlist)) {
                                        /* we got a resultset from a query as paramlist */
                                        while ($row = $parlist->fetch(PDO::FETCH_NUM)){
                                                $stpar->execute(array(':jobid' => $jobid, ':partx' => $row[0]));
                                        }
                                }
                        }
                        $this->commit();
                } catch (Exception $e) {
                        /*
                         * Do not leave the transaction open. The database may
                         * already have rolled back by itself (the pars
                         * triggers use RAISE(ROLLBACK)), so a failing
                         * rollBack() is ignored and the original error is
                         * the one reported.
                         */
                        try {
                                $this->rollBack();
                        } catch (Exception $ignored) {
                        }
                        throw $e;
                }

/*
 * ugly code to launch the dispatcher asynchronously using 
 * Windows schtasks.exe is left to the imagination of the reader
 */

                return $tsn;
        }
-- 
  (  Kees Nuyt
  )
c[_]
_______________________________________________
sqlite-users mailing list
sqlite-users@sqlite.org
http://sqlite.org:8080/cgi-bin/mailman/listinfo/sqlite-users

Reply via email to