Er, let me rephrase: I don't recommend using the scheduler for *infinitely
looping background tasks*.
On Monday, June 25, 2012 4:54:30 PM UTC-7, Michael Toomim wrote:
>
> This scenario is working out worse and worse.
>
> Now I'm getting tasks stuck in the 'RUNNING' state... even when there
> aren't any scheduler processes running behind them! I'm guessing the
> server got killed mid-process, and now the scheduler doesn't know how to
> recover. Looks like a bug in the scheduler.
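>
> For now I'm un-sticking them by hand with something like this, run from a
> web2py shell when I'm sure no workers are actually alive (my own
> workaround, not an official recovery path):
>
>     # reset tasks abandoned in RUNNING back to QUEUED so a worker picks them up
>     db(db.scheduler_task.status == 'RUNNING').update(status='QUEUED')
>     db.commit()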
>
> I don't recommend using the scheduler as a task queue to anybody.
>
> On Tuesday, June 12, 2012 10:24:15 PM UTC-7, Michael Toomim wrote:
>>
>> Here's a common scenario. I'm looking for the best implementation using
>> the scheduler.
>>
>> I want to support a set of background tasks (task1, task2...), where each
>> task:
>> • processes a queue of items
>> • waits a few seconds
>>
>> It's safe to have task1 and task2 running in parallel, but I cannot have
>> two task1s running in parallel. They will duplicately process the same
>> queue of items.
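>>
>> To make that concrete, here is roughly what one of these tasks looks like
>> (queue_item and process_item are stand-ins for my real table and handler):
>>
>> def task1():
>>     # Drain whatever is pending, then return. The scheduler re-runs the
>>     # task every `period` seconds, so the "wait a few seconds" part is
>>     # handled for me.
>>     for row in db(db.queue_item.processed == False).select():
>>         process_item(row)                 # stand-in for the real work
>>         row.update_record(processed=True)
>>     db.commit()
>>
>> from gluon.scheduler import Scheduler
>> scheduler = Scheduler(db, dict(task1=task1))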
>>
>> I found the scheduler supports this nicely with parameters like:
>>
>> db.scheduler_task.insert(function_name='task1',
>>                          task_name='task1',
>>                          stop_time=now + timedelta(days=90000),  # far future
>>                          repeats=0,   # 0 = repeat forever
>>                          period=10)   # seconds between runs
>>
>> I can launch 3 workers, and they coordinate amongst themselves to make
>> sure that only one will run the task at a time. Great! This task will last
>> forever...
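>>
>> (For reference, I start the three workers with the app name repeated once
>> per worker, i.e. "python web2py.py -K myapp,myapp,myapp", where myapp
>> stands in for my application's name.)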
>>
>> ...but now we encounter my problem...
>>
>> What happens if it crashes, or passes stop_time? Then the task will turn
>> off, and the queue is no longer processed. Or what happens if I reset the
>> database, or install this code on a new server? It isn't nice if I have to
>> re-run the insert function by hand.
>>
>> So how can I ensure there is always EXACTLY ONE of each task in the
>> database?
>>
>> I tried putting this code into models:
>>
>> from datetime import timedelta
>>
>> def initialize_task_queue(task_name, period=10):
>>     now = request.now
>>     num_tasks = db((db.scheduler_task.function_name == task_name)
>>                    & ((db.scheduler_task.status == 'QUEUED')
>>                       | (db.scheduler_task.status == 'ASSIGNED')
>>                       | (db.scheduler_task.status == 'RUNNING')
>>                       | (db.scheduler_task.status == 'ACTIVE'))).count()
>>
>>     # Add a task if there isn't one already
>>     if num_tasks < 1:
>>         db.scheduler_task.insert(function_name=task_name,
>>                                  task_name=task_name,
>>                                  stop_time=now + timedelta(days=90000),
>>                                  repeats=0,
>>                                  period=period)
>>     db.commit()
>>
>> initialize_task_queue('task1')
>> initialize_task_queue('task2')
>> initialize_task_queue('task3')
>>
>> This worked, except it introduces a race condition! If you start three
>> web2py processes simultaneously (e.g., for three scheduler processes), they
>> will insert duplicate tasks:
>>     process 1: count number of 'task1' tasks
>>     process 2: count number of 'task1' tasks
>>     process 1: there are fewer than 1, so insert a 'task1' task
>>     process 2: there are fewer than 1, so insert a 'task1' task
>>
>> I was counting on postgresql's MVCC transaction support to make each of
>> these atomic. Unfortunately, that's not how it works. I do not understand
>> why. As a workaround, I'm currently wrapping the code inside
>> "initialize_task_queue" with a postgresql advisory lock:
>>
>> if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
>>     return
>>
>> ... count tasks, add one if needed ...
>>
>> db.executesql('select pg_advisory_unlock(1);')
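>>
>> A variant of the same workaround, if you're on postgresql 9.1+, is the
>> transaction-scoped form, which releases itself when the transaction
>> commits or rolls back, so a crash can't leave the lock held:
>>
>> if not db.executesql('select pg_try_advisory_xact_lock(1);')[0][0]:
>>     return
>>
>> ... count tasks, add one if needed ...
>>
>> # no explicit unlock: the lock is released at commit/rollback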
>>
>> But this sucks.
>> What's a better way to ensure there is always 1 infinite-repeat task in
>> the scheduler? Or... am I using the wrong design entirely?
>>
>