commit:     85ac23b7c0c58cef72d22281d66d086521c01e3e
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun May  6 11:05:03 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun May  6 11:41:45 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=85ac23b7

asyncio: add _wrap_loop helper (bug 654390)

In order to deal with asyncio event loop compatibility issues, add
a _wrap_loop helper. For example, since python3.4 does not have the
AbstractEventLoop.create_future() method, this helper function can
be used to add a wrapper that implements the create_future method
for python3.4.

Bug: https://bugs.gentoo.org/654390

 pym/portage/dbapi/porttree.py                         | 12 ++++++------
 .../tests/util/futures/asyncio/test_child_watcher.py  |  2 +-
 .../util/futures/asyncio/test_event_loop_in_fork.py   |  8 ++++----
 .../tests/util/futures/asyncio/test_pipe_closed.py    |  4 ++--
 .../util/futures/asyncio/test_run_until_complete.py   |  2 +-
 .../util/futures/asyncio/test_subprocess_exec.py      |  4 ++--
 pym/portage/util/futures/_asyncio/__init__.py         | 19 ++++++++++++++++++-
 pym/portage/util/futures/_asyncio/tasks.py            |  7 +++++--
 pym/portage/util/futures/executor/fork.py             |  4 ++--
 pym/portage/util/futures/iter_completed.py            |  7 +++----
 pym/portage/util/futures/retry.py                     |  3 +--
 11 files changed, 45 insertions(+), 27 deletions(-)

diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py
index 801b5658a..3e36024ff 100644
--- a/pym/portage/dbapi/porttree.py
+++ b/pym/portage/dbapi/porttree.py
@@ -36,7 +36,7 @@ from portage import _encodings
 from portage import _unicode_encode
 from portage import OrderedDict
 from portage.util._eventloop.EventLoop import EventLoop
-from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures import asyncio
 from portage.util.futures.iter_completed import iter_gather
 from _emerge.EbuildMetadataPhase import EbuildMetadataPhase
 
@@ -325,8 +325,8 @@ class portdbapi(dbapi):
        @property
        def _event_loop(self):
                if portage._internal_caller:
-                       # For internal portage usage, the global_event_loop is 
safe.
-                       return global_event_loop()
+                       # For internal portage usage, asyncio._wrap_loop() is 
safe.
+                       return asyncio._wrap_loop()
                else:
                        # For external API consumers, use a local EventLoop, 
since
                        # we don't want to assume that it's safe to override the
@@ -611,7 +611,7 @@ class portdbapi(dbapi):
                # to simultaneous instantiation of multiple event loops here.
                # Callers of this method certainly want the same event loop to
                # be used for all calls.
-               loop = loop or global_event_loop()
+               loop = asyncio._wrap_loop(loop)
                future = loop.create_future()
                cache_me = False
                if myrepo is not None:
@@ -751,7 +751,7 @@ class portdbapi(dbapi):
                        a set of alternative URIs.
                @rtype: asyncio.Future (or compatible)
                """
-               loop = loop or global_event_loop()
+               loop = asyncio._wrap_loop(loop)
                result = loop.create_future()
 
                def aux_get_done(aux_get_future):
@@ -1419,7 +1419,7 @@ def _async_manifest_fetchlist(portdb, repo_config, cp, 
cpv_list=None,
        @return: a Future resulting in a Mapping compatible with FetchlistDict
        @rtype: asyncio.Future (or compatible)
        """
-       loop = loop or global_event_loop()
+       loop = asyncio._wrap_loop(loop)
        result = loop.create_future()
        cpv_list = (portdb.cp_list(cp, mytree=repo_config.location)
                if cpv_list is None else cpv_list)

diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py 
b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
index dca01be56..8ef497544 100644
--- a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
+++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
@@ -28,7 +28,7 @@ class ChildWatcherTestCase(TestCase):
 
                        args_tuple = ('hello', 'world')
 
-                       loop = asyncio.get_event_loop()
+                       loop = asyncio._wrap_loop()
                        future = loop.create_future()
 
                        def callback(pid, returncode, *args):

diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py 
b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
index 7868d792a..19588bf3a 100644
--- a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
+++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
@@ -11,14 +11,14 @@ from portage.util.futures.unix_events import 
DefaultEventLoopPolicy
 
 def fork_main(parent_conn, child_conn):
        parent_conn.close()
-       loop = asyncio.get_event_loop()
+       loop = asyncio._wrap_loop()
        # This fails with python's default event loop policy,
        # see https://bugs.python.org/issue22087.
