For readability, it's desirable to make asynchronous code use coroutines to avoid callbacks when possible. For python2 compatibility, generators that yield Futures can be used to implement coroutines.
Add a compat_coroutine module which provides a @coroutine decorator
and a coroutine_return function that can be used to return a value
from a generator. The decorated function returns a Future which is
done when the generator is exhausted. Usage is very similar to asyncio
coroutine usage in python3.4 (see unit tests).

Bug: https://bugs.gentoo.org/660426
---
Review notes (text between the "---" line and the diffstat is not part
of the commit message):
 * coroutine() now returns a functools.wraps wrapper function instead
   of a functools.partial object, so that it can also decorate methods.
 * _GeneratorTask._next() now uses the value returned by
   generator.throw() instead of calling next() afterwards, which
   skipped a yielded Future and resumed the generator with None.
 * Fixed typos in the _generator_future docstring.
 * Added test_catching_coroutine and test_method_coroutine.
 * Line breaks were recovered from the "+" markers. The original
   indentation was not recoverable and 4 spaces are used here. The
   "index" blob ids are those of the original submission and do not
   match the revised file contents.

 .../tests/util/futures/test_compat_coroutine.py    | 98 ++++++++++++++++++++++
 pym/portage/util/futures/compat_coroutine.py       | 96 +++++++++++++++++++++
 2 files changed, 194 insertions(+)
 create mode 100644 pym/portage/tests/util/futures/test_compat_coroutine.py
 create mode 100644 pym/portage/util/futures/compat_coroutine.py

diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py b/pym/portage/tests/util/futures/test_compat_coroutine.py
new file mode 100644
index 0000000000..4a1d931b6b
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_compat_coroutine.py
@@ -0,0 +1,98 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+    coroutine,
+    coroutine_return,
+)
+from portage.tests import TestCase
+
+
+class CompatCoroutineTestCase(TestCase):
+
+    def test_returning_coroutine(self):
+        @coroutine
+        def returning_coroutine():
+            coroutine_return('success')
+            yield None
+
+        self.assertEqual('success',
+            asyncio.get_event_loop().run_until_complete(returning_coroutine()))
+
+    def test_raising_coroutine(self):
+
+        class TestException(Exception):
+            pass
+
+        @coroutine
+        def raising_coroutine():
+            raise TestException('exception')
+            yield None
+
+        self.assertRaises(TestException,
+            asyncio.get_event_loop().run_until_complete, raising_coroutine())
+
+    def test_cancelled_coroutine(self):
+
+        @coroutine
+        def endlessly_sleeping_coroutine(loop=None):
+            loop = asyncio._wrap_loop(loop)
+            yield loop.create_future()
+
+        loop = asyncio.get_event_loop()
+        future = endlessly_sleeping_coroutine(loop=loop)
+        loop.call_soon(future.cancel)
+
+        self.assertRaises(asyncio.CancelledError,
+            loop.run_until_complete, future)
+
+    def test_sleeping_coroutine(self):
+        @coroutine
+        def sleeping_coroutine():
+            for i in range(3):
+                x = yield asyncio.sleep(0, result=i)
+                self.assertEqual(x, i)
+
+        asyncio.get_event_loop().run_until_complete(sleeping_coroutine())
+
+    def test_catching_coroutine(self):
+
+        class TestException(Exception):
+            pass
+
+        @coroutine
+        def catching_coroutine(loop=None):
+            loop = asyncio._wrap_loop(loop)
+            future = loop.create_future()
+            loop.call_soon(future.set_exception, TestException('exception'))
+            try:
+                yield future
+            except TestException:
+                pass
+            else:
+                self.fail('TestException was not raised')
+            # The Future yielded after the exception is handled must
+            # still be waited for, and its result sent to the generator.
+            x = yield asyncio.sleep(0, result='recovered')
+            coroutine_return(x)
+
+        loop = asyncio.get_event_loop()
+        self.assertEqual('recovered',
+            loop.run_until_complete(catching_coroutine(loop=loop)))
+
+    def test_method_coroutine(self):
+
+        class Holder(object):
+
+            def __init__(self, value):
+                self.value = value
+
+            @coroutine
+            def get_value(self):
+                coroutine_return(self.value)
+                yield None
+
+        self.assertEqual('value',
+            asyncio.get_event_loop().run_until_complete(
+                Holder('value').get_value()))
diff --git a/pym/portage/util/futures/compat_coroutine.py b/pym/portage/util/futures/compat_coroutine.py
new file mode 100644
index 0000000000..eea0b2883e
--- /dev/null
+++ b/pym/portage/util/futures/compat_coroutine.py
@@ -0,0 +1,96 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+import functools
+
+
+def coroutine(generator_func):
+    """
+    A decorator for a generator function that behaves as coroutine function.
+    The generator should yield a Future instance in order to wait for it,
+    and the result becomes the result of the current yield-expression,
+    via the PEP 342 generator send() method.
+
+    The decorated function returns a Future which is done when the generator
+    is exhausted. The generator can return a value via the coroutine_return
+    function.
+    """
+    # A functools.partial object does not implement the descriptor
+    # protocol, so it would not bind self when decorating a method.
+    # A plain wrapper function works for functions and methods alike.
+    @functools.wraps(generator_func)
+    def wrapped(*args, **kwargs):
+        return _generator_future(generator_func, *args, **kwargs)
+    return wrapped
+
+
+def coroutine_return(result=None):
+    """
+    Return a result from the current coroutine.
+    """
+    raise _CoroutineReturnValue(result)
+
+
+def _generator_future(generator_func, *args, **kwargs):
+    """
+    Call generator_func with the given arguments, and return a Future
+    that is done when the resulting generator is exhausted. If a
+    keyword argument named 'loop' is given, then it is used instead of
+    the default event loop.
+    """
+    loop = asyncio._wrap_loop(kwargs.get('loop'))
+    result = loop.create_future()
+    _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)
+    return result
+
+
+class _CoroutineReturnValue(Exception):
+    def __init__(self, result):
+        self.result = result
+
+
+class _GeneratorTask(object):
+    """
+    Asynchronously executes the generator to completion, waiting for
+    the result of each Future that it yields, and sending the result
+    to the generator.
+    """
+    def __init__(self, generator, result, loop):
+        self._generator = generator
+        self._result = result
+        self._loop = loop
+        result.add_done_callback(self._cancel_callback)
+        self._next()
+
+    def _cancel_callback(self, result):
+        if result.cancelled():
+            self._generator.close()
+
+    def _next(self, previous=None):
+        if self._result.cancelled():
+            return
+        try:
+            if previous is None:
+                future = next(self._generator)
+            elif previous.cancelled():
+                # Like send(), throw() returns the next value yielded by
+                # the generator, so it must not be followed by next().
+                future = self._generator.throw(asyncio.CancelledError())
+            elif previous.exception() is None:
+                future = self._generator.send(previous.result())
+            else:
+                future = self._generator.throw(previous.exception())
+
+        except _CoroutineReturnValue as e:
+            if not self._result.cancelled():
+                self._result.set_result(e.result)
+        except StopIteration:
+            if not self._result.cancelled():
+                self._result.set_result(None)
+        except Exception as e:
+            if not self._result.cancelled():
+                self._result.set_exception(e)
+        else:
+            future = asyncio.ensure_future(future, loop=self._loop)
+            future.add_done_callback(self._next)
-- 
2.13.6