commit: 85ac23b7c0c58cef72d22281d66d086521c01e3e
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun May 6 11:05:03 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun May 6 11:41:45 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=85ac23b7
asyncio: add _wrap_loop helper (bug 654390) In order to deal with asyncio event loop compatibility issues, add a _wrap_loop helper. For example, since python3.4 does not have the AbstractEventLoop.create_future() method, this helper function can be used to add a wrapper that implements the create_future method for python3.4. Bug: https://bugs.gentoo.org/654390 pym/portage/dbapi/porttree.py | 12 ++++++------ .../tests/util/futures/asyncio/test_child_watcher.py | 2 +- .../util/futures/asyncio/test_event_loop_in_fork.py | 8 ++++---- .../tests/util/futures/asyncio/test_pipe_closed.py | 4 ++-- .../util/futures/asyncio/test_run_until_complete.py | 2 +- .../util/futures/asyncio/test_subprocess_exec.py | 4 ++-- pym/portage/util/futures/_asyncio/__init__.py | 19 ++++++++++++++++++- pym/portage/util/futures/_asyncio/tasks.py | 7 +++++-- pym/portage/util/futures/executor/fork.py | 4 ++-- pym/portage/util/futures/iter_completed.py | 7 +++---- pym/portage/util/futures/retry.py | 3 +-- 11 files changed, 45 insertions(+), 27 deletions(-) diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py index 801b5658a..3e36024ff 100644 --- a/pym/portage/dbapi/porttree.py +++ b/pym/portage/dbapi/porttree.py @@ -36,7 +36,7 @@ from portage import _encodings from portage import _unicode_encode from portage import OrderedDict from portage.util._eventloop.EventLoop import EventLoop -from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util.futures import asyncio from portage.util.futures.iter_completed import iter_gather from _emerge.EbuildMetadataPhase import EbuildMetadataPhase @@ -325,8 +325,8 @@ class portdbapi(dbapi): @property def _event_loop(self): if portage._internal_caller: - # For internal portage usage, the global_event_loop is safe. - return global_event_loop() + # For internal portage usage, asyncio._wrap_loop() is safe. 
+ return asyncio._wrap_loop() else: # For external API consumers, use a local EventLoop, since # we don't want to assume that it's safe to override the @@ -611,7 +611,7 @@ class portdbapi(dbapi): # to simultaneous instantiation of multiple event loops here. # Callers of this method certainly want the same event loop to # be used for all calls. - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) future = loop.create_future() cache_me = False if myrepo is not None: @@ -751,7 +751,7 @@ class portdbapi(dbapi): a set of alternative URIs. @rtype: asyncio.Future (or compatible) """ - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) result = loop.create_future() def aux_get_done(aux_get_future): @@ -1419,7 +1419,7 @@ def _async_manifest_fetchlist(portdb, repo_config, cp, cpv_list=None, @return: a Future resulting in a Mapping compatible with FetchlistDict @rtype: asyncio.Future (or compatible) """ - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) result = loop.create_future() cpv_list = (portdb.cp_list(cp, mytree=repo_config.location) if cpv_list is None else cpv_list) diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py index dca01be56..8ef497544 100644 --- a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py +++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py @@ -28,7 +28,7 @@ class ChildWatcherTestCase(TestCase): args_tuple = ('hello', 'world') - loop = asyncio.get_event_loop() + loop = asyncio._wrap_loop() future = loop.create_future() def callback(pid, returncode, *args): diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py index 7868d792a..19588bf3a 100644 --- a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py +++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py @@ -11,14 
+11,14 @@ from portage.util.futures.unix_events import DefaultEventLoopPolicy def fork_main(parent_conn, child_conn): parent_conn.close() - loop = asyncio.get_event_loop() + loop = asyncio._wrap_loop() # This fails with python's default event loop policy, # see https://bugs.python.org/issue22087. - loop.run_until_complete(asyncio.sleep(0.1)) + loop.run_until_complete(asyncio.sleep(0.1, loop=loop)) def async_main(fork_exitcode, loop=None): - loop = loop or asyncio.get_event_loop() + loop = asyncio._wrap_loop(loop) # Since python2.7 does not support Process.sentinel, use Pipe to # monitor for process exit. @@ -48,7 +48,7 @@ class EventLoopInForkTestCase(TestCase): if not isinstance(initial_policy, DefaultEventLoopPolicy): asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) try: - loop = asyncio.get_event_loop() + loop = asyncio._wrap_loop() fork_exitcode = loop.create_future() # Make async_main fork while the loop is running, which would # trigger https://bugs.python.org/issue22087 with asyncio's diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py index e63829888..c2b468064 100644 --- a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py +++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py @@ -58,7 +58,7 @@ class ReaderPipeClosedTestCase(_PipeClosedTestCase, TestCase): if not isinstance(initial_policy, DefaultEventLoopPolicy): asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) - loop = asyncio.get_event_loop() + loop = asyncio._wrap_loop() read_end = os.fdopen(read_end, 'rb', 0) write_end = os.fdopen(write_end, 'wb', 0) try: @@ -95,7 +95,7 @@ class WriterPipeClosedTestCase(_PipeClosedTestCase, TestCase): if not isinstance(initial_policy, DefaultEventLoopPolicy): asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) - loop = asyncio.get_event_loop() + loop = asyncio._wrap_loop() read_end = os.fdopen(read_end, 'rb', 0) write_end = os.fdopen(write_end, 'wb', 0) 
try: diff --git a/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py index fc8f198ca..1a37e4922 100644 --- a/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py +++ b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py @@ -13,7 +13,7 @@ class RunUntilCompleteTestCase(TestCase): asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) try: - loop = asyncio.get_event_loop() + loop = asyncio._wrap_loop() f1 = loop.create_future() f2 = loop.create_future() f1.add_done_callback(f2.set_result) diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py index 98983941d..8dc5fa7b9 100644 --- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -23,7 +23,7 @@ def reader(input_file, loop=None): @return: bytes @rtype: asyncio.Future (or compatible) """ - loop = loop or asyncio.get_event_loop() + loop = asyncio._wrap_loop(loop) future = loop.create_future() _Reader(future, input_file, loop) return future @@ -61,7 +61,7 @@ class SubprocessExecTestCase(TestCase): asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) try: - test(asyncio.get_event_loop()) + test(asyncio._wrap_loop()) finally: asyncio.set_event_loop_policy(initial_policy) diff --git a/pym/portage/util/futures/_asyncio/__init__.py b/pym/portage/util/futures/_asyncio/__init__.py index 9ae050874..e62de7a69 100644 --- a/pym/portage/util/futures/_asyncio/__init__.py +++ b/pym/portage/util/futures/_asyncio/__init__.py @@ -137,7 +137,7 @@ def sleep(delay, result=None, loop=None): @rtype: asyncio.Future (or compatible) @return: an instance of Future """ - loop = loop or get_event_loop() + loop = _wrap_loop(loop) future = loop.create_future() handle = loop.call_later(delay, future.set_result, result) def cancel_callback(future): @@ -145,3 +145,20 @@ 
def sleep(delay, result=None, loop=None): handle.cancel() future.add_done_callback(cancel_callback) return future + + +def _wrap_loop(loop=None): + """ + In order to deal with asyncio event loop compatibility issues, + use this function to wrap the loop parameter for functions + that support it. For example, since python3.4 does not have the + AbstractEventLoop.create_future() method, this helper function + can be used to add a wrapper that implements the create_future + method for python3.4. + + @type loop: asyncio.AbstractEventLoop (or compatible) + @param loop: event loop + @rtype: asyncio.AbstractEventLoop (or compatible) + @return: event loop + """ + return loop or get_event_loop() diff --git a/pym/portage/util/futures/_asyncio/tasks.py b/pym/portage/util/futures/_asyncio/tasks.py index 5f10d3c7b..b20765b7a 100644 --- a/pym/portage/util/futures/_asyncio/tasks.py +++ b/pym/portage/util/futures/_asyncio/tasks.py @@ -15,7 +15,10 @@ except ImportError: FIRST_COMPLETED ='FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' - +import portage +portage.proxy.lazyimport.lazyimport(globals(), + 'portage.util.futures:asyncio', +) from portage.util._eventloop.global_event_loop import ( global_event_loop as _global_event_loop, ) @@ -40,7 +43,7 @@ def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED): @return: tuple of (done, pending). 
@rtype: asyncio.Future (or compatible) """ - loop = loop or _global_event_loop() + loop = asyncio._wrap_loop(loop) result_future = loop.create_future() _Waiter(futures, timeout, return_when, result_future, loop) return result_future diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py index 276ed54f1..72844403c 100644 --- a/pym/portage/util/futures/executor/fork.py +++ b/pym/portage/util/futures/executor/fork.py @@ -13,7 +13,7 @@ import sys import traceback from portage.util._async.AsyncFunction import AsyncFunction -from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util.futures import asyncio class ForkExecutor(object): @@ -25,7 +25,7 @@ class ForkExecutor(object): """ def __init__(self, max_workers=None, loop=None): self._max_workers = max_workers or multiprocessing.cpu_count() - self._loop = loop or global_event_loop() + self._loop = asyncio._wrap_loop(loop) self._submit_queue = collections.deque() self._running_tasks = {} self._shutdown = False diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py index 231b7e3ab..31b5e0c78 100644 --- a/pym/portage/util/futures/iter_completed.py +++ b/pym/portage/util/futures/iter_completed.py @@ -6,7 +6,6 @@ import multiprocessing from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._async.TaskScheduler import TaskScheduler -from portage.util._eventloop.global_event_loop import global_event_loop from portage.util.futures import asyncio @@ -30,7 +29,7 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): @return: iterator of futures that are done @rtype: iterator """ - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) for future_done_set in async_iter_completed(futures, max_jobs=max_jobs, max_load=max_load, loop=loop): @@ -60,7 +59,7 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): input futures that are 
done @rtype: iterator """ - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) max_jobs = max_jobs or multiprocessing.cpu_count() max_load = max_load or multiprocessing.cpu_count() @@ -133,7 +132,7 @@ def iter_gather(futures, max_jobs=None, max_load=None, loop=None): same order that they were yielded from the input iterator @rtype: asyncio.Future (or compatible) """ - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) result = loop.create_future() futures_list = [] diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/retry.py index 82012d2f3..8a51669ff 100644 --- a/pym/portage/util/futures/retry.py +++ b/pym/portage/util/futures/retry.py @@ -9,7 +9,6 @@ __all__ = ( import functools from portage.exception import PortageException -from portage.util._eventloop.global_event_loop import global_event_loop from portage.util.futures import asyncio @@ -67,7 +66,7 @@ def _retry(loop, try_max, try_timeout, overall_timeout, delay_func, @return: func return value @rtype: asyncio.Future (or compatible) """ - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) future = loop.create_future() _Retry(future, loop, try_max, try_timeout, overall_timeout, delay_func, reraise, functools.partial(func, *args, **kwargs))