[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/_asyncio/, pym/portage/util/futures/
commit: 5e628787e6f4c720680aeeb8beeac88e37988a9e Author: Zac Medico gentoo org> AuthorDate: Wed May 9 07:33:37 2018 + Commit: Zac Medico gentoo org> CommitDate: Wed May 9 07:40:34 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=5e628787 DefaultEventLoopPolicy: raise NotImplementedError, not RecursionError Since the DefaultEventLoopPolicy wraps the underlying asyncio event loop policy, raise NotImplementedError if the current instance is set as the underlying event loop policy. This avoids a RecursionError that would flood the terminal with a large stack trace. pym/portage/util/futures/_asyncio/__init__.py | 6 +++--- pym/portage/util/futures/unix_events.py | 23 ++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/pym/portage/util/futures/_asyncio/__init__.py b/pym/portage/util/futures/_asyncio/__init__.py index 940da4762..acfd59396 100644 --- a/pym/portage/util/futures/_asyncio/__init__.py +++ b/pym/portage/util/futures/_asyncio/__init__.py @@ -32,7 +32,7 @@ except ImportError: import portage portage.proxy.lazyimport.lazyimport(globals(), - 'portage.util.futures.unix_events:DefaultEventLoopPolicy', + 'portage.util.futures.unix_events:_PortageEventLoopPolicy', ) from portage.util._eventloop.asyncio_event_loop import AsyncioEventLoop as _AsyncioEventLoop from portage.util._eventloop.global_event_loop import ( @@ -67,7 +67,7 @@ def get_event_loop_policy(): global _lock, _policy with _lock: if _policy is None: - _policy = DefaultEventLoopPolicy() + _policy = _PortageEventLoopPolicy() return _policy @@ -81,7 +81,7 @@ def set_event_loop_policy(policy): """ global _lock, _policy with _lock: - _policy = policy or DefaultEventLoopPolicy() + _policy = policy or _PortageEventLoopPolicy() def get_event_loop(): diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index 8eb369f8b..3381eaa7d 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -681,4 +681,25 @@ class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy): return _global_event_loop()._asyncio_child_watcher -DefaultEventLoopPolicy = _PortageEventLoopPolicy +class _AsyncioEventLoopPolicy(_PortageEventLoopPolicy): + """ + A subclass of _PortageEventLoopPolicy which raises + NotImplementedError if it is set as the real asyncio event loop + policy, since this class is intended to *wrap* the real asyncio + event loop policy. + """ + def _check_recursion(self): + if _real_asyncio.get_event_loop_policy() is self: + raise NotImplementedError('this class is only a wrapper') + + def get_event_loop(self): + self._check_recursion() + return super(_AsyncioEventLoopPolicy, self).get_event_loop() + + def get_child_watcher(self): + self._check_recursion() + return super(_AsyncioEventLoopPolicy, self).get_child_watcher() + + +DefaultEventLoopPolicy = (_AsyncioEventLoopPolicy if _asyncio_enabled + else _PortageEventLoopPolicy)
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/_asyncio/, pym/portage/util/futures/
commit: 920b90fd0883dbd36f0290d08c9af49a208c2950 Author: Zac Medico gentoo org> AuthorDate: Wed May 9 04:59:59 2018 + Commit: Zac Medico gentoo org> CommitDate: Wed May 9 04:59:59 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=920b90fd _wrap_loop: default to global_event_loop behavior The default loop returned by _wrap_loop should be consistent with global_event_loop, in order to avoid accidental registration of callbacks with a loop that is not intended to run. Fixes 96cc07326391 ("global_event_loop: use asyncio event loop (bug 654390)") pym/portage/util/futures/_asyncio/__init__.py | 14 ++ pym/portage/util/futures/unix_events.py | 40 +-- 2 files changed, 10 insertions(+), 44 deletions(-) diff --git a/pym/portage/util/futures/_asyncio/__init__.py b/pym/portage/util/futures/_asyncio/__init__.py index 1273afa02..940da4762 100644 --- a/pym/portage/util/futures/_asyncio/__init__.py +++ b/pym/portage/util/futures/_asyncio/__init__.py @@ -35,7 +35,10 @@ portage.proxy.lazyimport.lazyimport(globals(), 'portage.util.futures.unix_events:DefaultEventLoopPolicy', ) from portage.util._eventloop.asyncio_event_loop import AsyncioEventLoop as _AsyncioEventLoop -from portage.util._eventloop.global_event_loop import _asyncio_enabled +from portage.util._eventloop.global_event_loop import ( + _asyncio_enabled, + global_event_loop as _global_event_loop, +) from portage.util.futures.futures import ( CancelledError, Future, @@ -168,14 +171,15 @@ def _wrap_loop(loop=None): @rtype: asyncio.AbstractEventLoop (or compatible) @return: event loop """ - return loop or get_event_loop() + return loop or _global_event_loop() if _asyncio_enabled: - get_event_loop_policy = _real_asyncio.get_event_loop_policy - set_event_loop_policy = _real_asyncio.set_event_loop_policy + # The default loop returned by _wrap_loop should be consistent + # with global_event_loop, in order to avoid accidental registration + # of callbacks with a loop that is not intended to run. def _wrap_loop(loop=None): - loop = loop or get_event_loop() + loop = loop or _global_event_loop() return (loop if hasattr(loop, '_asyncio_wrapper') else _AsyncioEventLoop(loop=loop)) diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index ce520db00..8eb369f8b 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -681,42 +681,4 @@ class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy): return _global_event_loop()._asyncio_child_watcher -class _AsyncioEventLoopPolicy(_PortageEventLoopPolicy): - """ - Implementation of asyncio.AbstractEventLoopPolicy based on asyncio's - event loop. This supports running event loops in forks, - which is not supported by the default asyncio event loop policy, - see https://bugs.python.org/issue22087 and also - https://bugs.python.org/issue29703 which affects pypy3-5.10.1. - """ - _MAIN_PID = os.getpid() - - def __init__(self): - super(_AsyncioEventLoopPolicy, self).__init__() - self._default_policy = _real_asyncio.DefaultEventLoopPolicy() - - def get_event_loop(self): - """ - Get the event loop for the current context. - - Returns an event loop object implementing the AbstractEventLoop - interface. - - @rtype: asyncio.AbstractEventLoop (or compatible) - @return: the current event loop policy - """ - if os.getpid() == self._MAIN_PID: - return self._default_policy.get_event_loop() - else: - return super(_AsyncioEventLoopPolicy, self).get_event_loop() - - def get_child_watcher(self): - """Get the watcher for child processes.""" - if os.getpid() == self._MAIN_PID: - return self._default_policy.get_child_watcher() - else: - return super(_AsyncioEventLoopPolicy, self).get_child_watcher() - - -DefaultEventLoopPolicy = (_AsyncioEventLoopPolicy if _asyncio_enabled - else _PortageEventLoopPolicy) +DefaultEventLoopPolicy = _PortageEventLoopPolicy
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/, ...
commit: b3b15c451cc21a2c53638f0eacc8396e395dcab3 Author: Zac Medico gentoo org> AuthorDate: Mon May 7 06:24:22 2018 + Commit: Zac Medico gentoo org> CommitDate: Mon May 7 06:28:47 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=b3b15c45 retry: add loop parameter during decoration pym/portage/sync/modules/rsync/rsync.py | 2 +- pym/portage/tests/util/futures/test_retry.py | 16 pym/portage/util/futures/retry.py| 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pym/portage/sync/modules/rsync/rsync.py b/pym/portage/sync/modules/rsync/rsync.py index 1b8941ff6..070798a53 100644 --- a/pym/portage/sync/modules/rsync/rsync.py +++ b/pym/portage/sync/modules/rsync/rsync.py @@ -173,7 +173,7 @@ class RsyncSync(NewBase): loop = global_event_loop() func_coroutine = functools.partial(loop.run_in_executor, None, noisy_refresh_keys) - decorated_func = retry_decorator(func_coroutine) + decorated_func = retry_decorator(func_coroutine, loop=loop) loop.run_until_complete(decorated_func()) out.eend(0) except (GematoException, asyncio.TimeoutError) as e: diff --git a/pym/portage/tests/util/futures/test_retry.py b/pym/portage/tests/util/futures/test_retry.py index 16ecccbc7..7a1e76280 100644 --- a/pym/portage/tests/util/futures/test_retry.py +++ b/pym/portage/tests/util/futures/test_retry.py @@ -85,7 +85,7 @@ class RetryTestCase(TestCase): func_coroutine = self._wrap_coroutine_func(SucceedLater(1)) decorator = retry(try_max=, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine) + decorated_func = decorator(func_coroutine, loop=loop) result = loop.run_until_complete(decorated_func()) self.assertEqual(result, 'success') @@ -94,7 +94,7 @@ class RetryTestCase(TestCase): func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(try_max=4, try_timeout=None, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine) + decorated_func = decorator(func_coroutine, loop=loop) done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) self.assertEqual(len(done), 1) self.assertTrue(isinstance(done.pop().exception().__cause__, SucceedNeverException)) @@ -104,7 +104,7 @@ class RetryTestCase(TestCase): func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(reraise=True, try_max=4, try_timeout=None, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine) + decorated_func = decorator(func_coroutine, loop=loop) done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) self.assertEqual(len(done), 1) self.assertTrue(isinstance(done.pop().exception(), SucceedNeverException)) @@ -114,7 +114,7 @@ class RetryTestCase(TestCase): func_coroutine = self._wrap_coroutine_func(HangForever()) decorator = retry(try_max=2, try_timeout=0.1, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine) + decorated_func = decorator(func_coroutine, loop=loop) done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) self.assertEqual(len(done), 1) self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError)) @@ -124,7 +124,7 @@ class RetryTestCase(TestCase): func_coroutine = self._wrap_coroutine_func(HangForever()) decorator = retry(reraise=True, try_max=2, try_timeout=0.1, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine) + decorated_func = decorator(func_coroutine, loop=loop) done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) self.assertEqual(len(done), 1) self.assertTrue(isinstance(done.pop().exception(), asyncio.TimeoutError)) @@ -134,7 +134,7 @@ class RetryTestCase(TestCase): func_coroutine = self._wr
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/_asyncio/, pym/portage/tests/util/futures/asyncio/, ...
commit: 85ac23b7c0c58cef72d22281d66d086521c01e3e Author: Zac Medico gentoo org> AuthorDate: Sun May 6 11:05:03 2018 + Commit: Zac Medico gentoo org> CommitDate: Sun May 6 11:41:45 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=85ac23b7 asyncio: add _wrap_loop helper (bug 654390) In order to deal with asyncio event loop compatibility issues, add a _wrap_loop helper. For example, since python3.4 does not have the AbstractEventLoop.create_future() method, this helper function can be used to add a wrapper that implements the create_future method for python3.4. Bug: https://bugs.gentoo.org/654390 pym/portage/dbapi/porttree.py | 12 ++-- .../tests/util/futures/asyncio/test_child_watcher.py | 2 +- .../util/futures/asyncio/test_event_loop_in_fork.py | 8 .../tests/util/futures/asyncio/test_pipe_closed.py| 4 ++-- .../util/futures/asyncio/test_run_until_complete.py | 2 +- .../util/futures/asyncio/test_subprocess_exec.py | 4 ++-- pym/portage/util/futures/_asyncio/__init__.py | 19 ++- pym/portage/util/futures/_asyncio/tasks.py| 7 +-- pym/portage/util/futures/executor/fork.py | 4 ++-- pym/portage/util/futures/iter_completed.py| 7 +++ pym/portage/util/futures/retry.py | 3 +-- 11 files changed, 45 insertions(+), 27 deletions(-) diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py index 801b5658a..3e36024ff 100644 --- a/pym/portage/dbapi/porttree.py +++ b/pym/portage/dbapi/porttree.py @@ -36,7 +36,7 @@ from portage import _encodings from portage import _unicode_encode from portage import OrderedDict from portage.util._eventloop.EventLoop import EventLoop -from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util.futures import asyncio from portage.util.futures.iter_completed import iter_gather from _emerge.EbuildMetadataPhase import EbuildMetadataPhase @@ -325,8 +325,8 @@ class portdbapi(dbapi): @property def _event_loop(self): if portage._internal_caller: - # For internal portage usage, the global_event_loop is safe. - return global_event_loop() + # For internal portage usage, asyncio._wrap_loop() is safe. + return asyncio._wrap_loop() else: # For external API consumers, use a local EventLoop, since # we don't want to assume that it's safe to override the @@ -611,7 +611,7 @@ class portdbapi(dbapi): # to simultaneous instantiation of multiple event loops here. # Callers of this method certainly want the same event loop to # be used for all calls. - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) future = loop.create_future() cache_me = False if myrepo is not None: @@ -751,7 +751,7 @@ class portdbapi(dbapi): a set of alternative URIs. @rtype: asyncio.Future (or compatible) """ - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) result = loop.create_future() def aux_get_done(aux_get_future): @@ -1419,7 +1419,7 @@ def _async_manifest_fetchlist(portdb, repo_config, cp, cpv_list=None, @return: a Future resulting in a Mapping compatible with FetchlistDict @rtype: asyncio.Future (or compatible) """ - loop = loop or global_event_loop() + loop = asyncio._wrap_loop(loop) result = loop.create_future() cpv_list = (portdb.cp_list(cp, mytree=repo_config.location) if cpv_list is None else cpv_list) diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py index dca01be56..8ef497544 100644 --- a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py +++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py @@ -28,7 +28,7 @@ class ChildWatcherTestCase(TestCase): args_tuple = ('hello', 'world') - loop = asyncio.get_event_loop() + loop = asyncio._wrap_loop() future = loop.create_future() def callback(pid, returncode, *args): diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py index 7868d792a..19588bf3a 100644 --- a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py +++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py @@ -11,14 +11,14 @@ from portage.util.futures.unix_events import DefaultEventL
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/
commit: be61882996099322bb3a1e82e71f475b4141ad40 Author: Zac Medico gentoo org> AuthorDate: Tue Apr 24 23:28:08 2018 + Commit: Zac Medico gentoo org> CommitDate: Fri Apr 27 21:33:00 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=be618829 Add iter_gather function (bug 653946) This is similar to asyncio.gather, but takes an iterator of futures as input, and includes support for max_jobs and max_load parameters. For bug 653946, this will be used to asynchronously gather the results of the portdbapi.async_fetch_map calls that are required to generate a Manifest, while using the max_jobs parameter to limit the number of concurrent async_aux_get calls. Bug: https://bugs.gentoo.org/653946 pym/portage/util/futures/iter_completed.py | 73 ++ 1 file changed, 73 insertions(+) diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py index 5ad075305..1d6a9a4bd 100644 --- a/pym/portage/util/futures/iter_completed.py +++ b/pym/portage/util/futures/iter_completed.py @@ -112,3 +112,76 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): # cleanup in case of interruption by SIGINT, etc scheduler.cancel() scheduler.wait() + + +def iter_gather(futures, max_jobs=None, max_load=None, loop=None): + """ + This is similar to asyncio.gather, but takes an iterator of + futures as input, and includes support for max_jobs and max_load + parameters. + + @param futures: iterator of asyncio.Future (or compatible) + @type futures: iterator + @param max_jobs: max number of futures to process concurrently (default + is multiprocessing.cpu_count()) + @type max_jobs: int + @param max_load: max load allowed when scheduling a new future, + otherwise schedule no more than 1 future at a time (default + is multiprocessing.cpu_count()) + @type max_load: int or float + @param loop: event loop + @type loop: EventLoop + @return: a Future resulting in a list of done input futures, in the + same order that they were yielded from the input iterator + @rtype: asyncio.Future (or compatible) + """ + loop = loop or global_event_loop() + loop = getattr(loop, '_asyncio_wrapper', loop) + result = loop.create_future() + futures_list = [] + + def future_generator(): + for future in futures: + futures_list.append(future) + yield future + + completed_iter = async_iter_completed( + future_generator(), + max_jobs=max_jobs, + max_load=max_load, + loop=loop, + ) + + def handle_result(future_done_set): + if result.cancelled(): + if not future_done_set.cancelled(): + # All exceptions must be consumed from future_done_set, in order + # to avoid triggering the event loop's exception handler. + list(future.exception() for future in future_done_set.result() + if not future.cancelled()) + return + + try: + handle_result.current_task = next(completed_iter) + except StopIteration: + result.set_result(futures_list) + else: + handle_result.current_task.add_done_callback(handle_result) + + try: + handle_result.current_task = next(completed_iter) + except StopIteration: + handle_result.current_task = None + result.set_result(futures_list) + else: + handle_result.current_task.add_done_callback(handle_result) + + def cancel_callback(result): + if (result.cancelled() and + handle_result.current_task is not None and + not handle_result.current_task.done()): + handle_result.current_task.cancel() + + result.add_done_callback(cancel_callback) + + return result
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/
commit: a9e8ebaa6979ccf0bb385e457d695bedc7b65bf5 Author: Zac Medico gentoo org> AuthorDate: Tue Apr 17 09:51:36 2018 + Commit: Zac Medico gentoo org> CommitDate: Tue Apr 17 17:44:58 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=a9e8ebaa Add async_iter_completed for asyncio migration (bug 591760) This serves as a wrapper around portage's internal TaskScheduler class, allowing TaskScheduler API consumers to be migrated to use asyncio interfaces. Bug: https://bugs.gentoo.org/591760 .../tests/util/futures/test_iter_completed.py | 37 - pym/portage/util/futures/iter_completed.py | 61 +++--- 2 files changed, 91 insertions(+), 7 deletions(-) diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py index 9c23aefb1..1344523c6 100644 --- a/pym/portage/tests/util/futures/test_iter_completed.py +++ b/pym/portage/tests/util/futures/test_iter_completed.py @@ -5,7 +5,11 @@ import time from portage.tests import TestCase from portage.util._async.ForkProcess import ForkProcess from portage.util._eventloop.global_event_loop import global_event_loop -from portage.util.futures.iter_completed import iter_completed +from portage.util.futures import asyncio +from portage.util.futures.iter_completed import ( + iter_completed, + async_iter_completed, +) class SleepProcess(ForkProcess): @@ -48,3 +52,34 @@ class IterCompletedTestCase(TestCase): for seconds, future in zip(expected_order, iter_completed(future_generator(), max_jobs=True, max_load=None, loop=loop)): self.assertEqual(seconds, future.result()) + + def testAsyncCancel(self): + + loop = global_event_loop()._asyncio_wrapper + input_futures = set() + future_count = 3 + + def future_generator(): + for i in range(future_count): + future = loop.create_future() + loop.call_soon(lambda future: None if future.done() + else future.set_result(None), future) + input_futures.add(future) + yield future + + for future_done_set in async_iter_completed(future_generator(), + max_jobs=True, max_load=None, loop=loop): + future_done_set.cancel() + break + + # With max_jobs=True, async_iter_completed should have executed + # the generator until it raised StopIteration. + self.assertEqual(future_count, len(input_futures)) + + loop.run_until_complete(asyncio.wait(input_futures, loop=loop)) + + # The futures may have results or they may have been cancelled + # by TaskScheduler, and behavior varies depending on the python + # interpreter. + for future in input_futures: + future.cancelled() or future.result() diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py index 8d324de84..5ad075305 100644 --- a/pym/portage/util/futures/iter_completed.py +++ b/pym/portage/util/futures/iter_completed.py @@ -1,6 +1,7 @@ # Copyright 2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +import functools import multiprocessing from portage.util._async.AsyncTaskFuture import AsyncTaskFuture @@ -31,6 +32,38 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): """ loop = loop or global_event_loop() loop = getattr(loop, '_asyncio_wrapper', loop) + + for future_done_set in async_iter_completed(futures, + max_jobs=max_jobs, max_load=max_load, loop=loop): + for future in loop.run_until_complete(future_done_set): + yield future + + +def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): + """ + An asynchronous version of iter_completed. This yields futures, which + when done, result in a set of input futures that are done. This serves + as a wrapper around portage's internal TaskScheduler class, using + standard asyncio interfaces. + + @param futures: iterator of asyncio.Future (or compatible) + @type futures: iterator + @param max_jobs: max number of futures to process concurrently (default + is multiprocessing.cpu_count()) + @type max_jobs: int + @param max_load: max load allowed when scheduling a new future, + otherwise schedule no more than 1 future at a time (default + is multiprocessing.cpu_count()) + @type max_load: int or float + @param loop: event loop + @type loop: EventLoop + @retur
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
commit: 9a7b0a006e65f8683716d60574e4f19f8ffd603d Author: Zac Medico gentoo org> AuthorDate: Sat Apr 14 21:29:29 2018 + Commit: Zac Medico gentoo org> CommitDate: Mon Apr 16 00:04:26 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=9a7b0a00 Implement AbstractEventLoop.connect_read_pipe (bug 649588) In python versions that support asyncio, this allows API consumers to use subprocess.PIPE for asyncio.create_subprocess_exec() stdout and stderr parameters. Bug: https://bugs.gentoo.org/649588 .../util/futures/asyncio/test_subprocess_exec.py | 30 pym/portage/util/futures/unix_events.py| 157 - 2 files changed, 184 insertions(+), 3 deletions(-) diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py index d30f48c43..94984fc93 100644 --- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -2,6 +2,7 @@ # Distributed under the terms of the GNU General Public License v2 import os +import subprocess from portage.process import find_binary from portage.tests import TestCase @@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase): f.close() self._run_test(test) + + def testReadTransport(self): + """ + Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) which + requires an AbstractEventLoop.connect_read_pipe implementation + (and a ReadTransport implementation for it to return). + """ + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + args_tuple = (b'hello', b'world') + echo_binary = find_binary("echo") + self.assertNotEqual(echo_binary, None) + echo_binary = echo_binary.encode() + + def test(loop): + with open(os.devnull, 'rb', 0) as devnull: + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + echo_binary, *args_tuple, + stdin=devnull, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) + + self.assertEqual( + tuple(loop.run_until_complete(proc.stdout.read()).split()), + args_tuple) + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) + + self._run_test(test) diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index d788c2bea..9d84ab6aa 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -9,12 +9,18 @@ __all__ = ( try: from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher + from asyncio.transports import ReadTransport as _ReadTransport except ImportError: _AbstractChildWatcher = object _BaseSubprocessTransport = object + _ReadTransport = object +import errno +import fcntl import functools +import logging import os +import stat import subprocess from portage.util._eventloop.global_event_loop import ( @@ -82,6 +88,35 @@ class _PortageEventLoop(events.AbstractEventLoop): """ return asyncio.Task(coro, loop=self) + def connect_read_pipe(self, protocol_factory, pipe): + """ + Register read pipe in event loop. Set the pipe to non-blocking mode. + + @type protocol_factory: callable + @param protocol_factory: must instantiate object with Protocol interface + @type pipe: file + @param pipe: a pipe to read from + @rtype: asyncio.Future + @return: Return pair (transport, protocol), where transport supports the + ReadTransport interface. + """ + protocol = protocol_factory() + result = self.create_future() + waiter = self.create_future() + transport = self._make_read_pipe_transport(pipe, protocol, waiter=waiter) + + def waiter_callback(waiter): + try: + waiter.result() + except Exception as e: + transport.close() + result.set_exception(e) + else: + result.set_result((transport, protocol)) + +
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/util/_eventloop/
commit: 608663259a8a6fa78c32205389dfa58d00b6f11c Author: Zac Medico gentoo org> AuthorDate: Sun Apr 15 18:56:43 2018 + Commit: Zac Medico gentoo org> CommitDate: Sun Apr 15 18:58:00 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=60866325 Implement AbstractEventLoop.is_running() (bug 649588) Bug: https://bugs.gentoo.org/649588 pym/portage/util/_eventloop/EventLoop.py | 14 ++ pym/portage/util/futures/unix_events.py | 1 + 2 files changed, 15 insertions(+) diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 13ce5478e..a928f3138 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -125,6 +125,10 @@ class EventLoop(object): self._poll_event_queue = [] self._poll_event_handlers = {} self._poll_event_handler_ids = {} + # Number of current calls to self.iteration(). A number greater + # than 1 indicates recursion, which is not supported by asyncio's + # default event loop. + self._iteration_depth = 0 # Increment id for each new handler. self._event_handler_id = 0 # New call_soon callbacks must have an opportunity to @@ -262,7 +266,13 @@ class EventLoop(object): @rtype: bool @return: True if events were dispatched. """ + self._iteration_depth += 1 + try: + return self._iteration(*args) + finally: + self._iteration_depth -= 1 + def _iteration(self, *args): may_block = True if args: @@ -822,6 +832,10 @@ class EventLoop(object): self._default_executor = executor return executor.submit(func, *args) + def is_running(self): + """Return whether the event loop is currently running.""" + return self._iteration_depth > 0 + def is_closed(self): """Returns True if the event loop was closed.""" return self._poll_obj is None diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index 1abc420e1..d788c2bea 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -43,6 +43,7 @@ class _PortageEventLoop(events.AbstractEventLoop): self.call_soon_threadsafe = loop.call_soon_threadsafe self.call_later = loop.call_later self.call_at = loop.call_at + self.is_running = loop.is_running self.is_closed = loop.is_closed self.close = loop.close self.create_future = loop.create_future
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
commit: d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2 Author: Zac Medico gentoo org> AuthorDate: Thu Apr 12 03:56:25 2018 + Commit: Zac Medico gentoo org> CommitDate: Fri Apr 13 07:10:10 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df Implement _PortageEventLoop.subprocess_exec (bug 649588) In python versions that support asyncio, this allows API consumers to use the asyncio.create_subprocess_exec() function with portage's internal event loop. Currently, subprocess.PIPE is not implemented because that would require an implementation of asyncio's private asyncio.unix_events._UnixReadPipeTransport class. However, it's possible to use pipes created with os.pipe() for stdin, stdout, and stderr, as demonstrated in the included unit tests. Bug: https://bugs.gentoo.org/649588 .../util/futures/asyncio/test_subprocess_exec.py | 163 + pym/portage/util/futures/unix_events.py| 98 + 2 files changed, 261 insertions(+) diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py new file mode 100644 index 0..d30f48c43 --- /dev/null +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -0,0 +1,163 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import os + +from portage.process import find_binary +from portage.tests import TestCase +from portage.util.futures import asyncio +from portage.util.futures.executor.fork import ForkExecutor +from portage.util.futures.unix_events import DefaultEventLoopPolicy +from _emerge.PipeReader import PipeReader + + +def reader(input_file, loop=None): + """ + Asynchronously read a binary input file. + + @param input_file: binary input file + @type input_file: file + @param loop: event loop + @type loop: EventLoop + @return: bytes + @rtype: asyncio.Future (or compatible) + """ + loop = loop or asyncio.get_event_loop() + loop = getattr(loop, '_asyncio_wrapper', loop) + future = loop.create_future() + _Reader(future, input_file, loop) + return future + + +class _Reader(object): + def __init__(self, future, input_file, loop): + self._future = future + self._pipe_reader = PipeReader( + input_files={'input_file':input_file}, scheduler=loop._loop) + + self._future.add_done_callback(self._cancel_callback) + self._pipe_reader.addExitListener(self._eof) + self._pipe_reader.start() + + def _cancel_callback(self, future): + if future.cancelled(): + self._cancel() + + def _eof(self, pipe_reader): + self._pipe_reader = None + self._future.set_result(pipe_reader.getvalue()) + + def _cancel(self): + if self._pipe_reader is not None and self._pipe_reader.poll() is None: + self._pipe_reader.removeExitListener(self._eof) + self._pipe_reader.cancel() + self._pipe_reader = None + + +class SubprocessExecTestCase(TestCase): + def _run_test(self, test): + initial_policy = asyncio.get_event_loop_policy() + if not isinstance(initial_policy, DefaultEventLoopPolicy): + asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) + + try: + test(asyncio.get_event_loop()) + finally: + asyncio.set_event_loop_policy(initial_policy) + + def testEcho(self): + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + args_tuple = (b'hello', b'world') + echo_binary = find_binary("echo") + self.assertNotEqual(echo_binary, None) + echo_binary = echo_binary.encode() + + # Use os.pipe(), since this loop does not implement the + # ReadTransport necessary for subprocess.PIPE support. + stdout_pr, stdout_pw = os.pipe() + stdout_pr = os.fdopen(stdout_pr, 'rb', 0) + stdout_pw = os.fdopen(stdout_pw, 'wb', 0) + files = [stdout_pr, stdout_pw] + + def test(loop): + output = None + try: + with open(os.devnull, 'rb', 0) as devnull: + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + echo_binary, *args_tuple, + stdin=devnull, stdout=stdout_pw, stderr=stdout_pw)) + +
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/util/_eventloop/
commit: 449b7b9f30c869781f7012ca53e9cda4efef6f9b Author: Zac Medico gentoo org> AuthorDate: Thu Apr 12 19:28:47 2018 + Commit: Zac Medico gentoo org> CommitDate: Thu Apr 12 19:28:47 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=449b7b9f Implement AbstractEventLoop.call_exception_handler (bug 649588) When using asyncio with _PortageEventLoopPolicy, this method is required in order to avoid a NotImplementedError if a coroutine raises an unexpected exception. Bug: https://bugs.gentoo.org/649588 pym/portage/util/_eventloop/EventLoop.py | 68 pym/portage/util/futures/unix_events.py | 2 + 2 files changed, 70 insertions(+) diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 12c199c76..13ce5478e 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -9,6 +9,7 @@ import os import select import signal import sys +import traceback try: import fcntl @@ -842,6 +843,73 @@ class EventLoop(object): close() self._poll_obj = None + def default_exception_handler(self, context): + """ + Default exception handler. + + This is called when an exception occurs and no exception + handler is set, and can be called by a custom exception + handler that wants to defer to the default behavior. + + The context parameter has the same meaning as in + `call_exception_handler()`. + + @param context: exception context + @type context: dict + """ + message = context.get('message') + if not message: + message = 'Unhandled exception in event loop' + + exception = context.get('exception') + if exception is not None: + exc_info = (type(exception), exception, exception.__traceback__) + else: + exc_info = False + + log_lines = [message] + for key in sorted(context): + if key in {'message', 'exception'}: + continue + value = context[key] + if key == 'source_traceback': + tb = ''.join(traceback.format_list(value)) + value = 'Object created at (most recent call last):\n' + value += tb.rstrip() + elif key == 'handle_traceback': + tb = ''.join(traceback.format_list(value)) + value = 'Handle created at (most recent call last):\n' + value += tb.rstrip() + else: + value = repr(value) + log_lines.append('{}: {}'.format(key, value)) + + logging.error('\n'.join(log_lines), exc_info=exc_info) + os.kill(os.getpid(), signal.SIGTERM) + + def call_exception_handler(self, context): + """ + Call the current event loop's exception handler. + + The context argument is a dict containing the following keys: + + - 'message': Error message; + - 'exception' (optional): Exception object; + - 'future' (optional): Future instance; + - 'handle' (optional): Handle instance; + - 'protocol' (optional): Protocol instance; + - 'transport' (optional): Transport instance; + - 'socket' (optional): Socket instance; + - 'asyncgen' (optional): Asynchronous generator that caused + the exception. + + New keys may be introduced in the future. + + @param context: exception context + @type context: dict + """ + self.default_exception_handler(context) + def get_debug(self): """ Get the debug mode (bool) of the event loop. diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index 6fcef45fa..5434cd942 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -48,6 +48,8 @@ class _PortageEventLoop(events.AbstractEventLoop): self.remove_writer = loop.remove_writer self.run_in_executor = loop.run_in_executor self.time = loop.time + self.default_exception_handler = loop.default_exception_handler + self.call_exception_handler = loop.call_exception_handler self.set_debug = loop.set_debug self.get_debug = loop.get_debug
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/, ...
commit: a78dca7e47f79ad48aee4909ee10688604996b86 Author: Zac Medico gentoo org> AuthorDate: Wed Apr 11 06:44:41 2018 + Commit: Zac Medico gentoo org> CommitDate: Thu Apr 12 08:35:05 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=a78dca7e Implement AbstractEventLoopPolicy.get_child_watcher() (bug 649588) Use a _PortageChildWatcher class to wrap portage's internal event loop and implement asyncio's AbstractChildWatcher interface. Bug: https://bugs.gentoo.org/649588 .../util/futures/asyncio/test_child_watcher.py | 45 +++ pym/portage/util/_eventloop/EventLoop.py | 7 +- pym/portage/util/futures/_asyncio.py | 13 pym/portage/util/futures/unix_events.py| 90 ++ 4 files changed, 152 insertions(+), 3 deletions(-) diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py new file mode 100644 index 0..dca01be56 --- /dev/null +++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py @@ -0,0 +1,45 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import os + +from portage.process import find_binary, spawn +from portage.tests import TestCase +from portage.util.futures import asyncio +from portage.util.futures.unix_events import DefaultEventLoopPolicy + + +class ChildWatcherTestCase(TestCase): + def testChildWatcher(self): + true_binary = find_binary("true") + self.assertNotEqual(true_binary, None) + + initial_policy = asyncio.get_event_loop_policy() + if not isinstance(initial_policy, DefaultEventLoopPolicy): + asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) + + try: + try: + asyncio.set_child_watcher(None) + except NotImplementedError: + pass + else: + self.assertTrue(False) + + args_tuple = ('hello', 'world') + + loop = asyncio.get_event_loop() + future = loop.create_future() + + def callback(pid, returncode, *args): + future.set_result((pid, returncode, args)) + + with asyncio.get_child_watcher() as watcher: + pids = spawn([true_binary], returnpid=True) + watcher.add_child_handler(pids[0], callback, *args_tuple) + + self.assertEqual( + loop.run_until_complete(future), + (pids[0], os.EX_OK, args_tuple)) + finally: + asyncio.set_event_loop_policy(initial_policy) diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index d53a76ba1..12c199c76 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -25,7 +25,7 @@ import portage portage.proxy.lazyimport.lazyimport(globals(), 'portage.util.futures.futures:Future', 'portage.util.futures.executor.fork:ForkExecutor', - 'portage.util.futures.unix_events:_PortageEventLoop', + 'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher', ) from portage import OrderedDict @@ -190,6 +190,7 @@ class EventLoop(object): self._sigchld_src_id = None self._pid = os.getpid() self._asyncio_wrapper = _PortageEventLoop(loop=self) + self._asyncio_child_watcher = _PortageChildWatcher(self) def create_future(self): """ @@ -424,8 +425,8 @@ class EventLoop(object): self._sigchld_read, self.IO_IN, self._sigchld_io_cb) signal.signal(signal.SIGCHLD, self._sigchld_sig_cb) - # poll now, in case the SIGCHLD has already arrived - self._poll_child_processes() + # poll soon, in case the SIGCHLD has already arrived + self.call_soon(self._poll_child_processes) return source_id def _sigchld_sig_cb(self, signum, frame): diff --git a/pym/portage/util/futures/_asyncio.py b/pym/portage/util/futures/_asyncio.py index 02ab5..0f84f14b7 100644 --- a/pym/portage/util/futures/_asyncio.py +++ b/pym/portage/util/futures/_asyncio.py @@ -3,7 +3,9 @@ __all__ = ( 'ensure_future', + 'get_child_watcher', 'get_event_loop', + 'set_child_watcher', 'get_event_loop_policy', 'set_event_loop_policy', 'sleep', @@ -62,6 +64,17 @@ def get_event_loop(): return get_event_loop_policy().get_event_loop() +def get
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/executor/
commit: ae59758e395393601a389ede4f4b521db6786139 Author: Zac Medico gentoo org> AuthorDate: Thu Apr 12 08:25:17 2018 + Commit: Zac Medico gentoo org> CommitDate: Thu Apr 12 08:30:49 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=ae59758e ForkExecutor: fix shutdown to handle empty self._running_tasks pym/portage/util/futures/executor/fork.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py index 919a72bfd..51367f934 100644 --- a/pym/portage/util/futures/executor/fork.py +++ b/pym/portage/util/futures/executor/fork.py @@ -96,6 +96,8 @@ class ForkExecutor(object): def shutdown(self, wait=True): self._shutdown = True + if not self._running_tasks and not self._shutdown_future.done(): + self._shutdown_future.set_result(None) if wait: self._loop.run_until_complete(self._shutdown_future)
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/executor/
commit: 52d4689740f5f9fcf1cf7423e3fe4089dbb4c718 Author: Zac Medico gentoo org> AuthorDate: Wed Apr 11 02:01:43 2018 + Commit: Zac Medico gentoo org> CommitDate: Wed Apr 11 02:01:43 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=52d46897 ForkExecutor: support asyncio via _PortageEventLoopPolicy (bug 649588) Support portage's internal EventLoop as well as the _PortageEventLoop asyncio compatibility wrapper, by using the respective _loop and _asyncio_wrapper attributes where appropriate. Bug: https://bugs.gentoo.org/649588 pym/portage/util/futures/executor/fork.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py index 496b4e892..919a72bfd 100644 --- a/pym/portage/util/futures/executor/fork.py +++ b/pym/portage/util/futures/executor/fork.py @@ -25,7 +25,8 @@ class ForkExecutor(object): """ def __init__(self, max_workers=None, loop=None): self._max_workers = max_workers or multiprocessing.cpu_count() - self._loop = loop or global_event_loop() + loop = loop or global_event_loop() + self._loop = getattr(loop, '_asyncio_wrapper', loop) self._submit_queue = collections.deque() self._running_tasks = {} self._shutdown = False @@ -53,7 +54,7 @@ class ForkExecutor(object): future, proc = self._submit_queue.popleft() future.add_done_callback(functools.partial(self._cancel_cb, proc)) proc.addExitListener(functools.partial(self._proc_exit, future)) - proc.scheduler = self._loop + proc.scheduler = self._loop._loop proc.start() self._running_tasks[id(proc)] = proc
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/util/_eventloop/, ...
commit: 142d08c0636b172fbc00a7f2b10dc07479a57e2d Author: Zac Medico gentoo org> AuthorDate: Sun Mar 4 20:10:55 2018 + Commit: Zac Medico gentoo org> CommitDate: Wed Apr 11 01:44:34 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=142d08c0 Add minimal asyncio.AbstractEventLoop implementation (bug 649588) This provides minimal interoperability with existing asyncio code, by adding a portage.util.futures.unix_events.DefaultEventLoopPolicy class that makes asyncio use portage's internal event loop when an instance is passed into asyncio.set_event_loop_policy(). The get_event_loop() method of this policy returns an instance of a _PortageEventLoop class that wraps portage's internal event loop and implements asyncio's AbstractEventLoop interface. The portage.util.futures.asyncio module refers to the real asyncio module when available, and otherwise falls back to a minimal implementation that works with python2.7. The included EventLoopInForkTestCase demonstrates usage, and works with all supported versions of python, include python2.7. In python3.4 and later, API consumers can use asyncio coroutines, since _PortageEventLoop is compatible with asyncio.Task! Bug: https://bugs.gentoo.org/649588 .../util/futures/asyncio}/__init__.py | 0 .../util/futures/asyncio/__test__.py} | 0 .../futures/asyncio/test_event_loop_in_fork.py | 59 +++ pym/portage/util/_eventloop/EventLoop.py | 11 +- pym/portage/util/futures/__init__.py | 11 ++ pym/portage/util/futures/_asyncio.py | 114 pym/portage/util/futures/events.py | 191 + pym/portage/util/futures/futures.py| 9 +- pym/portage/util/futures/unix_events.py| 91 ++ 9 files changed, 477 insertions(+), 9 deletions(-) diff --git a/pym/portage/util/futures/__init__.py b/pym/portage/tests/util/futures/asyncio/__init__.py similarity index 100% copy from pym/portage/util/futures/__init__.py copy to pym/portage/tests/util/futures/asyncio/__init__.py diff --git a/pym/portage/util/futures/__init__.py b/pym/portage/tests/util/futures/asyncio/__test__.py similarity index 100% copy from pym/portage/util/futures/__init__.py copy to pym/portage/tests/util/futures/asyncio/__test__.py diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py new file mode 100644 index 0..7868d792a --- /dev/null +++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py @@ -0,0 +1,59 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import multiprocessing +import os + +from portage.tests import TestCase +from portage.util.futures import asyncio +from portage.util.futures.unix_events import DefaultEventLoopPolicy + + +def fork_main(parent_conn, child_conn): + parent_conn.close() + loop = asyncio.get_event_loop() + # This fails with python's default event loop policy, + # see https://bugs.python.org/issue22087. + loop.run_until_complete(asyncio.sleep(0.1)) + + +def async_main(fork_exitcode, loop=None): + loop = loop or asyncio.get_event_loop() + + # Since python2.7 does not support Process.sentinel, use Pipe to + # monitor for process exit. + parent_conn, child_conn = multiprocessing.Pipe() + + def eof_callback(proc): + loop.remove_reader(parent_conn.fileno()) + parent_conn.close() + proc.join() + fork_exitcode.set_result(proc.exitcode) + + proc = multiprocessing.Process(target=fork_main, args=(parent_conn, child_conn)) + loop.add_reader(parent_conn.fileno(), eof_callback, proc) + proc.start() + child_conn.close() + + +class EventLoopInForkTestCase(TestCase): + """ + The default asyncio event loop policy does not support loops + running in forks, see https://bugs.python.org/issue22087. + Portage's DefaultEventLoopPolicy supports forks. + """ + + def testEventLoopInForkTestCase(self): + initial_policy = asyncio.get_event_loop_policy() + if not isinstance(initial_policy, DefaultEventLoopPolicy): + asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) + try: + loop = asyncio.get_event_loop() + fork_exitcode = loop.create_future() + # Make async_main fork while the loop is running, which would + # trigger https://bugs.python.org/issue22087 with asyncio's + # default event loop policy. + loop.call_soon(async_main, fork_exitcode) + assert loop.run_until_complete(fork_exitcode) == os.EX_OK + finally: +
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/executor/, pym/portage/util/_eventloop/
commit: 4095be74985c5c2eead5fb480cf37baa11308d62 Author: Zac Medico gentoo org> AuthorDate: Wed Mar 14 08:01:26 2018 + Commit: Zac Medico gentoo org> CommitDate: Mon Apr 2 16:53:23 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=4095be74 Add ForkExecutor (bug 649588) This is useful for asynchronous operations that we might need to cancel if they take too long, since (concurrent. futures.ProcessPoolExecutor tasks are not cancellable). The ability to cancel tasks makes this executor useful as an alternative to portage.exception.AlarmSignal. Also add an asyncio-compatible EventLoop.run_in_executor method that uses ForkExecutor as the default executor, which will later be used to implement the corresponding asyncio.AbstractEventLoop run_in_executor method. Bug: https://bugs.gentoo.org/649588 Reviewed-by: Alec Warner gentoo.org> pym/portage/util/_eventloop/EventLoop.py | 45 - pym/portage/util/futures/executor/__init__.py | 0 pym/portage/util/futures/executor/fork.py | 134 ++ 3 files changed, 178 insertions(+), 1 deletion(-) diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index f472a3dae..1574a6837 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -24,6 +24,7 @@ except ImportError: import portage portage.proxy.lazyimport.lazyimport(globals(), 'portage.util.futures.futures:_EventLoopFuture', + 'portage.util.futures.executor.fork:ForkExecutor', ) from portage import OrderedDict @@ -122,6 +123,7 @@ class EventLoop(object): self._idle_callbacks = OrderedDict() self._timeout_handlers = {} self._timeout_interval = None + self._default_executor = None self._poll_obj = None try: @@ -721,6 +723,46 @@ class EventLoop(object): return self._handle(self.timeout_add( delay * 1000, self._call_soon_callback(callback, args)), self) + def run_in_executor(self, executor, func, *args): + """ + Arrange for a func to be called in the specified executor. + + The executor argument should be an Executor instance. The default + executor is used if executor is None. + + Use functools.partial to pass keywords to the *func*. + + @param executor: executor + @type executor: concurrent.futures.Executor or None + @param func: a function to call + @type func: callable + @return: a Future + @rtype: asyncio.Future (or compatible) + """ + if executor is None: + executor = self._default_executor + if executor is None: + executor = ForkExecutor(loop=self) + self._default_executor = executor + return executor.submit(func, *args) + + def close(self): + """Close the event loop. + + This clears the queues and shuts down the executor, + and waits for it to finish. + """ + executor = self._default_executor + if executor is not None: + self._default_executor = None + executor.shutdown(wait=True) + + if self._poll_obj is not None: + close = getattr(self._poll_obj, 'close') + if close is not None: + close() + self._poll_obj = None + _can_poll_device = None @@ -782,10 +824,11 @@ class _epoll_adapter(object): that is associated with an epoll instance will close automatically when it is garbage collected, so it's not necessary to close it explicitly. """ - __slots__ = ('_epoll_obj',) + __slots__ = ('_epoll_obj', 'close') def __init__(self, epoll_obj): self._epoll_obj = epoll_obj + self.close = epoll_obj.close def register(self, fd, *args): self._epoll_obj.register(fd, *args) diff --git a/pym/portage/util/futures/executor/__init__.py b/pym/portage/util/futures/executor/__init__.py new file mode 100644 index 0..e69de29bb diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py new file mode 100644 index 0..496b4e892 --- /dev/null +++ b/pym/portage/util/futures/executor/fork.py @@ -0,0 +1,134 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +__all__ = ( + 'ForkExecutor', +) + +import collections +import functools +import multiprocessing +import os +import sys +import traceback + +from portage.util._async.AsyncFunction import AsyncFun
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/
commit: 71c59145e0c7b631ec3f41e0d711445786d16f8f Author: Zac Medico gentoo org> AuthorDate: Thu Mar 15 02:45:31 2018 + Commit: Zac Medico gentoo org> CommitDate: Thu Mar 15 02:49:02 2018 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=71c59145 portage.util.futures.wait: fix arguments for asyncio compat The bare "*" is not supported in python2.7, and in python3 the bare "*" means that keyword arguments must be used for the arguments that follow. Fixes: e43f6c583ed9 ("Add iter_completed convenience function (bug 648790)") pym/portage/util/futures/iter_completed.py | 2 +- pym/portage/util/futures/wait.py | 16 +++- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py index 1050b6fa7..ad6275b49 100644 --- a/pym/portage/util/futures/iter_completed.py +++ b/pym/portage/util/futures/iter_completed.py @@ -52,7 +52,7 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): # task_generator is exhausted while future_map: done, pending = loop.run_until_complete( - wait(*list(future_map.values()), return_when=FIRST_COMPLETED)) + wait(list(future_map.values()), return_when=FIRST_COMPLETED)) for future in done: del future_map[id(future)] yield future diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py index 3f0bdbff5..bd85bb053 100644 --- a/pym/portage/util/futures/wait.py +++ b/pym/portage/util/futures/wait.py @@ -11,15 +11,13 @@ except ImportError: from portage.util._eventloop.global_event_loop import global_event_loop -# Use **kwargs since python2.7 does not allow arguments with defaults -# to follow *futures. -def wait(*futures, **kwargs): +def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED): """ Use portage's internal EventLoop to emulate asyncio.wait: https://docs.python.org/3/library/asyncio-task.html#asyncio.wait - @param future: future to wait for - @type future: asyncio.Future (or compatible) + @param futures: futures to wait for + @type futures: asyncio.Future (or compatible) @param timeout: number of seconds to wait (wait indefinitely if not specified) @type timeout: int or float @@ -32,14 +30,6 @@ def wait(*futures, **kwargs): @return: tuple of (done, pending). @rtype: asyncio.Future (or compatible) """ - if not futures: - raise TypeError("wait() missing 1 required positional argument: 'future'") - loop = kwargs.pop('loop', None) - timeout = kwargs.pop('timeout', None) - return_when = kwargs.pop('return_when', ALL_COMPLETED) - if kwargs: - raise TypeError("wait() got an unexpected keyword argument '{}'".\ - format(next(iter(kwargs loop = loop or global_event_loop() result_future = loop.create_future() _Waiter(futures, timeout, return_when, result_future, loop)
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/
commit: d90ff7c046b39de82558655375ea104cfa19b176 Author: Zac Medico gentoo org> AuthorDate: Sun Mar 26 20:08:54 2017 + Commit: Zac Medico gentoo org> CommitDate: Sun Mar 26 20:12:04 2017 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=d90ff7c0 _EventLoopFuture: reduce indent of class body (whitespace only) pym/portage/util/futures/futures.py | 278 ++-- 1 file changed, 139 insertions(+), 139 deletions(-) diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py index dd913a1e3..dcf593c01 100644 --- a/pym/portage/util/futures/futures.py +++ b/pym/portage/util/futures/futures.py @@ -42,148 +42,148 @@ _CANCELLED = 'CANCELLED' _FINISHED = 'FINISHED' class _EventLoopFuture(object): + """ + This class provides (a subset of) the asyncio.Future interface, for + use with the EventLoop class, because EventLoop is currently + missing some of the asyncio.AbstractEventLoop methods that + asyncio.Future requires. + """ + + # Class variables serving as defaults for instance variables. + _state = _PENDING + _result = None + _exception = None + _loop = None + + def __init__(self, loop=None): + """Initialize the future. + + The optional loop argument allows explicitly setting the event + loop object used by the future. If it's not provided, the future uses + the default event loop. """ - This class provides (a subset of) the asyncio.Future interface, for - use with the EventLoop class, because EventLoop is currently - missing some of the asyncio.AbstractEventLoop methods that - asyncio.Future requires. + if loop is None: + self._loop = global_event_loop() + else: + self._loop = loop + self._callbacks = [] + + def cancel(self): + """Cancel the future and schedule callbacks. + + If the future is already done or cancelled, return False. Otherwise, + change the future's state to cancelled, schedule the callbacks and + return True. """ + if self._state != _PENDING: + return False + self._state = _CANCELLED + self._schedule_callbacks() + return True - # Class variables serving as defaults for instance variables. - _state = _PENDING - _result = None - _exception = None - _loop = None - - def __init__(self, loop=None): - """Initialize the future. - - The optional loop argument allows explicitly setting the event - loop object used by the future. If it's not provided, the future uses - the default event loop. - """ - if loop is None: - self._loop = global_event_loop() - else: - self._loop = loop - self._callbacks = [] - - def cancel(self): - """Cancel the future and schedule callbacks. - - If the future is already done or cancelled, return False. Otherwise, - change the future's state to cancelled, schedule the callbacks and - return True. - """ - if self._state != _PENDING: - return False - self._state = _CANCELLED - self._schedule_callbacks() - return True - - def _schedule_callbacks(self): - """Internal: Ask the event loop to call all callbacks. - - The callbacks are scheduled to be called as soon as possible. Also - clears the callback list. - """ - callbacks = self._callbacks[:] - if not callbacks: - return - - self._callbacks[:] = [] - for callback in callbacks: - self._loop.call_soon(callback, self) - - def cancelled(self): - """Return True if the future was cancelled.""" - return self._state == _CANCELLED - - def done(self): - """Return True if the future is done. - - Done means either that a result / exception are available, or that the - future was cancelled. - """ - return self._state !=