I may have found the problem:

* *asyncio.futures.Future* is not threadsafe because it makes use of 
*_loop.call_soon()* internally in *set_result()*, as opposed to 
*call_soon_threadsafe()*. This is only correct if *_loop* happens to match 
the current event loop.

I temporarily patched *BaseEventLoop.call_soon* to always delegate to 
*call_soon_threadsafe*, and with that change my decorator worked just fine.
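
For reference, here is a minimal sketch of the decorator with the thread-safety fix applied explicitly instead of by patching *call_soon*: the result is handed back through the calling loop's *call_soon_threadsafe()*, so the Future is only ever touched on its own thread. Assumptions: it is written in the later async/await syntax rather than *@asyncio.coroutine* / *yield from*, the helper names (caller_loop, result_future, copy_result, task_done, start_task) are my own, and cancelling the caller is not propagated to the other loop.

```python
import asyncio
import functools
import threading


def loop_affinity(other_loop):
    """Run the decorated coroutine on other_loop; awaitable from any loop."""
    def decorator(coro_func):
        @functools.wraps(coro_func)
        async def decorated(*args, **kwargs):
            caller_loop = asyncio.get_running_loop()
            # The future belongs to the caller's loop and must only be
            # completed on the caller's thread.
            result_future = caller_loop.create_future()

            def copy_result(task):
                # Runs on caller_loop's thread, so touching the future is safe.
                if result_future.cancelled():
                    return
                if task.cancelled():
                    result_future.cancel()
                elif task.exception() is not None:
                    result_future.set_exception(task.exception())
                else:
                    result_future.set_result(task.result())

            def task_done(task):
                # Runs on other_loop's thread: hop back to the caller's
                # loop before completing the future.
                caller_loop.call_soon_threadsafe(copy_result, task)

            def start_task():
                # Runs on other_loop's thread, so the Task is created there.
                task = other_loop.create_task(coro_func(*args, **kwargs))
                task.add_done_callback(task_done)

            other_loop.call_soon_threadsafe(start_task)
            return await result_future
        return decorated
    return decorator
```

Unlike the first stab, this version also returns the coroutine's result to the caller. Later asyncio releases additionally provide asyncio.run_coroutine_threadsafe(), whose result can be awaited through asyncio.wrap_future(); that may be a shorter route to the same behavior.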

Suggestions:

(S1) Instead of {*call_soon*, *call_soon_threadsafe*}, I wonder if it would 
make more sense to have {*call_soon* (threadsafe), *call_soon_unsafe* (not 
threadsafe)}. This would make the safe behavior the default.

(S2) Alternatively, make *call_soon* detect when it is being called 
inappropriately (on the wrong thread) and raise an exception. This is 
similar to how sqlite and SWT enforce correct threading usage.
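
As an illustration of what (S2) looks like in practice: later asyncio releases perform this kind of check when the loop's debug mode is enabled, raising RuntimeError if *call_soon* is invoked from a thread other than the one running the loop. The sketch below assumes such a release (it does not describe Tulip as it stood at the time of this thread), and the function name is hypothetical.

```python
import asyncio
import threading


def call_soon_from_wrong_thread():
    """Return the error raised by call_soon() when used off-thread in debug mode."""
    loop = asyncio.new_event_loop()
    loop.set_debug(True)  # debug mode enables the owning-thread check

    started = threading.Event()
    # Scheduled before the loop starts; runs once the loop is running.
    loop.call_soon_threadsafe(started.set)
    thread = threading.Thread(target=loop.run_forever)
    thread.start()
    started.wait()

    error = None
    try:
        # Called from the main thread, not the loop's thread.
        loop.call_soon(print, "never scheduled")
    except RuntimeError as exc:
        error = exc
    finally:
        # The thread-safe variant is still allowed from here.
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
    return error
```

Calling call_soon_from_wrong_thread() should hand back the RuntimeError instead of silently queueing a callback that the loop may not notice until something else wakes it.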

- David

On Saturday, February 15, 2014 9:25:16 PM UTC-8, Guido van Rossum wrote:
>
> I recommend adding "import pdb; pdb.set_trace()" before the yield-from 
> that apparently doesn't return and stepping through things (perhaps using 
> Emacs gud).
>
>
> On Sat, Feb 15, 2014 at 8:54 PM, David Foster <[email protected]> wrote:
>
>> > I could imagine that the database connection is tied to a thread but 
>> not to a loop
>>
>> Yes. For the UI and DB threads, my presumption was that there was a 
>> dedicated loop for each that wraps around a single thread.
>>
>>    - The UI loop for wxWidgets could be implemented on top of the 
>>    wx.CallAfter primitive which schedules on the native UI loop.
>>    - A dedicated DB thread is needed by the sqlite interface, which 
>>    requires that all calls made relative to a single database be made on the 
>>    same thread. 
>>
>> > So you can probably implement the @loop_affinity() decorator, but I 
>> don't see how you could implement switch_to_loop() using the published 
>> Tulip API -- you'd have to modify Task._step() to add another special case 
>> to the block starting with "if isinstance(result, futures.Future):" and 
>> then use a similar strategy as I mentioned above.
>>
>> Indeed. I prototyped a modification to Task._step() that recognized a 
>> SwitchToLoop(...) command that was yielded by the switch_to_loop function.
>>
>> > Fortunately, a proper multi-event-loop solution should be possible 
>> using call_soon_threadsafe(). [...]
>>
>> Here's a first stab:
>>
>> def loop_affinity(other_loop):
>>     def decorator(coro):
>>         @asyncio.coroutine
>>         def decorated(*args, **kwargs):
>>             run_coro_result = Future()
>>             
>>             def run_coro():
>>                 coro_result = Task(coro(*args, **kwargs))
>>                 
>>                 def coro_is_done(coro_result):
>>                     try:
>>                         run_coro_result.set_result(coro_result.result())
>>                     except Exception as e:
>>                         run_coro_result.set_exception(e)
>>                     except BaseException as e:
>>                         run_coro_result.set_exception(e)
>>                         raise e
>>                 coro_result.add_done_callback(coro_is_done)
>>             other_loop.call_soon_threadsafe(run_coro)
>>             
>>             print('run_coro_result: waiting')
>>             yield from run_coro_result
>>             print('run_coro_result: done')    # FIXME: Never get here :(
>>         
>>         return decorated
>>     return decorator
>>
>> I must be getting some detail slightly off since the line marked as FIXME 
>> is never reached. Strange, since set_result() is getting called properly.
>>
>> --
>> David Foster
>> http://dafoster.net/
>>
>> On Saturday, February 15, 2014 7:03:51 PM UTC-8, Guido van Rossum wrote:
>>
>>> Thinking about this a little more, I suppose an executor wouldn't be the 
>>> right solution, because what you want to do in the other loop/thread is 
>>> schedule (and wait for) a coroutine on that thread's event loop, not just 
>>> run a function in a thread. At least that's how you've written your example.
>>>
>>> (I could imagine that the database connection is tied to a thread but 
>>> not to a loop, and then things are a little bit simpler, but not much -- 
>>> you would have to implement a new executor whose "thread pool" is a single, 
>>> pre-existing thread.)
>>>
>>> Fortunately, a proper multi-event-loop solution should be possible using 
>>> call_soon_threadsafe(). What you have to do is in the current loop use the 
>>> other loop's call_soon_threadsafe() to schedule a helper function that 
>>> starts your coroutine (it'll have to wrap it in a Task created in the 
>>> context of the other loop). In order to be able to wait for that in the 
>>> original loop, you have to create a Future in the original loop, and add a 
>>> callback to the Task that uses the original loop's call_soon_threadsafe() 
>>> to pass the result to this Future.
>>>
>>> This sounds complicated, and it is, but you can wrap it all up in a 
>>> helper function (or a decorator with the signature you showed in your 
>>> example), and it's the price you pay for a complex scenario (the 
>>> call_soon_threadsafe() calls are needed whenever control transfers between 
>>> threads).
>>>
>>> So you can probably implement the @loop_affinity() decorator, but I 
>>> don't see how you could implement switch_to_loop() using the published 
>>> Tulip API -- you'd have to modify Task._step() to add another special case 
>>> to the block starting with "if isinstance(result, futures.Future):" and 
>>> then use a similar strategy as I mentioned above.
>>>
>>> Now, that doesn't make the use case any less interesting, but I do 
>>> recommend that you try to write the decorator version first, and use it for 
>>> a while, before you decide that you need to switch loops in the middle. (I 
>>> somehow expect that the helper-function/decorator version will take care of 
>>> most of your needs, if you just structure your code appropriately.)
>>>
>>>
>>>
>>>
>>> On Sat, Feb 15, 2014 at 3:36 PM, David Foster <[email protected]> wrote:
>>>
>>>> *(Migrating this thread from the python-ideas list)*
>>>>
>>>> Tulip does not appear to address a scenario of mine where I need 
>>>> different parts of the same coroutine to run on specific threads or event 
>>>> loops. Consider the following code which needs different parts executed on 
>>>> a UI thread, database thread, and background thread:
>>>>
>>>> def tree_node_clicked(tree_node):
>>>>     print('Clicked: ' + repr(tree_node))
>>>>     def db_task():
>>>>         db_node = fetch_node_from_db(tree_node.url)
>>>>
>>>>         def process_node(db_node):
>>>>             tree_node.set_contents(db_node.contents)
>>>>             # (done)
>>>>
>>>>         if db_node is None:  # not in the DB yet: fetch it, then store it
>>>>             def bg_task():
>>>>                 db_node = fetch_node_from_internet(tree_node.url)
>>>>                 def db_task():
>>>>                     insert_into_db(db_node)
>>>>                     ui_call_soon(process_node, db_node)
>>>>                 db_call_soon(db_task)
>>>>             bg_call_soon(bg_task)
>>>>         else:
>>>>             ui_call_soon(process_node, db_node)
>>>>     db_call_soon(db_task)
>>>>
>>>> Imagine if you could write this function like the following:
>>>>
>>>> ui_loop = ...
>>>> db_loop = ...
>>>> bg_loop = ...
>>>>
>>>> @asyncio.coroutine
>>>> def tree_node_clicked(tree_node):
>>>>     print('Clicked: ' + repr(tree_node))
>>>>
>>>>     yield from switch_to_loop(db_loop)
>>>>     db_node = fetch_node_from_db(tree_node.url)
>>>>     if db_node is None:  # not in the DB yet
>>>>         yield from switch_to_loop(bg_loop)
>>>>         db_node = fetch_node_from_internet(tree_node.url)
>>>>
>>>>         yield from switch_to_loop(db_loop)
>>>>         insert_into_db(db_node)
>>>>
>>>>     yield from switch_to_loop(ui_loop)
>>>>     tree_node.set_contents(db_node.contents)
>>>>
>>>> (The preceding relies on a hypothetical *switch_to_loop* primitive.)
>>>>
>>>> Or even better: if you created a decorator like *@loop_affinity* that 
>>>> automatically called switch_to_loop(...) whenever the current event loop 
>>>> wasn't the right one, you could write this still simpler version:
>>>>
>>>> @asyncio.coroutine
>>>> def tree_node_clicked(tree_node):
>>>>     print('Clicked: ' + repr(tree_node))
>>>>     db_node = yield from fetch_node_from_db(tree_node.url)
>>>>     if db_node is None:  # not in the DB yet
>>>>         db_node = yield from fetch_node_from_internet(tree_node.url)
>>>>         yield from insert_into_db(db_node)
>>>>     yield from tree_node.set_contents(db_node.contents)
>>>>
>>>> @loop_affinity(db_loop)
>>>> def fetch_node_from_db(...): ...
>>>>
>>>> @loop_affinity(bg_loop)
>>>> def fetch_node_from_internet(...): ...
>>>>
>>>> @loop_affinity(db_loop)
>>>> def insert_into_db(...): ...
>>>>
>>>> @loop_affinity(ui_loop)
>>>> def set_contents(...): ...
>>>>
>>>> Wouldn't that be just grand?
>>>>
>>>> Is there a way to implement a primitive similar to switch_to_loop(...) 
>>>> on top of existing primitives in Tulip? Or would that be considered a new 
>>>> feature?
>>>>
>>>> Jonathan Slenders and Guido suggested on python-ideas that I would be 
>>>> able to implement this idea (although perhaps not switch_to_loop) using 
>>>> run_in_executor. Regarding specifically the implementation of 
>>>> switch_to_loop, it is not clear to me how I could get access to the next 
>>>> part of the current coroutine, which I would need to pass to 
>>>> run_in_executor (or something similar).
>>>>
>>>
>>>
>>>
>>> -- 
>>> --Guido van Rossum (python.org/~guido) 
>>>
>>
>
>
> -- 
> --Guido van Rossum (python.org/~guido) 
>
