Er, let me rephrase: I don't recommend using the scheduler for *infinitely 
looping background tasks*.

On Monday, June 25, 2012 4:54:30 PM UTC-7, Michael Toomim wrote:
>
> This scenario is working out worse and worse.
>
> Now I'm getting tasks stuck in the 'RUNNING' state... even when there 
> are no scheduler worker processes behind them at all! I'm guessing a 
> worker process got killed mid-task, and now the scheduler doesn't know 
> how to recover. Looks like a bug in the scheduler.
>
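> One manual way to recover (a sketch, untested) is to reset the stuck 
> rows by hand so a live worker can pick them up again:
>
>     db(db.scheduler_task.status == 'RUNNING').update(status='QUEUED')
>     db.commit()
>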
> I don't recommend using the scheduler as a task queue to anybody.
>
> On Tuesday, June 12, 2012 10:24:15 PM UTC-7, Michael Toomim wrote:
>>
>> Here's a common scenario. I'm looking for the best implementation using 
>> the scheduler.
>>
>> I want to support a set of background tasks (task1, task2...), where each 
>> task:
>>   • processes a queue of items
>>   • waits a few seconds
>>
>> It's safe to have task1 and task2 running in parallel, but I cannot have 
>> two copies of task1 running in parallel: they would both process the same 
>> queue of items, duplicating work.
>>
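>> For concreteness, each task body just drains its queue once per run, and 
>> the scheduler's repeat period supplies the "wait a few seconds" part. A 
>> minimal sketch (the queue_item table and process_item() are hypothetical):
>>
>> def task1():
>>     # process every pending item, then return; the scheduler will
>>     # re-run this function every `period` seconds
>>     for row in db(db.queue_item.processed == False).select():
>>         process_item(row)  # hypothetical per-item handler
>>         row.update_record(processed=True)
>>     db.commit()
>>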
>> I found the scheduler supports this nicely with parameters like:
>>
>> # assumes: from datetime import datetime, timedelta; now = datetime.now()
>> db.scheduler_task.insert(function_name='task1',
>>                          task_name='task1',
>>                          stop_time=now + timedelta(days=90000),
>>                          repeats=0,  # repeats=0 means "repeat forever"
>>                          period=10)  # seconds between runs
>>
>> I can launch 3 workers, and they coordinate amongst themselves to make 
>> sure that only one will run the task at a time. Great! This task will last 
>> forever...
>>
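>> (Each worker is a separate process, started with something like 
>> "python web2py.py -K myapp", one invocation per worker.)
>>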
>> ...but now we encounter my problem...
>>
>> What happens if it crashes, or passes stop_time? Then the task will turn 
>> off, and the queue is no longer processed. Or what happens if I reset the 
>> database, or install this code on a new server? It isn't nice if I have to 
>> re-run the insert function by hand.
>>
>> So how can I ensure there is always EXACTLY ONE of each task in the 
>> database?
>>
>> I tried putting this code into models:
>>
>> def initialize_task_queue(task_name, period=10):
>>     # count live (non-terminal) instances of this task
>>     num_tasks = db((db.scheduler_task.function_name == task_name)
>>                    & (db.scheduler_task.status.belongs(
>>                        ('QUEUED', 'ASSIGNED', 'RUNNING', 'ACTIVE')))).count()
>>
>>     # add a task if there isn't one already
>>     if num_tasks < 1:
>>         db.scheduler_task.insert(function_name=task_name,
>>                                  task_name=task_name,
>>                                  stop_time=now + timedelta(days=90000),
>>                                  repeats=0,
>>                                  period=period)
>>         db.commit()
>>
>> initialize_task_queue('task1')
>> initialize_task_queue('task2')
>> initialize_task_queue('task3')
>>
>> This worked, except it introduces a race condition! If you start three 
>> web2py processes simultaneously (e.g., three scheduler workers, each of 
>> which runs the model files at startup), they can insert duplicate tasks:
>>     process 1: count number of 'task1' tasks
>>     process 2: count number of 'task1' tasks
>>     process 1: there are less than 1, insert a 'task1' task
>>     process 2: there are less than 1, insert a 'task1' task
>>
>> I was counting on postgresql's MVCC transaction support to make each 
>> count-then-insert atomic. Unfortunately, that's not how it works: MVCC 
>> keeps each statement's snapshot consistent, but it does not serialize a 
>> check-then-insert across concurrent transactions, so two of them can both 
>> count zero and both insert. As a workaround, I'm currently wrapping the 
>> code inside "initialize_task_queue" with a postgresql advisory lock:
>>
>>     if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
>>         return
>>     try:
>>         ... count tasks, add one if needed ...
>>     finally:
>>         # always release, even if counting or inserting raises
>>         db.executesql('select pg_advisory_unlock(1);')
>>
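>> (At least pg_advisory_lock is session-scoped, so if a process crashes 
>> while holding it, the lock is released when its connection drops.)
>>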
>> But this sucks.
>> What's a better way to ensure there is always 1 infinite-repeat task in 
>> the scheduler? Or... am I using the wrong design entirely?
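>>
>> One direction that might remove the lock entirely: let postgresql enforce 
>> the invariant itself with a partial unique index over live tasks, then 
>> just insert and swallow the duplicate-key error. A sketch, untested:
>>
>>     # run once against the database:
>>     #   create unique index one_live_task on scheduler_task (task_name)
>>     #   where status in ('QUEUED', 'ASSIGNED', 'RUNNING', 'ACTIVE');
>>
>>     def initialize_task_queue(task_name, period=10):
>>         try:
>>             db.scheduler_task.insert(function_name=task_name,
>>                                      task_name=task_name,
>>                                      stop_time=now + timedelta(days=90000),
>>                                      repeats=0,
>>                                      period=period)
>>             db.commit()
>>         except Exception:  # duplicate key: a live task already exists
>>             db.rollback()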
>>
>
