On Sun, 17 May 2009 21:34:58 -0700 (PDT), Allen Fowler <[email protected]>
wrote:
>
>Hello,
>
>I have several CGI and cron scripts that I would like to coordinate via a
>"First In / First Out" style buffer. That is, some processes are adding work
>units, and some take the oldest and start work on them.
>
>Could SQLite be used for this?
>
>It would seem very complex to use SQL for just a FIFO, but then again, SQLite
>would take care of all ACID / concurrency issues.
>
>Has this been done before?
>
>Thanks,
>:)
For what it's worth, here you go.
Perhaps you can borrow a few ideas from it.
================= mkschema.sql =====================
--
-- schema for database job.db3
--
-- NOTE(review): page_size only takes effect on a database that has not been
-- populated yet, so these pragmas have to stay ahead of the first CREATE.
PRAGMA page_size=8192;
PRAGMA default_cache_size=512;

-- Lookup table: one-letter job status code -> display text.
-- The accepted codes are the same six letters that jobs.status allows.
CREATE TABLE statustext (
    -- NOT NULL is spelled out because SQLite accepts NULL in a non-INTEGER
    -- PRIMARY KEY, and a NULL also passes the CHECK below.
    status CHAR(1) NOT NULL PRIMARY KEY
        CONSTRAINT sttxt_valid_status
        CHECK (status IN ('W','I','R','T','A','C')),
    sttext VARCHAR(16)
);

INSERT INTO statustext (status, sttext) VALUES ('W', 'Wait');
INSERT INTO statustext (status, sttext) VALUES ('I', 'Initializing');
INSERT INTO statustext (status, sttext) VALUES ('R', 'Running');
INSERT INTO statustext (status, sttext) VALUES ('T', 'Terminated');
INSERT INTO statustext (status, sttext) VALUES ('A', 'Abend');
INSERT INTO statustext (status, sttext) VALUES ('C', 'Cancelled'); -- cancelled before dispatched
-- Job queue: one row per submitted work unit.
-- status codes: W(ait) I(nitializing) R(unning) T(erminated) A(bend) C(ancelled)
CREATE TABLE jobs (
    jobid    INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    TSN      CHAR(4),             -- 4-character serial, derived from jobid by trigger jobs_ins
    jobprio  INTEGER DEFAULT 9
        CONSTRAINT jobs_valid_prio
        CHECK (jobprio > 0 AND jobprio < 10),
    status   CHAR(1) DEFAULT 'W'
        CONSTRAINT jobs_valid_status
        CHECK (status IN ('W','I','R','T','A','C')),
    userid   VARCHAR(8) NOT NULL,
    dtcreate DATETIME DEFAULT CURRENT_TIMESTAMP,
    dtinit   DATETIME DEFAULT NULL,
    dtstart  DATETIME DEFAULT NULL,
    dtstop   DATETIME DEFAULT NULL,
    dtmodify DATETIME DEFAULT CURRENT_TIMESTAMP,
    dtdnload DATETIME DEFAULT NULL,
    cmnd     VARCHAR(254),        -- cmnd\*.cmd to execute
    pars     VARCHAR(254),        -- parameters for procedure (host,userid,filename[,type,elementname])
    rc       INTEGER,             -- ERRORLEVEL
    endmsg   VARCHAR(254),        -- message from dispatcher
    sysout   VARCHAR(254),        -- logfile
    dnload   VARCHAR(254)         -- file to download
);

