import os
import sys
import threading
import time

from sqlalchemy import (create_engine, Table, Column, Integer, String,
                        DateTime, MetaData, and_, func, select)
from sqlalchemy.pool import NullPool

# NullPool opens a fresh SQLite connection for each use, so the worker thread
# and the main thread never contend over a shared connection
engine = create_engine('sqlite:///jobs.db', poolclass=NullPool)
meta = MetaData()
fifo = Table('fifo', meta, 
        Column("id", Integer, primary_key=True),
        Column("task", String(500), nullable=False),
        Column("arguments", String(500), nullable=False),
        Column("timestamp", DateTime, default=func.now(), nullable=False),
        Column("started", DateTime),
        Column("completed", DateTime),
        Column("procid", String(20)),
        Column("status", String(1000))
)

meta.create_all(engine)
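
# Row lifecycle: "timestamp" is set at enqueue time, "procid" and "started"
# when a worker claims the job, and "completed"/"status" once it has run.
# create_all() skips tables that already exist, so re-running reuses jobs.db.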

def add_job(fn, arguments):
    """Enqueue a job: store the function's name and the repr() of its arguments."""
    with engine.begin() as conn:
        conn.execute(fifo.insert(),
                     {"task": fn.__name__, "arguments": repr(arguments)})
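
# A minimal sketch of a safer serialization scheme, assuming the job arguments
# are JSON-representable; add_job_json is a hypothetical helper, not part of
# the original recipe. The worker side would pair it with
# json.loads(job.arguments) in place of eval().
import json

def add_job_json(fn, arguments):
    # same enqueue as add_job, but arguments travel as JSON text, so the
    # worker never has to eval() a string
    with engine.begin() as conn:
        conn.execute(fifo.insert(),
                     {"task": fn.__name__, "arguments": json.dumps(arguments)})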

def process(modname):
    """given a module name, process jobs using functions from that module."""

    def proc(modname):
        module = sys.modules[modname]
        while True:
            
            # poll: sleep briefly until at least one unclaimed, incomplete
            # job exists
            with engine.connect() as conn:
                pending = conn.execute(
                    select(func.count()).select_from(fifo).where(
                        and_(fifo.c.procid == None, fifo.c.completed == None)
                    )
                ).scalar()
            if not pending:
                time.sleep(2)
                continue
            
            # claim all unclaimed, incomplete jobs by stamping them with our
            # pid; engine.begin() commits the transaction on exit
            with engine.begin() as conn:
                conn.execute(
                    fifo.update()
                    .values(procid=str(os.getpid()), started=func.now())
                    .where(and_(fifo.c.procid == None, fifo.c.completed == None))
                )
        
            # fetch the jobs we just claimed, oldest first; fetchall() drains
            # the cursor so the connection can be released immediately
            with engine.connect() as conn:
                jobs = conn.execute(
                    fifo.select()
                    .where(fifo.c.procid == str(os.getpid()))
                    .where(fifo.c.completed == None)
                    .order_by(fifo.c.timestamp)
                ).fetchall()
                    
            for job in jobs:
                fn = getattr(module, job.task)
                # substitute with a serialization style of choice,
                # e.g. the JSON sketch near add_job above
                arguments = eval(job.arguments)
            
                try:
                    fn(*arguments)
                    status = "Completed"
                except Exception as e:
                    status = "Failed: %s" % e
                finally:
                    # commit the status for each completion individually
                    with engine.begin() as conn:
                        conn.execute(
                            fifo.update()
                            .values(completed=func.now(), status=status)
                            .where(fifo.c.id == job.id)
                        )
            
    t = threading.Thread(target=proc, args=(modname,))
    t.daemon = True
    t.start()
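
# Note on concurrency: because the claiming UPDATE stamps every unclaimed row
# with this process's pid in a single statement, several copies of this script
# could in principle poll the same jobs.db without running a job twice,
# subject to SQLite's database-wide write locking.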
    
            
if __name__ == '__main__':
    # start the background worker before enqueueing anything
    process(__name__)
    
    def job1(x, y):
        print("adding x+y: %d" % (x + y))

    def job2(x, y):
        print("multiplying x*y: %d" % (x * y))
        
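    # enqueue four sample jobs; the worker thread claims them and runs them
    # in timestamp order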
    for job, args in (
        (job1, (3, 5)),
        (job2, (5, 12)),
        (job2, (2, 17)),
        (job1, (8, 5)),
    ):
        add_job(job, args)

    # give the worker thread a few seconds to drain the queue
    time.sleep(3)
        
    print "completed status:", "\n".join(repr(r) for r in engine.execute(fifo.select()).fetchall())
    

