For readability, it's desirable to make asynchronous code use
coroutines to avoid callbacks when possible. For python2 compatibility,
generators that yield Futures can be used to implement coroutines.

Add a compat_coroutine module which provides a @coroutine decorator
and a coroutine_return function that can be used to return a value
from a generator. The decorated function returns a Future which is
done when the generator is exhausted. Usage is very similar to asyncio
coroutine usage in python3.4 (see unit tests).

Bug: https://bugs.gentoo.org/660426
---
 .../tests/util/futures/test_compat_coroutine.py    | 57 ++++++++++++++
 pym/portage/util/futures/compat_coroutine.py       | 90 ++++++++++++++++++++++
 2 files changed, 147 insertions(+)
 create mode 100644 pym/portage/tests/util/futures/test_compat_coroutine.py
 create mode 100644 pym/portage/util/futures/compat_coroutine.py

diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py 
b/pym/portage/tests/util/futures/test_compat_coroutine.py
new file mode 100644
index 0000000000..4a1d931b6b
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_compat_coroutine.py
@@ -0,0 +1,57 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+       coroutine,
+       coroutine_return,
+)
+from portage.tests import TestCase
+
+
+class CompatCoroutineTestCase(TestCase):
+
+       def test_returning_coroutine(self):
+               @coroutine
+               def returning_coroutine():
+                       coroutine_return('success')
+                       yield None
+
+               self.assertEqual('success',
+                       
asyncio.get_event_loop().run_until_complete(returning_coroutine()))
+
+       def test_raising_coroutine(self):
+
+               class TestException(Exception):
+                       pass
+
+               @coroutine
+               def raising_coroutine():
+                       raise TestException('exception')
+                       yield None
+
+               self.assertRaises(TestException,
+                       asyncio.get_event_loop().run_until_complete, 
raising_coroutine())
+
+       def test_cancelled_coroutine(self):
+
+               @coroutine
+               def endlessly_sleeping_coroutine(loop=None):
+                       loop = asyncio._wrap_loop(loop)
+                       yield loop.create_future()
+
+               loop = asyncio.get_event_loop()
+               future = endlessly_sleeping_coroutine(loop=loop)
+               loop.call_soon(future.cancel)
+
+               self.assertRaises(asyncio.CancelledError,
+                       loop.run_until_complete, future)
+
+       def test_sleeping_coroutine(self):
+               @coroutine
+               def sleeping_coroutine():
+                       for i in range(3):
+                               x = yield asyncio.sleep(0, result=i)
+                               self.assertEqual(x, i)
+
+               
asyncio.get_event_loop().run_until_complete(sleeping_coroutine())
diff --git a/pym/portage/util/futures/compat_coroutine.py 
b/pym/portage/util/futures/compat_coroutine.py
new file mode 100644
index 0000000000..eea0b2883e
--- /dev/null
+++ b/pym/portage/util/futures/compat_coroutine.py
@@ -0,0 +1,90 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+import functools
+
+
+def coroutine(generator_func):
+       """
+       A decorator for a generator function that behaves as coroutine function.
+       The generator should yield a Future instance in order to wait for it,
+       and the result becomes the result of the current yield-expression,
+       via the PEP 342 generator send() method.
+
+       The decorated function returns a Future which is done when the generator
+       is exhausted. The generator can return a value via the coroutine_return
+       function.
+       """
+       return functools.partial(_generator_future, generator_func)
+
+
+def coroutine_return(result=None):
+       """
+       Return a result from the current coroutine.
+       """
+       raise _CoroutineReturnValue(result)
+
+
+def _generator_future(generator_func, *args, **kwargs):
+       """
+       Call generator_func with the given arguments, and return a Future
+       that is done when the resulting generation is exhausted. If is a
+       keyword argument named 'loop' is given, then it is used instead of
+       the default event loop.
+       """
+       loop = asyncio._wrap_loop(kwargs.get('loop'))
+       result = loop.create_future()
+       _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)
+       return result
+
+
+class _CoroutineReturnValue(Exception):
+       def __init__(self, result):
+               self.result = result
+
+
+class _GeneratorTask(object):
+       """
+       Asynchronously executes the generator to completion, waiting for
+       the result of each Future that it yields, and sending the result
+       to the generator.
+       """
+       def __init__(self, generator, result, loop):
+               self._generator = generator
+               self._result = result
+               self._loop = loop
+               result.add_done_callback(self._cancel_callback)
+               self._next()
+
+       def _cancel_callback(self, result):
+               if result.cancelled():
+                       self._generator.close()
+
+       def _next(self, previous=None):
+               if self._result.cancelled():
+                       return
+               try:
+                       if previous is None:
+                               future = next(self._generator)
+                       elif previous.cancelled():
+                               self._generator.throw(asyncio.CancelledError())
+                               future = next(self._generator)
+                       elif previous.exception() is None:
+                               future = self._generator.send(previous.result())
+                       else:
+                               self._generator.throw(previous.exception())
+                               future = next(self._generator)
+
+               except _CoroutineReturnValue as e:
+                       if not self._result.cancelled():
+                               self._result.set_result(e.result)
+               except StopIteration:
+                       if not self._result.cancelled():
+                               self._result.set_result(None)
+               except Exception as e:
+                       if not self._result.cancelled():
+                               self._result.set_exception(e)
+               else:
+                       future = asyncio.ensure_future(future, loop=self._loop)
+                       future.add_done_callback(self._next)
-- 
2.13.6


Reply via email to