-- The dispatcher and the views address jobs by TSN.
CREATE INDEX idx_jobs_tsn ON jobs(TSN);
-- Parameter lines of a job: any number of pars rows may point at one jobs row.
CREATE TABLE pars (
    parid INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    jobid INTEGER,
    partx TEXT,
    CONSTRAINT fk_pars_jobs
        FOREIGN KEY (jobid) REFERENCES jobs (jobid)
        ON DELETE CASCADE
);
-- After each insert: derive the 4-character TSN from the new jobid and prune
-- jobs that are at least 10000 ids behind, so a TSN is not shared with a
-- stale row.
CREATE TRIGGER jobs_ins AFTER INSERT ON jobs
FOR EACH ROW BEGIN
    -- 10000000 + jobid is rendered as 8 characters; positions 5..8 are the
    -- last four digits of jobid, zero padded.
    UPDATE jobs
       SET TSN = substr(10000000 + NEW.jobid, 5, 4)
     WHERE jobid = NEW.jobid;
    -- NEW.TSN still holds the value supplied by the INSERT (normally NULL),
    -- not the value written by the UPDATE above, so comparing against it
    -- never matched a row. Compare against the computed TSN instead.
    DELETE FROM jobs
     WHERE jobid < (NEW.jobid - 9999)
       AND TSN <= substr(10000000 + NEW.jobid, 5, 4);
END;
-- Bookkeeping after every UPDATE of a jobs row, driven by the status change:
--   W -> I       clear rc and endmsg, stamp dtinit
--   I -> R       stamp dtstart
--   R -> A or T  stamp dtstop
--   status unchanged leaves dtmodify alone; anything else stamps it
CREATE TRIGGER jobs_upd AFTER UPDATE ON jobs
FOR EACH ROW BEGIN
    UPDATE jobs
       SET rc       = CASE WHEN OLD.status = 'W' AND NEW.status = 'I'
                           THEN NULL
                           ELSE NEW.rc
                      END,
           endmsg   = CASE WHEN OLD.status = 'W' AND NEW.status = 'I'
                           THEN NULL
                           ELSE NEW.endmsg
                      END,
           dtinit   = CASE WHEN OLD.status = 'W' AND NEW.status = 'I'
                           THEN CURRENT_TIMESTAMP
                           ELSE OLD.dtinit
                      END,
           dtstart  = CASE WHEN OLD.status = 'I' AND NEW.status = 'R'
                           THEN CURRENT_TIMESTAMP
                           ELSE OLD.dtstart
                      END,
           dtstop   = CASE WHEN OLD.status = 'R' AND NEW.status IN ('A', 'T')
                           THEN CURRENT_TIMESTAMP
                           ELSE OLD.dtstop
                      END,
           dtmodify = CASE WHEN NEW.status = OLD.status
                           THEN dtmodify
                           ELSE CURRENT_TIMESTAMP
                      END
     WHERE jobid = NEW.jobid;
END;
-- Trigger-based ON DELETE CASCADE: removing a job also removes its
-- parameter rows.
CREATE TRIGGER jobs_del
AFTER DELETE ON jobs
FOR EACH ROW
BEGIN
    DELETE FROM pars
     WHERE jobid = OLD.jobid;
END;
-- references: reject a pars row whose non-NULL jobid has no matching job.
-- RAISE(ROLLBACK, ...) aborts the statement and rolls back the transaction.
-- The message is kept on one line; the mail wrap had put a line break
-- inside the literal.
CREATE TRIGGER fki_pars_jobs
BEFORE INSERT ON pars
FOR EACH ROW BEGIN
    SELECT RAISE(ROLLBACK, 'insert on table "pars" violates foreign key constraint "fk_pars_jobs(i)"')
     WHERE NEW.jobid IS NOT NULL
       AND NOT EXISTS (SELECT 1 FROM jobs WHERE jobid = NEW.jobid);
END;
-- references: reject an update that points a pars row at a job that does
-- not exist. A NULL jobid is allowed.
-- RAISE(ROLLBACK, ...) aborts the statement and rolls back the transaction.
-- The message is kept on one line; the mail wrap had put a line break
-- inside the literal.
CREATE TRIGGER fku_pars_jobs
BEFORE UPDATE ON pars
FOR EACH ROW BEGIN
    SELECT RAISE(ROLLBACK, 'update on table "pars" violates foreign key constraint "fk_pars_jobs(u)"')
     WHERE NEW.jobid IS NOT NULL
       AND NOT EXISTS (SELECT 1 FROM jobs WHERE jobid = NEW.jobid);