-       loop.run_until_complete(asyncio.sleep(0.1))
+       loop.run_until_complete(asyncio.sleep(0.1, loop=loop))
 
 
 def async_main(fork_exitcode, loop=None):
-       loop = loop or asyncio.get_event_loop()
+       loop = asyncio._wrap_loop(loop)
 
        # Since python2.7 does not support Process.sentinel, use Pipe to
        # monitor for process exit.
@@ -48,7 +48,7 @@ class EventLoopInForkTestCase(TestCase):
                if not isinstance(initial_policy, DefaultEventLoopPolicy):
                        asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
                try:
-                       loop = asyncio.get_event_loop()
+                       loop = asyncio._wrap_loop()
                        fork_exitcode = loop.create_future()
                        # Make async_main fork while the loop is running, which 
would
                        # trigger https://bugs.python.org/issue22087 with 
asyncio's

diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py 
b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
index e63829888..c2b468064 100644
--- a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
+++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
@@ -58,7 +58,7 @@ class ReaderPipeClosedTestCase(_PipeClosedTestCase, TestCase):
                if not isinstance(initial_policy, DefaultEventLoopPolicy):
                        asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
 
-               loop = asyncio.get_event_loop()
+               loop = asyncio._wrap_loop()
                read_end = os.fdopen(read_end, 'rb', 0)
                write_end = os.fdopen(write_end, 'wb', 0)
                try:
@@ -95,7 +95,7 @@ class WriterPipeClosedTestCase(_PipeClosedTestCase, TestCase):
                if not isinstance(initial_policy, DefaultEventLoopPolicy):
                        asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
 
-               loop = asyncio.get_event_loop()
+               loop = asyncio._wrap_loop()
                read_end = os.fdopen(read_end, 'rb', 0)
                write_end = os.fdopen(write_end, 'wb', 0)
                try:

diff --git a/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py 
b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py
index fc8f198ca..1a37e4922 100644
--- a/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py
+++ b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py
@@ -13,7 +13,7 @@ class RunUntilCompleteTestCase(TestCase):
                        asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
 
                try:
-                       loop = asyncio.get_event_loop()
+                       loop = asyncio._wrap_loop()
                        f1 = loop.create_future()
                        f2 = loop.create_future()
                        f1.add_done_callback(f2.set_result)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index 98983941d..8dc5fa7b9 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -23,7 +23,7 @@ def reader(input_file, loop=None):
        @return: bytes
        @rtype: asyncio.Future (or compatible)
        """
-       loop = loop or asyncio.get_event_loop()
+       loop = asyncio._wrap_loop(loop)
        future = loop.create_future()
        _Reader(future, input_file, loop)
        return future
@@ -61,7 +61,7 @@ class SubprocessExecTestCase(TestCase):
                        asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
 
                try:
-                       test(asyncio.get_event_loop())
+                       test(asyncio._wrap_loop())
                finally:
                        asyncio.set_event_loop_policy(initial_policy)
 

diff --git a/pym/portage/util/futures/_asyncio/__init__.py 
b/pym/portage/util/futures/_asyncio/__init__.py
index 9ae050874..e62de7a69 100644
--- a/pym/portage/util/futures/_asyncio/__init__.py
+++ b/pym/portage/util/futures/_asyncio/__init__.py
@@ -137,7 +137,7 @@ def sleep(delay, result=None, loop=None):
        @rtype: asyncio.Future (or compatible)
        @return: an instance of Future
        """
-       loop = loop or get_event_loop()
+       loop = _wrap_loop(loop)
        future = loop.create_future()
        handle = loop.call_later(delay, future.set_result, result)
        def cancel_callback(future):
@@ -145,3 +145,20 @@ def sleep(delay, result=None, loop=None):
                        handle.cancel()
        future.add_done_callback(cancel_callback)
        return future
+
+
+def _wrap_loop(loop=None):
+       """
+       In order to deal with asyncio event loop compatibility issues,
+       use this function to wrap the loop parameter for functions
+       that support it. For example, since python3.4 does not have the
+       AbstractEventLoop.create_future() method, this helper function
+       can be used to add a wrapper that implements the create_future
+       method for python3.4.
+
+       @type loop: asyncio.AbstractEventLoop (or compatible)
+       @param loop: event loop
+       @rtype: asyncio.AbstractEventLoop (or compatible)
+       @return: event loop
+       """
+       return loop or get_event_loop()

