I may have found the problem:
* *asyncio.futures.Future* is not threadsafe: *set_result()* schedules its
done callbacks with *_loop.call_soon()* rather than
*call_soon_threadsafe()*. That is only correct when *set_result()* is called
on the thread that runs *_loop*. In my decorator it is called on the other
loop's thread, so the waiting loop is not woken up to run the callback.
I temporarily patched *BaseEventLoop.call_soon* to always behave as
*call_soon_threadsafe*, and my decorator worked just fine.
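
For reference, here is a minimal sketch of the decorator that avoids the patch by handing the result back to the waiting loop with *call_soon_threadsafe()*. It assumes a current Python with async/await rather than the *@asyncio.coroutine* / *yield from* style used in this thread. *start_loop_in_thread* and *current_thread_id* are hypothetical helpers added only for illustration.

```python
import asyncio
import functools
import threading


def loop_affinity(other_loop):
    """Run the decorated coroutine on other_loop and await its result
    from whichever loop the caller is running on."""
    def decorator(coro_func):
        @functools.wraps(coro_func)
        async def decorated(*args, **kwargs):
            caller_loop = asyncio.get_running_loop()
            result_future = caller_loop.create_future()

            def deliver(task):
                # Runs on the caller's loop thread, so it may touch
                # result_future directly.
                if result_future.done():
                    return
                if task.cancelled():
                    result_future.cancel()
                elif task.exception() is not None:
                    result_future.set_exception(task.exception())
                else:
                    result_future.set_result(task.result())

            def start():
                # Runs on other_loop's thread: create the Task there and
                # hand its outcome back across the thread boundary.
                task = other_loop.create_task(coro_func(*args, **kwargs))
                task.add_done_callback(
                    lambda t: caller_loop.call_soon_threadsafe(deliver, t))

            other_loop.call_soon_threadsafe(start)
            return await result_future
        return decorated
    return decorator


def start_loop_in_thread():
    """Hypothetical helper: run a new event loop on a daemon thread."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def current_thread_id():
    """Hypothetical coroutine used to show where the body executes."""
    return threading.get_ident()
```

Wrapping *current_thread_id* with *loop_affinity(start_loop_in_thread())* and awaiting it from the main loop should return the id of the helper thread, not the caller's. The standard library also offers *asyncio.run_coroutine_threadsafe()*, whose result can be awaited via *asyncio.wrap_future()*, which covers the same hand-off with less code.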
Suggestions:
(S1) Instead of {*call_soon*, *call_soon_threadsafe*}, I wonder if it would
make more sense to have {*call_soon* (threadsafe), *call_soon_unsafe* (not
threadsafe)}. That would make the safe behavior the default.
(S2) Alternatively, make *call_soon* detect when it is being called on the
wrong thread and raise an exception. This is similar to how sqlite and SWT
enforce correct threading usage.
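
To illustrate (S2), here is a rough sketch of what such a check could look like. *ThreadCheckedLoop* and *bind_to_current_thread* are hypothetical names, not part of asyncio; the wrapper only records which thread owns the loop and refuses *call_soon()* from any other thread.

```python
import asyncio
import threading


class ThreadCheckedLoop:
    """Hypothetical wrapper that rejects call_soon() from a foreign thread."""

    def __init__(self, loop):
        self._loop = loop
        self._owner_thread_id = None

    def bind_to_current_thread(self):
        # Call this on the thread that will run the loop.
        self._owner_thread_id = threading.get_ident()

    def call_soon(self, callback, *args):
        if threading.get_ident() != self._owner_thread_id:
            raise RuntimeError(
                "call_soon() called from a thread that does not own this "
                "loop; use call_soon_threadsafe() instead")
        return self._loop.call_soon(callback, *args)

    def call_soon_threadsafe(self, callback, *args):
        # Always allowed: this is the cross-thread entry point.
        return self._loop.call_soon_threadsafe(callback, *args)
```

As far as I can tell from the asyncio documentation, current versions perform a comparable check when the loop runs in debug mode, so this wrapper is only meant to show the idea.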
- David
On Saturday, February 15, 2014 9:25:16 PM UTC-8, Guido van Rossum wrote:
>
> I recommend adding "import pdb; pdb.set_trace()" before the yield-from
> that apparently doesn't return and stepping through things (perhaps using
> Emacs gud).
>
>
> On Sat, Feb 15, 2014 at 8:54 PM, David Foster <[email protected]> wrote:
>
>> > I could imagine that the database connection is tied to a thread but
>> > not to a loop
>>
>> Yes. For the UI and DB threads, my presumption was that there was a
>> dedicated loop for each that wraps around a single thread.
>>
>> - The UI loop for wxWidgets could be implemented on top of the
>> wx.CallAfter primitive which schedules on the native UI loop.
>> - A dedicated DB thread is needed by the sqlite interface, which
>> requires that all calls made relative to a single database be made on the
>> same thread.
>>
>> > So you can probably implement the @loop_affinity() decorator, but I
>> > don't see how you could implement switch_to_loop() using the published
>> > Tulip API -- you'd have to modify Task._step() to add another special case
>> > to the block starting with "if isinstance(result, futures.Future):" and
>> > then use a similar strategy as I mentioned above.
>>
>> Indeed. I prototyped a modification to Task._step() that recognized a
>> SwitchToLoop(...) command that was yielded by the switch_to_loop function.
>>
>> > Fortunately, a proper multi-event-loop solution should be possible
>> > using call_soon_threadsafe(). [...]
>>
>> Here's a first stab:
>>
>>     def loop_affinity(other_loop):
>>         def decorator(coro):
>>             @asyncio.coroutine
>>             def decorated(*args, **kwargs):
>>                 run_coro_result = Future()
>>
>>                 def run_coro():
>>                     coro_result = Task(coro(*args, **kwargs))
>>
>>                     def coro_is_done(coro_result):
>>                         try:
>>                             run_coro_result.set_result(coro_result.result())
>>                         except Exception as e:
>>                             run_coro_result.set_exception(e)
>>                         except BaseException as e:
>>                             run_coro_result.set_exception(e)
>>                             raise e
>>                     coro_result.add_done_callback(coro_is_done)
>>                 other_loop.call_soon_threadsafe(run_coro)
>>
>>                 print('run_coro_result: waiting')
>>                 yield from run_coro_result
>>                 print('run_coro_result: done') # FIXME: Never get here :(
>>
>>             return decorated
>>         return decorator
>>
>> I must be getting some detail slightly off since the line marked as FIXME
>> is never reached. Strange, since set_result() is getting called properly.
>>
>> --
>> David Foster
>> http://dafoster.net/
>>
>> On Saturday, February 15, 2014 7:03:51 PM UTC-8, Guido van Rossum wrote:
>>
>>> Thinking about this a little more, I suppose an executor wouldn't be the
>>> right solution, because what you want to do in the other loop/thread is
>>> schedule (and wait for) a coroutine on that thread's event loop, not just
>>> run a function in a thread. At least that's how you've written your example.
>>>
>>> (I could imagine that the database connection is tied to a thread but
>>> not to a loop, and then things are a little bit simpler, but not much --
>>> you would have to implement a new executor whose "thread pool" is a single,
>>> pre-existing thread.)
>>>
>>> Fortunately, a proper multi-event-loop solution should be possible using
>>> call_soon_threadsafe(). What you have to do is in the current loop use the
>>> other loop's call_soon_threadsafe() to schedule a helper function that
>>> starts your coroutine (it'll have to wrap it in a Task created in the
>>> context of the other loop). In order to be able to wait for that in the
>>> original loop, you have to create a Future in the original loop, and add a
>>> callback to the Task that uses the original loop's call_soon_threadsafe()
>>> to pass the result to this Future.
>>>
>>> This sounds complicated, and it is, but you can wrap it all up in a
>>> helper function (or a decorator with the signature you showed in your
>>> example), and it's the price you pay for a complex scenario (the
>>> call_soon_threadsafe() calls are needed whenever control transfers between
>>> threads).
>>>
>>> So you can probably implement the @loop_affinity() decorator, but I
>>> don't see how you could implement switch_to_loop() using the published
>>> Tulip API -- you'd have to modify Task._step() to add another special case
>>> to the block starting with "if isinstance(result, futures.Future):" and
>>> then use a similar strategy as I mentioned above.
>>>
>>> Now, that doesn't make the use case any less interesting, but I do
>>> recommend that you try to write the decorator version first, and use it for
>>> a while, before you decide that you need to switch loops in the middle. (I
>>> somehow expect that the helper-function/decorator version will take care of
>>> most of your needs, if you just structure your code appropriately.)
>>>
>>>
>>>
>>>
>>> On Sat, Feb 15, 2014 at 3:36 PM, David Foster <[email protected]> wrote:
>>>
>>>> *(Migrating this thread from the python-ideas list)*
>>>>
>>>> Tulip does not appear to address a scenario of mine where I need
>>>> different parts of the same coroutine to run on specific threads or event
>>>> loops. Consider the following code which needs different parts executed on
>>>> a UI thread, database thread, and background thread:
>>>>
>>>>     def tree_node_clicked(tree_node):
>>>>         print('Clicked: ' + repr(tree_node))
>>>>         def db_task():
>>>>             db_node = fetch_node_from_db(tree_node.url)
>>>>
>>>>             def process_node(db_node):
>>>>                 tree_node.set_contents(db_node.contents)
>>>>                 # (done)
>>>>
>>>>             if db_node is None:
>>>>                 def bg_task():
>>>>                     db_node = fetch_node_from_internet(tree_node.url)
>>>>                     def db_task():
>>>>                         insert_into_db(db_node)
>>>>                         ui_call_soon(process_node, db_node)
>>>>                     db_call_soon(db_task)
>>>>                 bg_call_soon(bg_task)
>>>>             else:
>>>>                 ui_call_soon(process_node, db_node)
>>>>         db_call_soon(db_task)
>>>>
>>>> Imagine if you could write this function like the following:
>>>>
>>>>     ui_loop = ...
>>>>     db_loop = ...
>>>>     bg_loop = ...
>>>>
>>>>     @asyncio.coroutine
>>>>     def tree_node_clicked(tree_node):
>>>>         print('Clicked: ' + repr(tree_node))
>>>>
>>>>         yield from switch_to_loop(db_loop)
>>>>         db_node = fetch_node_from_db(tree_node)
>>>>         if db_node is None:
>>>>             yield from switch_to_loop(bg_loop)
>>>>             db_node = fetch_node_from_internet(tree_node.url)
>>>>
>>>>             yield from switch_to_loop(db_loop)
>>>>             insert_into_db(db_node)
>>>>
>>>>         yield from switch_to_loop(ui_loop)
>>>>         tree_node.set_contents(db_node.contents)
>>>>
>>>> (The preceding relies on a hypothetical *switch_to_loop* primitive.)
>>>>
>>>> Or even better: if you created a decorator like *@loop_affinity* that
>>>> automatically called switch_to_loop(...) whenever the current event loop
>>>> wasn't the right one, you could write the following, even simpler version:
>>>>
>>>>     @asyncio.coroutine
>>>>     def tree_node_clicked(tree_node):
>>>>         print('Clicked: ' + repr(tree_node))
>>>>         db_node = yield from fetch_node_from_db(tree_node)
>>>>         if db_node is None:
>>>>             db_node = yield from fetch_node_from_internet(tree_node.url)
>>>>             yield from insert_into_db(db_node)
>>>>         yield from tree_node.set_contents(db_node.contents)
>>>>
>>>>     @loop_affinity(db_loop)
>>>>     def fetch_node_from_db(...): ...
>>>>
>>>>     @loop_affinity(bg_loop)
>>>>     def fetch_node_from_internet(...): ...
>>>>
>>>>     @loop_affinity(db_loop)
>>>>     def insert_into_db(...): ...
>>>>
>>>>     @loop_affinity(ui_loop)
>>>>     def set_contents(...): ...
>>>>
>>>> Wouldn't that be just grand?
>>>>
>>>> Is there a way to implement a primitive similar to switch_to_loop(...)
>>>> on top of existing primitives in Tulip? Or would that be considered a new
>>>> feature?
>>>>
>>>> Jonathan Slenders and Guido suggested on python-ideas that I would be
>>>> able to implement this idea (although perhaps not switch_to_loop) using
>>>> run_in_executor. Regarding specifically the implementation of
>>>> switch_to_loop, it is not clear to me how I could get access to the next
>>>> part of the current coroutine, which I would need to pass to
>>>> run_in_executor (or something similar).
>>>>
>>>
>>>
>>>
>>> --
>>> --Guido van Rossum (python.org/~guido)
>>>
>>
>
>
> --
> --Guido van Rossum (python.org/~guido)
>