END;
-- for use in .cmd scripts
-- Job status listing, newest job first; timestamps shown in local time.
CREATE VIEW shjobsta AS
SELECT
    TSN,
    status,
    userid,
    datetime(dtcreate, 'localtime') AS spoolin,
    datetime(dtinit,   'localtime') AS dispatch,
    datetime(dtstart,  'localtime') AS logon,
    datetime(dtstop,   'localtime') AS logoff,
    cmnd,
    pars,
    rc,
    endmsg
FROM jobs
ORDER BY jobid DESC;
-- information for users
-- The 50 newest jobs with the status shown as text and the TSN wrapped in a
-- link to shtsn.php.
CREATE VIEW show_job_status AS
SELECT
    '<a href="shtsn.php?TSN='|| TSN ||'">'|| TSN ||'</a>' AS TSN,
    sttext AS status,
    userid,
    datetime(dtcreate, 'localtime') AS spoolin,
    time(dtstart, 'localtime') AS logon,
    time(dtstop, 'localtime') AS logoff,
    cmnd,
    rc,
    endmsg,
    datetime(dtdnload, 'localtime') AS downloaded
FROM jobs
INNER JOIN statustext USING (status)
ORDER BY jobid DESC
LIMIT 50;
-- information for users, with download links after termination
-- The 50 newest jobs. The sysout link is only produced for status A or T,
-- the download link only for status T.
CREATE VIEW show_tsn AS
SELECT TSN
,sttext AS status
,userid
,datetime(dtcreate,'localtime') AS spoolin
,datetime(dtinit ,'localtime') AS dispatch
,datetime(dtstart ,'localtime') AS logon
,datetime(dtstop ,'localtime') AS logoff
,datetime(dtdnload,'localtime') AS download
,cmnd
,'<a href="shpar.php?TSN='|| TSN ||'">'|| pars ||'</a>' as parameters
,rc
,endmsg
,CASE WHEN status = 'A' OR status = 'T'
THEN '<a href="shsysout.php?TSN='|| TSN ||'">'|| sysout ||'</a>'
ELSE '' END AS sysout
,CASE WHEN status = 'T'
THEN '<a href="getresult/'|| TSN || '.zip">' || dnload || '</a>'
ELSE '' END AS download_file
FROM jobs INNER JOIN statustext USING (status)
ORDER BY jobid DESC LIMIT 50;
-- information for users
-- Parameter lines of the jobs whose jobid lies within 50 of the highest
-- jobid, newest job first and in parameter order within a job.
CREATE VIEW show_par AS
SELECT
    TSN,
    userid,
    partx AS parameterwaarde
FROM jobs
INNER JOIN pars USING (jobid)
WHERE jobid > (SELECT MAX(jobid) - 50 FROM jobs)
ORDER BY jobid DESC, parid;
-- download
-- File to download per job, newest job first; the name is blanked unless
-- the job has status T.
CREATE VIEW get_result AS
SELECT
    TSN,
    userid,
    CASE
        WHEN status = 'T' THEN dnload
        ELSE ''
    END AS dnload
FROM jobs
ORDER BY jobid DESC;
-- Log file per job, newest job first.
-- The log is exposed for abended (A) as well as terminated (T) jobs, in
-- line with show_tsn, which produces a sysout link for both statuses.
-- NOTE(review): previously only status T was exposed; confirm against the
-- page that reads this view.
CREATE VIEW get_sysout AS
SELECT TSN
,userid
,CASE WHEN status IN ('A','T') THEN sysout ELSE '' END AS sysout
FROM jobs
ORDER BY jobid DESC;
-- Properties of one job in status I: lowest jobprio value first, then
-- lowest TSN.
CREATE VIEW get_init_prop AS
SELECT
    TSN,
    status,
    userid,
    cmnd,
    pars