diff --git a/pym/portage/util/futures/_asyncio/tasks.py 
b/pym/portage/util/futures/_asyncio/tasks.py
index 5f10d3c7b..b20765b7a 100644
--- a/pym/portage/util/futures/_asyncio/tasks.py
+++ b/pym/portage/util/futures/_asyncio/tasks.py
@@ -15,7 +15,10 @@ except ImportError:
        FIRST_COMPLETED ='FIRST_COMPLETED'
        FIRST_EXCEPTION = 'FIRST_EXCEPTION'
 
-
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       'portage.util.futures:asyncio',
+)
 from portage.util._eventloop.global_event_loop import (
        global_event_loop as _global_event_loop,
 )
@@ -40,7 +43,7 @@ def wait(futures, loop=None, timeout=None, 
return_when=ALL_COMPLETED):
        @return: tuple of (done, pending).
        @rtype: asyncio.Future (or compatible)
        """
-       loop = loop or _global_event_loop()
+       loop = asyncio._wrap_loop(loop)
        result_future = loop.create_future()
        _Waiter(futures, timeout, return_when, result_future, loop)
        return result_future

diff --git a/pym/portage/util/futures/executor/fork.py 
b/pym/portage/util/futures/executor/fork.py
index 276ed54f1..72844403c 100644
--- a/pym/portage/util/futures/executor/fork.py
+++ b/pym/portage/util/futures/executor/fork.py
@@ -13,7 +13,7 @@ import sys
 import traceback
 
 from portage.util._async.AsyncFunction import AsyncFunction
-from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures import asyncio
 
 
 class ForkExecutor(object):
@@ -25,7 +25,7 @@ class ForkExecutor(object):
        """
        def __init__(self, max_workers=None, loop=None):
                self._max_workers = max_workers or multiprocessing.cpu_count()
-               self._loop = loop or global_event_loop()
+               self._loop = asyncio._wrap_loop(loop)
                self._submit_queue = collections.deque()
                self._running_tasks = {}
                self._shutdown = False

diff --git a/pym/portage/util/futures/iter_completed.py 
b/pym/portage/util/futures/iter_completed.py
index 231b7e3ab..31b5e0c78 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -6,7 +6,6 @@ import multiprocessing
 
 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
 from portage.util._async.TaskScheduler import TaskScheduler
-from portage.util._eventloop.global_event_loop import global_event_loop
 from portage.util.futures import asyncio
 
 
@@ -30,7 +29,7 @@ def iter_completed(futures, max_jobs=None, max_load=None, 
loop=None):
        @return: iterator of futures that are done
        @rtype: iterator
        """
-       loop = loop or global_event_loop()
+       loop = asyncio._wrap_loop(loop)
 
        for future_done_set in async_iter_completed(futures,
                max_jobs=max_jobs, max_load=max_load, loop=loop):
@@ -60,7 +59,7 @@ def async_iter_completed(futures, max_jobs=None, 
max_load=None, loop=None):
                input futures that are done
        @rtype: iterator
        """
-       loop = loop or global_event_loop()
+       loop = asyncio._wrap_loop(loop)
 
        max_jobs = max_jobs or multiprocessing.cpu_count()
        max_load = max_load or multiprocessing.cpu_count()
@@ -133,7 +132,7 @@ def iter_gather(futures, max_jobs=None, max_load=None, 
loop=None):
                same order that they were yielded from the input iterator
        @rtype: asyncio.Future (or compatible)
        """
-       loop = loop or global_event_loop()
+       loop = asyncio._wrap_loop(loop)
        result = loop.create_future()
        futures_list = []
 

diff --git a/pym/portage/util/futures/retry.py 
b/pym/portage/util/futures/retry.py
index 82012d2f3..8a51669ff 100644
--- a/pym/portage/util/futures/retry.py
+++ b/pym/portage/util/futures/retry.py
@@ -9,7 +9,6 @@ __all__ = (
 import functools
 
 from portage.exception import PortageException
-from portage.util._eventloop.global_event_loop import global_event_loop
 from portage.util.futures import asyncio
 
 
@@ -67,7 +66,7 @@ def _retry(loop, try_max, try_timeout, overall_timeout, 
delay_func,
        @return: func return value
        @rtype: asyncio.Future (or compatible)
        """
-       loop = loop or global_event_loop()
+       loop = asyncio._wrap_loop(loop)
        future = loop.create_future()
        _Retry(future, loop, try_max, try_timeout, overall_timeout, delay_func,
                reraise, functools.partial(func, *args, **kwargs))

Reply via email to