Er, let me rephrase: I don't recommend using the scheduler for *infinitely looping background tasks*.
On Monday, June 25, 2012 4:54:30 PM UTC-7, Michael Toomim wrote:
>
> This scenario is working out worse and worse.
>
> Now I'm getting tasks stuck in the 'RUNNING' state... even when there
> aren't any scheduler processes running behind them! I'm guessing the
> server got killed mid-process, and now it doesn't know how to recover.
> Looks like a bug in the scheduler.
>
> I don't recommend using the scheduler as a task queue to anybody.
>
> On Tuesday, June 12, 2012 10:24:15 PM UTC-7, Michael Toomim wrote:
>>
>> Here's a common scenario. I'm looking for the best implementation
>> using the scheduler.
>>
>> I want to support a set of background tasks (task1, task2, ...), where
>> each task:
>> • processes a queue of items
>> • waits a few seconds
>>
>> It's safe to have task1 and task2 running in parallel, but I cannot
>> have two task1s running in parallel: they would both process the same
>> queue of items, duplicating work.
>>
>> I found the scheduler supports this nicely with parameters like:
>>
>> db.scheduler_task.insert(function_name='task1',
>>                          task_name='task1',
>>                          stop_time=now + timedelta(days=90000),
>>                          repeats=0,
>>                          period=10)
>>
>> I can launch 3 workers, and they coordinate amongst themselves to make
>> sure that only one runs the task at a time. Great! This task will last
>> forever...
>>
>> ...but now we encounter my problem...
>>
>> What happens if it crashes, or passes stop_time? Then the task turns
>> off, and the queue is no longer processed. Or what happens if I reset
>> the database, or install this code on a new server? It isn't nice if I
>> have to re-run the insert by hand.
>>
>> So how can I ensure there is always EXACTLY ONE of each task in the
>> database?
>>
>> I tried putting this code into models:
>>
>> def initialize_task_queue(task_name):
>>     num_tasks = db((db.scheduler_task.function_name == task_name)
>>                    & ((db.scheduler_task.status == 'QUEUED')
>>                       | (db.scheduler_task.status == 'ASSIGNED')
>>                       | (db.scheduler_task.status == 'RUNNING')
>>                       | (db.scheduler_task.status == 'ACTIVE'))).count()
>>
>>     # Add a task if there isn't one already
>>     if num_tasks < 1:
>>         db.scheduler_task.insert(function_name=task_name,
>>                                  task_name=task_name,
>>                                  stop_time=now + timedelta(days=90000),
>>                                  repeats=0,
>>                                  period=period)
>>         db.commit()
>>
>> initialize_task_queue('task1')
>> initialize_task_queue('task2')
>> initialize_task_queue('task3')
>>
>> This worked, except it introduces a race condition! If you start three
>> web2py processes simultaneously (e.g., for three scheduler workers),
>> they will insert duplicate tasks:
>>
>> process 1: count the number of 'task1' tasks
>> process 2: count the number of 'task1' tasks
>> process 1: fewer than 1, so insert a 'task1' task
>> process 2: fewer than 1, so insert a 'task1' task
>>
>> I was counting on PostgreSQL's MVCC transaction support to make each
>> of these atomic. Unfortunately, that's not how it works, and I do not
>> understand why. As a workaround, I'm currently wrapping the code
>> inside initialize_task_queue with a PostgreSQL advisory lock:
>>
>> if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
>>     return
>>
>> ... count tasks, add one if needed ...
>>
>> db.executesql('select pg_advisory_unlock(1);')
>>
>> But this sucks. What's a better way to ensure there is always exactly
>> one infinite-repeat task in the scheduler? Or... am I using the wrong
>> design entirely?
>>
--
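[Editor's note: one race-free alternative, not proposed in the thread itself, is to let the database enforce uniqueness instead of doing count-then-insert. With a UNIQUE constraint on the task name, the check and the insert collapse into one atomic statement, so no advisory lock is needed. A minimal sketch of the idea using stdlib sqlite3 (table and column names are simplified stand-ins for web2py's scheduler_task; on PostgreSQL the analogue would be a unique index plus catching the duplicate-key error on insert):]

```python
import sqlite3

# Simplified stand-in for web2py's scheduler_task table, with a UNIQUE
# constraint on task_name so the database itself forbids duplicates.
db = sqlite3.connect(":memory:")
db.execute("""
    CREATE TABLE scheduler_task (
        id INTEGER PRIMARY KEY,
        task_name TEXT NOT NULL UNIQUE,
        function_name TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'QUEUED'
    )
""")

def initialize_task_queue(task_name):
    # INSERT OR IGNORE is atomic: if another process already inserted a
    # row with this task_name, the statement becomes a no-op. There is
    # no window between "check" and "insert" for a race to slip through.
    db.execute(
        "INSERT OR IGNORE INTO scheduler_task (task_name, function_name) "
        "VALUES (?, ?)",
        (task_name, task_name),
    )
    db.commit()

# Even if several processes run this at startup, at most one row per
# task name can ever exist.
for _ in range(3):
    initialize_task_queue("task1")

count = db.execute(
    "SELECT COUNT(*) FROM scheduler_task WHERE task_name = 'task1'"
).fetchone()[0]
print(count)  # 1
```

This makes initialization idempotent across concurrent processes, though it does not by itself clean up tasks stuck in 'RUNNING' after a crash; those still need separate recovery handling.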