FROM jobs
WHERE status = 'I'
ORDER BY jobprio, TSN
LIMIT 1;
PRAGMA user_version=2;
-- The single waiting job that is next in line: lowest jobprio value first,
-- and within one priority the oldest job.
-- jobid is used as the tie-breaker because TSN only carries the last four
-- digits of jobid and starts over after 9999, which would let a newer job
-- overtake an older one.
DROP VIEW IF EXISTS dispatch_job;
CREATE VIEW dispatch_job AS
SELECT TSN
,status
,userid
,cmnd
,pars
FROM jobs WHERE status = 'W' ORDER BY jobprio,jobid LIMIT 1;
PRAGMA user_version=2;
-- Makes "UPDATE dispatch_job SET status = 'I'" work: the update on the view
-- is redirected to the underlying jobs row.
DROP TRIGGER IF EXISTS upd_init_job;
DROP TRIGGER IF EXISTS upd_dispatch_job;
CREATE TRIGGER upd_dispatch_job
INSTEAD OF UPDATE ON dispatch_job
WHEN NEW.status = 'I'
BEGIN
    -- The view only exposes waiting jobs; the status guard keeps a job in
    -- another state that happens to carry the same TSN from being touched.
    UPDATE jobs
       SET status = 'I'
     WHERE TSN = NEW.TSN
       AND status = 'W';
END;
-- procedure to dispatch a job:
-- BEGIN;
-- UPDATE dispatch_job set STATUS = 'I';
-- SELECT * FROM get_init_prop;
-- COMMIT;
--
-- calculate refresh interval of status pages
-- SELECT CURRENT_TIMESTAMP,MAX(dtmodify),
-- strftime('%s',CURRENT_TIMESTAMP),
-- strftime('%s',MAX(dtmodify)),
-- strftime('%s',CURRENT_TIMESTAMP) - strftime('%s',MAX(dtmodify)) AS ago
-- FROM jobs;
-- Suggested refresh interval for the status pages, in seconds, based on the
-- age of the most recent dtmodify over all jobs.
CREATE VIEW refreshinterval_admin AS
SELECT
    CASE
        WHEN ago < 60    THEN 15    -- less than a minute : every 15 seconds
        WHEN ago < 180   THEN 30    -- less than 3 minutes : every 30 seconds
        WHEN ago < 360   THEN 60    -- less than 6 minutes : every minute
        WHEN ago < 1800  THEN 600   -- less than 30 minutes : every 10 mins
        WHEN ago < 3600  THEN 1200  -- less than an hour : every 20 mins
        WHEN ago < 86400 THEN 3600  -- less than a day : every hour
        ELSE 7200                   -- else every two hours
    END AS refresh
FROM (
    -- seconds elapsed since the latest modification
    SELECT strftime('%s',CURRENT_TIMESTAMP) - strftime('%s',MAX(dtmodify)) AS ago
    FROM jobs
);
--
-- Suggested refresh interval per user, in seconds, based on the age of that
-- user's most recent dtmodify.
DROP VIEW IF EXISTS refreshinterval_user;
CREATE VIEW refreshinterval_user AS
SELECT
    userid,
    CASE
        WHEN ago < 60    THEN 15    -- less than a minute : every 15 seconds
        WHEN ago < 180   THEN 30    -- less than 3 minutes : every 30 seconds
        WHEN ago < 360   THEN 60    -- less than 6 minutes : every minute
        WHEN ago < 1800  THEN 600   -- less than 30 minutes : every 10 mins
        WHEN ago < 3600  THEN 1200  -- less than an hour : every 20 mins
        WHEN ago < 86400 THEN 3600  -- less than a day : every hour
        ELSE 7200                   -- else every two hours, for users per user
    END AS refresh
