This scenario is working out worse and worse.

Now I'm getting tasks stuck in the 'RUNNING' state... even when there
aren't any scheduler processes running behind them! I'm guessing a worker
got killed mid-task, and now the scheduler doesn't know how to recover.
Looks like a bug in the scheduler.

I don't recommend using the scheduler as a task queue to anybody.
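
For what it's worth, here is roughly what I'm planning to try in order to
un-stick them by hand. This is an untested sketch: it assumes the stock
scheduler_task/scheduler_worker field names (status, assigned_worker_name,
worker_name, last_heartbeat) and treats any worker whose heartbeat is more
than a minute old as dead.

    from datetime import datetime, timedelta

    def requeue_stuck_tasks(db, heartbeat_timeout=60):
        # Workers that have pinged recently are considered alive.
        cutoff = datetime.now() - timedelta(seconds=heartbeat_timeout)
        alive = [w.worker_name for w in
                 db(db.scheduler_worker.last_heartbeat > cutoff).select()]
        # Anything still marked RUNNING but not assigned to a live worker
        # is presumed orphaned; push it back into the queue.
        if alive:
            stuck = db((db.scheduler_task.status == 'RUNNING') &
                       (~db.scheduler_task.assigned_worker_name.belongs(alive)))
        else:
            stuck = db(db.scheduler_task.status == 'RUNNING')
        stuck.update(status='QUEUED', assigned_worker_name=None)
        db.commit()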

On Tuesday, June 12, 2012 10:24:15 PM UTC-7, Michael Toomim wrote:
>
> Here's a common scenario. I'm looking for the best implementation using 
> the scheduler.
>
> I want to support a set of background tasks (task1, task2...), where each 
> task:
>   • processes a queue of items
>   • waits a few seconds
>
> It's safe to have task1 and task2 running in parallel, but I cannot have
> two task1s running in parallel; they would both process the same queue of
> items, duplicating work.
>
> I found the scheduler supports this nicely with parameters like:
>
> db.scheduler_task.insert(function_name='task1',
>                          task_name='task1',
>                          stop_time=now + timedelta(days=90000),
>                          repeats=0,
>                          period=10)
>
> I can launch 3 workers, and they coordinate amongst themselves to make 
> sure that only one will run the task at a time. Great! This task will last 
> forever...
>
> ...but now we encounter my problem...
>
> What happens if it crashes, or passes stop_time? Then the task will turn 
> off, and the queue is no longer processed. Or what happens if I reset the 
> database, or install this code on a new server? It isn't nice if I have to 
> re-run the insert function by hand.
>
> So how can I ensure there is always EXACTLY ONE of each task in the 
> database?
>
> I tried putting this code into models:
>
> def initialize_task_queue(task_name, period=10):
>     num_tasks = db((db.scheduler_task.function_name == task_name)
>                    & ((db.scheduler_task.status == 'QUEUED')
>                       | (db.scheduler_task.status == 'ASSIGNED')
>                       | (db.scheduler_task.status == 'RUNNING')
>                       | (db.scheduler_task.status == 'ACTIVE'))).count()
>
>     # Add a task if there isn't one already
>     if num_tasks < 1:
>         db.scheduler_task.insert(function_name=task_name,
>                                  task_name=task_name,
>                                  stop_time=now + timedelta(days=90000),
>                                  repeats=0,
>                                  period=period)
>         db.commit()
>
> initialize_task_queue('task1')
> initialize_task_queue('task2')
> initialize_task_queue('task3')
>
> This worked, except it introduces a race condition! If you start three
> web2py processes simultaneously (e.g., when launching three scheduler
> workers, each of which executes the models), they will insert duplicate
> tasks:
>     process 1: count number of 'task1' tasks
>     process 2: count number of 'task1' tasks
>     process 1: there are less than 1, insert a 'task1' task
>     process 2: there are less than 1, insert a 'task1' task
>
> I was counting on PostgreSQL's MVCC transaction support to make each of
> these atomic. Unfortunately, that's not how it works. I do not understand
> why. As a workaround, I'm currently wrapping the code inside
> "initialize_task_queue" with a PostgreSQL advisory lock:
>
>     if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
>         return
>
>     ... count tasks, add one if needed ...
>
>     db.executesql('select pg_advisory_unlock(1);')
>
> But this sucks.
> What's a better way to ensure there is always 1 infinite-repeat task in 
> the scheduler? Or... am I using the wrong design entirely?
>
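
For anyone who lands on this thread with the same race: the count-then-insert
above isn't atomic under PostgreSQL's default READ COMMITTED isolation,
because both transactions run their count before either insert has committed,
so both see zero rows and both insert. An alternative to the advisory lock
that I've been sketching (untested; PostgreSQL-specific; the index name and
status list are my own guesses, not anything web2py ships) is to let the
database enforce the "exactly one" rule with a partial unique index and treat
a duplicate-key error as "task already queued". It reuses the same now,
timedelta, and db as the snippets above.

    # Create the index once, by hand or in a migration. The partial unique
    # index allows at most one non-finished row per task_name, so the
    # INSERT itself becomes the atomic check:
    #
    #   CREATE UNIQUE INDEX one_active_task_per_name
    #       ON scheduler_task (task_name)
    #       WHERE status IN ('QUEUED', 'ASSIGNED', 'RUNNING', 'ACTIVE');

    def initialize_task_queue(task_name, period=10):
        try:
            db.scheduler_task.insert(function_name=task_name,
                                     task_name=task_name,
                                     stop_time=now + timedelta(days=90000),
                                     repeats=0,
                                     period=period)
            db.commit()
        except Exception:
            # Most likely a duplicate-key IntegrityError from the driver
            # (catching that specific exception would be cleaner): another
            # process inserted first, so the task is already queued.
            db.rollback()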