FROM (
    -- seconds elapsed since each user's latest modification
    SELECT
        userid,
        strftime('%s',CURRENT_TIMESTAMP) - strftime('%s',MAX(dtmodify)) AS ago
    FROM jobs
    GROUP BY userid
);
PRAGMA user_version=4;
============ dispatch.cmd ==============
@echo off
:: Job dispatcher
goto MAIN
:: --------------------------------------
:: level 3 subroutines called by level 2
:: --------------------------------------
:: --
:: Show job status
:: --
:: Prints what the shjobsta view holds for the TSN in JOB_TSN, using the
:: ".mode line" output of the SQLite shell.
:: GNU_ECHO is expected to turn the \n escapes into line breaks (confirm).
:: Jumps to SQLERR on errorlevel 1 or higher, otherwise leaves through Z.
:: Note: use quoted string for selects/updates WHERE TSN==
:: (TSN is a character column whose value is built with substr).
:SHJOBSTA
set SQL=SELECT * FROM shjobsta WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%.mode line\n%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
goto Z
:: --------------------------------------
:: level 2 subroutines called by DISPATCH
:: --------------------------------------
:: --
:: set date and time for display and log
:: --
:: Output: MYDATE is a copy of DATE; MYTIME is the first 8 characters of
:: TIME with every blank replaced by a zero.
:GETDT
set MYDATE=%DATE%
set MYTIME=%TIME:~0,8%
:: presumably targets the blank that pads single-digit hours - confirm
set MYTIME=%MYTIME: =0%
goto Z
:: --
:: retrieve a command and its params from the scheduling database
:: --
:: Moves the next waiting job to status I through the dispatch_job view,
:: reads it back through get_init_prop and loads its columns into JOB_
:: variables. The parameter lines of the job are written to a .par file.
:: JOB_TSN is left empty when no job was found or when the same TSN comes
:: back twice in a row.
:: Every command below is kept on ONE physical line: cmd cannot continue a
:: "set" or a pipeline across a line break.
:GETJOB
echo getjob
:: unset job properties
for %%v in (TSN status dtcreate dtstart dtstop dtmodify userid cmnd pars parlst rc endmsg sysout dnload) do set JOB_%%v=
:: create a stream to set new job properties (set JOB_keyword=value)
set SQL=BEGIN;\nUPDATE dispatch_job set STATUS = 'I';\nSELECT * FROM get_init_prop;\nCOMMIT;
:: awk strips the leading blanks, squeezes " = " into "=" and prefixes each
:: line with "set JOB_"; the result is run as a temporary batch file
%GNU_ECHO% "%SQL_PFX%.mode line\n%SQL%" | %SQLITE% %PAR_SDB% | %GNU_AWK% "{gsub(/^[[:blank:]]+/,\"\");gsub(/[[:blank:]]=[[:blank:]]/,\"=\");print \"set JOB_\" $0}" >%TMPDIR%\SSN#%PAR_SSN%.bat
if errorlevel 1 goto SQLERR
call %TMPDIR%\SSN#%PAR_SSN%.bat
del %TMPDIR%\SSN#%PAR_SSN%.bat
if "%JOB_TSN%"=="" goto QEMPTY
if "%JOB_TSN%"=="%PRV_TSN%" goto DUPTSN
set PRV_TSN=%JOB_TSN%
set JOB_rc=0
:: Retrieve parameter list from database
set JOB_parlst=%TMPDIR%/%JOB_TSN%.par
set JOB_dnload=%TMPDIR%/%JOB_TSN%.zip
if exist "%TMPDIR%\%JOB_TSN%.zip" del "%TMPDIR%\%JOB_TSN%.zip"
set SQL=SELECT partx FROM pars INNER JOIN jobs USING (jobid) WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%.mode list\n%SQL%" | %SQLITE% %PAR_SDB% >%JOB_parlst%
if errorlevel 1 goto SQLERR
goto Z
:: same TSN as the previous round: clear it so the caller stops dispatching
:DUPTSN
echo Duplicate TSN %JOB_TSN%, stop dispatching
set JOB_TSN=
:QEMPTY
echo No more waiting jobs, last job was %PRV_TSN%.
goto Z
:: --
:: execute a job
:: --
:: Marks the job in JOB_TSN as running, calls cmnd\JOB_cmnd.cmd with
:: JOB_pars (output appended to JOB_sysout) and records the outcome:
::   exit code 0      status T
::   exit code 1..9   status T, message flagged "(with warnings)"
::   anything else    status A (abend)
::   no such command  status A with rc 253
:: Every "set SQL=" below is kept on ONE physical line; cmd cannot continue
:: a set command across a line break.
:EXEJOB
@echo off
echo exejob %JOB_TSN%
set SQL=UPDATE jobs SET status = 'R' WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
set JOB_endmsg=Undefined cmnd
if not exist cmnd\%JOB_cmnd%.cmd goto :CMDNF
call :SHJOBSTA >"%JOB_sysout%" 2>&1
set JOB_endmsg=Error in `%JOB_cmnd%.cmd`
@echo on
@call cmnd\%JOB_cmnd%.cmd %JOB_pars% >>"%JOB_sysout%" 2>&1
set JOB_rc=%ERRORLEVEL%
if "%JOB_rc%"=="" set JOB_rc=0
@echo off
if "%JOB_rc%"=="0" goto NORMEND
if "%JOB_rc%"=="1" goto WARNING
if "%JOB_rc%"=="2" goto WARNING
if "%JOB_rc%"=="3" goto WARNING
if "%JOB_rc%"=="4" goto WARNING
if "%JOB_rc%"=="5" goto WARNING
if "%JOB_rc%"=="6" goto WARNING
if "%JOB_rc%"=="7" goto WARNING
if "%JOB_rc%"=="8" goto WARNING
if "%JOB_rc%"=="9" goto WARNING
goto ABEND
:: No error or just a warning (1)
:WARNING
set JOB_endmsg=%JOB_endmsg% (with warnings)
:NORMEND
if "%JOB_endmsg%"=="" set JOB_endmsg=Normal end
echo %JOB_endmsg%
if NOT exist %JOB_dnload% set JOB_dnload=
:: the "log/" and "tmp/" directory prefixes are stripped before storing
set SQL=UPDATE jobs SET status='T', rc='%JOB_rc%', endmsg='%JOB_endmsg%', sysout = CASE WHEN '%JOB_sysout%'=='' THEN NULL ELSE '%JOB_sysout:log/=%' END, dnload= CASE WHEN '%JOB_dnload%'=='' THEN NULL ELSE '%JOB_dnload:tmp/=%' END WHERE TSN=='%JOB_TSN%' AND status=='R';
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
call :SHJOBSTA >>"%JOB_sysout%" 2>&1
if exist "%TMPDIR%\%JOB_TSN%.par" del "%TMPDIR%\%JOB_TSN%.par"
goto Z
:CMDNF
@echo off
if "%JOB_endmsg%"=="" set JOB_endmsg=No procedure found to handle command '%JOB_cmnd%'
:: set errorlevel without having to use exit /b
%GNU_AWK% "BEGIN{exit 253}"
:: copy it into JOB_rc, otherwise the abend is recorded with the 0 that
:: GETJOB put there
set JOB_rc=%ERRORLEVEL%
:ABEND
@echo off
if "%JOB_endmsg%"=="" set JOB_endmsg=No reason specified
echo %MYDATE% %MYTIME% %JOB_TSN% Abend # %JOB_rc% `%JOB_endmsg%`
:: the "set" keyword was missing here, so the abend was never written
set SQL=UPDATE jobs SET status = 'A', rc = '%JOB_rc%', endmsg = '%JOB_endmsg%', sysout = CASE WHEN '%JOB_sysout%'=='' THEN NULL ELSE '%JOB_sysout:log/=%' END WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
call :SHJOBSTA
:: The user proc may have failed, but the dispatcher is healthy
exit /B 0
:: --
:: Common SQL error exit: shows the statement held in SQL and returns
:: errorlevel 3 to the caller.
:: --
:SQLERR
echo Error during SQL processing, can't continue
echo Offending statement:
echo %SQL%
exit /B 3
:: ======================================
:: level 1 subroutines called by MAIN
:: ======================================
:: --
:: Dispatch a job (fetch from queue, execute)
:: --
:: Loops: fetch a job with GETJOB, run it with EXEJOB, log the result, until
:: GETJOB leaves JOB_TSN empty. Leaves through R as soon as one of the two
:: subroutines returns errorlevel 1 or higher.
:DISPATCH
:DISPNEXT
call :GETJOB
if errorlevel 1 goto R
call :GETDT
if "%JOB_TSN%"=="" goto DISPDONE
set JOB_sysout=log/%JOB_TSN%.txt
PROMPT $$$S
call :EXEJOB
if errorlevel 1 goto R
PROMPT $P$G
call :GETDT
echo %MYDATE% %MYTIME% 8 %0 %PAR_SSN% %JOB_TSN% %JOB_rc% %JOB_endmsg%
:: redirection goes first, so a message ending in a digit cannot be taken
:: for a handle number in front of the redirection operator
>>log\log.txt echo %MYDATE% %MYTIME% 8 %0 %PAR_SSN% %JOB_TSN% %JOB_rc% %JOB_endmsg%
goto DISPNEXT
:DISPDONE
call :GETDT
echo %MYDATE% %MYTIME% 9 %0 %PAR_SSN%
echo %MYDATE% %MYTIME% 9 %0 %PAR_SSN% >>log\log.txt
goto Z
::
:: Put a TSN back into status W so it is dispatched again.
:: Applies to jobs in status A, I, T or R; the TSN is the first argument.
:: Clears PAR_RESTART afterwards.
:: The "set SQL=" is kept on ONE physical line; cmd cannot continue a set
:: command across a line break.
:RESET
set SQL=UPDATE jobs SET status = 'W' WHERE TSN=='%1' AND status IN ('A','I','T','R');
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
set PAR_RESTART=
goto Z
:: ====================================
:: level 0 MAIN entrypoint
:: dispatch [RESTART=TSN#] [SSN=SSN#] [SDB=jobdbpathfile]
::
:: Sets up the environment, parses the name/value argument pairs into PAR_
:: variables, optionally resets one TSN and then runs the dispatcher loop
:: with its output appended to the per-SSN log file.
:: ====================================
:MAIN
call \data\opt\cfg\setenv.cmd
call \data\opt\cfg\setdir.cmd
set SQL_PFX=.echo off\n.bail on\n.timeout 1000\n
:: go to the directory of this script; this also works when the script is
:: started without a path in front of its name
cd /D "%~dp0"
call \data\opt\cfg\%cfg%\setdrives.cmd >log\drives.txt
if errorlevel 1 goto P01
:: reset all possible parameters
for %%p in (RESTART SDB SSN) do set PAR_%%p=
:: set defaults
:: - schedule serial number (TSN is fetched from the job database)
set PAR_SSN=@@@@
:: - job database
set PAR_SDB=%DRIV6%\data\opt\db\li\job.db3
:GETPAR
if "%1"=="" goto PROCESS
if "%2"=="" goto P02
set PAR_%1=%2
:: shift from argument 1 on, so argument 0 keeps the script name that the
:: log lines below print
shift /1
shift /1
goto GETPAR
:PROCESS
set PAR_ >log\SSN#%PAR_SSN%.txt
set DRIV >>log\SSN#%PAR_SSN%.txt
if DEFINED PAR_RESTART call :RESET %PAR_RESTART% >>log\SSN#%PAR_SSN%.txt
set PRV_TSN=@@@@
:: MYDATE and MYTIME are not set anywhere earlier in this script
call :GETDT
echo %MYDATE% %MYTIME% 1 %0 %PAR_SSN% %PAR_SDB% >>log\log.txt
echo %MYDATE% %MYTIME% 1 %0 %PAR_SSN% %PAR_SDB% >>log\SSN#%PAR_SSN%.txt
:: dispatcher loop, one task at a time
call :DISPATCH >>log\SSN#%PAR_SSN%.txt 2>&1
if "%PAR_SSN%"=="@@@@" goto Z
@cls
@exit /b 0
:: MAIN Environment and Parameter errors
:P01
echo Can't get all required driveletters.
goto R
:P02
echo Parameters must be specified as pairs 'name value' or 'name=value'
echo dispatch [RESTART=TSN#] [SSN=SSN#] [SDB=jobdbpathfile]
goto R
:: Dispatcher errors
:: R: restore the prompt and return errorlevel 1 to the caller.
:R
PROMPT $P$G
exit /B 1
:: Z: common exit of the subroutines. Nothing follows this label in the
:: script, so control runs off the end, which ends the current call.
:Z
@echo off
======= php fragment to create a job with or without parameters ====
======= it's part of a class which extends PDO ====
======= I prefer php_pdo_sqlite_external ====
/**
 * Queue a job, with or without parameter lines, in a single transaction.
 *
 * @param string     $userid   owner of the job
 * @param string     $cmnd     name of the command the dispatcher has to run
 * @param mixed      $parlist  null for no parameters, an array of parameter
 *                             strings, or a statement/result object whose
 *                             rows carry the parameter in column 0
 * @param mixed      $ntuid    not used in this fragment
 * @param mixed      $ntpsw    not used in this fragment
 * @param int        $jobprio  job priority (the schema accepts 1..9)
 * @return string|false  the 4-character TSN of the new job, or false when a
 *                       statement failed without throwing (the transaction
 *                       is rolled back in that case)
 * @throws Exception  anything raised while inserting is re-thrown after the
 *                    transaction has been rolled back
 */
function enter_job($userid,$cmnd,$parlist,$ntuid,$ntpsw,$jobprio = 8){
    $stjob = $this->prepare(
        'INSERT INTO jobs (userid,cmnd,pars,jobprio) VALUES (:userid,:cmnd,:pars,:jobprio)'
    );
    $this->beginTransaction();
    try {
        /* Execute a prepared statement by passing an array of values */
        $ok = $stjob->execute(array(
            ':userid'  => $userid,
            ':cmnd'    => $cmnd,
            ':pars'    => '@list',
            ':jobprio' => $jobprio,
        ));
        if ($ok === false) {
            /* error mode is not "exception": undo and report failure */
            $this->rollBack();
            return false;
        }
        $jobid = $this->lastInsertId();

        if (isset($parlist)) {
            /* there are parameters */
            $stpar = $this->prepare('INSERT INTO pars (jobid,partx) VALUES (:jobid,:partx)');
            if (is_array($parlist)) {
                /* we got a text array with params */
                foreach ($parlist as $aval) {
                    if ($stpar->execute(array(':jobid' => $jobid, ':partx' => $aval)) === false) {
                        $this->rollBack();
                        return false;
                    }
                }
            } else if (is_object($parlist)) {
                /* we got a resultset from a query as paramlist */
                while ($row = $parlist->fetch(PDO::FETCH_NUM)) {
                    if ($stpar->execute(array(':jobid' => $jobid, ':partx' => $row[0])) === false) {
                        $this->rollBack();
                        return false;
                    }
                }
            }
        }
        $this->commit();
    } catch (Exception $e) {
        /* never leave a half-entered job or an open transaction behind */
        $this->rollBack();
        throw $e;
    }
    /*
     * Same derivation as trigger jobs_ins: the last four digits of jobid,
     * zero padded. This holds as long as 10000000 + jobid has 8 digits.
     */
    $tsn = substr((string)(10000000 + (int)$jobid), 4, 4);
    /*
     * ugly code to launch the dispatcher asynchronously using
     * Windows schtasks.exe is left to the imagination of the reader
     */
    return $tsn;
}
--
( Kees Nuyt
)
c[_]
_______________________________________________
sqlite-users mailing list
[email protected]
http://sqlite.org:8080/cgi-bin/mailman/listinfo/sqlite-users