[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/_asyncio/, pym/portage/util/futures/

2018-05-09 Thread Zac Medico
commit: 5e628787e6f4c720680aeeb8beeac88e37988a9e
Author: Zac Medico  gentoo  org>
AuthorDate: Wed May  9 07:33:37 2018 +0000
Commit: Zac Medico  gentoo  org>
CommitDate: Wed May  9 07:40:34 2018 +0000
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=5e628787

DefaultEventLoopPolicy: raise NotImplementedError, not RecursionError

Since the DefaultEventLoopPolicy wraps the underlying asyncio event
loop policy, raise NotImplementedError if the current instance is set
as the underlying event loop policy. This avoids a RecursionError
that would flood the terminal with a large stack trace.

 pym/portage/util/futures/_asyncio/__init__.py |  6 +++---
 pym/portage/util/futures/unix_events.py   | 23 ++-
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/pym/portage/util/futures/_asyncio/__init__.py 
b/pym/portage/util/futures/_asyncio/__init__.py
index 940da4762..acfd59396 100644
--- a/pym/portage/util/futures/_asyncio/__init__.py
+++ b/pym/portage/util/futures/_asyncio/__init__.py
@@ -32,7 +32,7 @@ except ImportError:
 
 import portage
 portage.proxy.lazyimport.lazyimport(globals(),
-   'portage.util.futures.unix_events:DefaultEventLoopPolicy',
+   'portage.util.futures.unix_events:_PortageEventLoopPolicy',
 )
 from portage.util._eventloop.asyncio_event_loop import AsyncioEventLoop as 
_AsyncioEventLoop
 from portage.util._eventloop.global_event_loop import (
@@ -67,7 +67,7 @@ def get_event_loop_policy():
global _lock, _policy
with _lock:
if _policy is None:
-   _policy = DefaultEventLoopPolicy()
+   _policy = _PortageEventLoopPolicy()
return _policy
 
 
@@ -81,7 +81,7 @@ def set_event_loop_policy(policy):
"""
global _lock, _policy
with _lock:
-   _policy = policy or DefaultEventLoopPolicy()
+   _policy = policy or _PortageEventLoopPolicy()
 
 
 def get_event_loop():

diff --git a/pym/portage/util/futures/unix_events.py 
b/pym/portage/util/futures/unix_events.py
index 8eb369f8b..3381eaa7d 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -681,4 +681,25 @@ class 
_PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
return _global_event_loop()._asyncio_child_watcher
 
 
-DefaultEventLoopPolicy = _PortageEventLoopPolicy
+class _AsyncioEventLoopPolicy(_PortageEventLoopPolicy):
+   """
+   A subclass of _PortageEventLoopPolicy which raises
+   NotImplementedError if it is set as the real asyncio event loop
+   policy, since this class is intended to *wrap* the real asyncio
+   event loop policy.
+   """
+   def _check_recursion(self):
+   if _real_asyncio.get_event_loop_policy() is self:
+   raise NotImplementedError('this class is only a wrapper')
+
+   def get_event_loop(self):
+   self._check_recursion()
+   return super(_AsyncioEventLoopPolicy, self).get_event_loop()
+
+   def get_child_watcher(self):
+   self._check_recursion()
+   return super(_AsyncioEventLoopPolicy, self).get_child_watcher()
+
+
+DefaultEventLoopPolicy = (_AsyncioEventLoopPolicy if _asyncio_enabled
+   else _PortageEventLoopPolicy)



[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/_asyncio/, pym/portage/util/futures/

2018-05-08 Thread Zac Medico
commit: 920b90fd0883dbd36f0290d08c9af49a208c2950
Author: Zac Medico  gentoo  org>
AuthorDate: Wed May  9 04:59:59 2018 +0000
Commit: Zac Medico  gentoo  org>
CommitDate: Wed May  9 04:59:59 2018 +0000
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=920b90fd

_wrap_loop: default to global_event_loop behavior

The default loop returned by _wrap_loop should be consistent
with global_event_loop, in order to avoid accidental registration
of callbacks with a loop that is not intended to run.

Fixes 96cc07326391 ("global_event_loop: use asyncio event loop (bug 654390)")

 pym/portage/util/futures/_asyncio/__init__.py | 14 ++
 pym/portage/util/futures/unix_events.py   | 40 +--
 2 files changed, 10 insertions(+), 44 deletions(-)

diff --git a/pym/portage/util/futures/_asyncio/__init__.py 
b/pym/portage/util/futures/_asyncio/__init__.py
index 1273afa02..940da4762 100644
--- a/pym/portage/util/futures/_asyncio/__init__.py
+++ b/pym/portage/util/futures/_asyncio/__init__.py
@@ -35,7 +35,10 @@ portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.unix_events:DefaultEventLoopPolicy',
 )
 from portage.util._eventloop.asyncio_event_loop import AsyncioEventLoop as 
_AsyncioEventLoop
-from portage.util._eventloop.global_event_loop import _asyncio_enabled
+from portage.util._eventloop.global_event_loop import (
+   _asyncio_enabled,
+   global_event_loop as _global_event_loop,
+)
 from portage.util.futures.futures import (
CancelledError,
Future,
@@ -168,14 +171,15 @@ def _wrap_loop(loop=None):
@rtype: asyncio.AbstractEventLoop (or compatible)
@return: event loop
"""
-   return loop or get_event_loop()
+   return loop or _global_event_loop()
 
 
 if _asyncio_enabled:
-   get_event_loop_policy = _real_asyncio.get_event_loop_policy
-   set_event_loop_policy = _real_asyncio.set_event_loop_policy
+   # The default loop returned by _wrap_loop should be consistent
+   # with global_event_loop, in order to avoid accidental registration
+   # of callbacks with a loop that is not intended to run.
 
def _wrap_loop(loop=None):
-   loop = loop or get_event_loop()
+   loop = loop or _global_event_loop()
return (loop if hasattr(loop, '_asyncio_wrapper')
else _AsyncioEventLoop(loop=loop))

diff --git a/pym/portage/util/futures/unix_events.py 
b/pym/portage/util/futures/unix_events.py
index ce520db00..8eb369f8b 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -681,42 +681,4 @@ class 
_PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
return _global_event_loop()._asyncio_child_watcher
 
 
-class _AsyncioEventLoopPolicy(_PortageEventLoopPolicy):
-   """
-   Implementation of asyncio.AbstractEventLoopPolicy based on asyncio's
-   event loop. This supports running event loops in forks,
-   which is not supported by the default asyncio event loop policy,
-   see https://bugs.python.org/issue22087 and also
-   https://bugs.python.org/issue29703 which affects pypy3-5.10.1.
-   """
-   _MAIN_PID = os.getpid()
-
-   def __init__(self):
-   super(_AsyncioEventLoopPolicy, self).__init__()
-   self._default_policy = _real_asyncio.DefaultEventLoopPolicy()
-
-   def get_event_loop(self):
-   """
-   Get the event loop for the current context.
-
-   Returns an event loop object implementing the AbstractEventLoop
-   interface.
-
-   @rtype: asyncio.AbstractEventLoop (or compatible)
-   @return: the current event loop policy
-   """
-   if os.getpid() == self._MAIN_PID:
-   return self._default_policy.get_event_loop()
-   else:
-   return super(_AsyncioEventLoopPolicy, 
self).get_event_loop()
-
-   def get_child_watcher(self):
-   """Get the watcher for child processes."""
-   if os.getpid() == self._MAIN_PID:
-   return self._default_policy.get_child_watcher()
-   else:
-   return super(_AsyncioEventLoopPolicy, 
self).get_child_watcher()
-
-
-DefaultEventLoopPolicy = (_AsyncioEventLoopPolicy if _asyncio_enabled
-   else _PortageEventLoopPolicy)
+DefaultEventLoopPolicy = _PortageEventLoopPolicy



[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/, ...

2018-05-06 Thread Zac Medico
commit: b3b15c451cc21a2c53638f0eacc8396e395dcab3
Author: Zac Medico  gentoo  org>
AuthorDate: Mon May  7 06:24:22 2018 +0000
Commit: Zac Medico  gentoo  org>
CommitDate: Mon May  7 06:28:47 2018 +0000
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=b3b15c45

retry: add loop parameter during decoration

 pym/portage/sync/modules/rsync/rsync.py  |  2 +-
 pym/portage/tests/util/futures/test_retry.py | 16 
 pym/portage/util/futures/retry.py|  6 +++---
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/pym/portage/sync/modules/rsync/rsync.py 
b/pym/portage/sync/modules/rsync/rsync.py
index 1b8941ff6..070798a53 100644
--- a/pym/portage/sync/modules/rsync/rsync.py
+++ b/pym/portage/sync/modules/rsync/rsync.py
@@ -173,7 +173,7 @@ class RsyncSync(NewBase):
loop = global_event_loop()
func_coroutine = 
functools.partial(loop.run_in_executor,
None, 
noisy_refresh_keys)
-   decorated_func = 
retry_decorator(func_coroutine)
+   decorated_func = 
retry_decorator(func_coroutine, loop=loop)

loop.run_until_complete(decorated_func())
out.eend(0)
except (GematoException, asyncio.TimeoutError) 
as e:

diff --git a/pym/portage/tests/util/futures/test_retry.py 
b/pym/portage/tests/util/futures/test_retry.py
index 16ecccbc7..7a1e76280 100644
--- a/pym/portage/tests/util/futures/test_retry.py
+++ b/pym/portage/tests/util/futures/test_retry.py
@@ -85,7 +85,7 @@ class RetryTestCase(TestCase):
func_coroutine = self._wrap_coroutine_func(SucceedLater(1))
decorator = retry(try_max=9999,
delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
-   decorated_func = decorator(func_coroutine)
+   decorated_func = decorator(func_coroutine, loop=loop)
result = loop.run_until_complete(decorated_func())
self.assertEqual(result, 'success')
 
@@ -94,7 +94,7 @@ class RetryTestCase(TestCase):
func_coroutine = self._wrap_coroutine_func(SucceedNever())
decorator = retry(try_max=4, try_timeout=None,
delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
-   decorated_func = decorator(func_coroutine)
+   decorated_func = decorator(func_coroutine, loop=loop)
done, pending = 
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
self.assertTrue(isinstance(done.pop().exception().__cause__, 
SucceedNeverException))
@@ -104,7 +104,7 @@ class RetryTestCase(TestCase):
func_coroutine = self._wrap_coroutine_func(SucceedNever())
decorator = retry(reraise=True, try_max=4, try_timeout=None,
delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
-   decorated_func = decorator(func_coroutine)
+   decorated_func = decorator(func_coroutine, loop=loop)
done, pending = 
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
self.assertTrue(isinstance(done.pop().exception(), 
SucceedNeverException))
@@ -114,7 +114,7 @@ class RetryTestCase(TestCase):
func_coroutine = self._wrap_coroutine_func(HangForever())
decorator = retry(try_max=2, try_timeout=0.1,
delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
-   decorated_func = decorator(func_coroutine)
+   decorated_func = decorator(func_coroutine, loop=loop)
done, pending = 
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
self.assertTrue(isinstance(done.pop().exception().__cause__, 
asyncio.TimeoutError))
@@ -124,7 +124,7 @@ class RetryTestCase(TestCase):
func_coroutine = self._wrap_coroutine_func(HangForever())
decorator = retry(reraise=True, try_max=2, try_timeout=0.1,
delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
-   decorated_func = decorator(func_coroutine)
+   decorated_func = decorator(func_coroutine, loop=loop)
done, pending = 
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
self.assertTrue(isinstance(done.pop().exception(), 
asyncio.TimeoutError))
@@ -134,7 +134,7 @@ class RetryTestCase(TestCase):
func_coroutine = self._wr

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/_asyncio/, pym/portage/tests/util/futures/asyncio/, ...

2018-05-06 Thread Zac Medico
commit: 85ac23b7c0c58cef72d22281d66d086521c01e3e
Author: Zac Medico  gentoo  org>
AuthorDate: Sun May  6 11:05:03 2018 +0000
Commit: Zac Medico  gentoo  org>
CommitDate: Sun May  6 11:41:45 2018 +0000
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=85ac23b7

asyncio: add _wrap_loop helper (bug 654390)

In order to deal with asyncio event loop compatibility issues, add
a _wrap_loop helper. For example, since python3.4 does not have the
AbstractEventLoop.create_future() method, this helper function can
be used to add a wrapper that implements the create_future method
for python3.4.

Bug: https://bugs.gentoo.org/654390

 pym/portage/dbapi/porttree.py | 12 ++--
 .../tests/util/futures/asyncio/test_child_watcher.py  |  2 +-
 .../util/futures/asyncio/test_event_loop_in_fork.py   |  8 
 .../tests/util/futures/asyncio/test_pipe_closed.py|  4 ++--
 .../util/futures/asyncio/test_run_until_complete.py   |  2 +-
 .../util/futures/asyncio/test_subprocess_exec.py  |  4 ++--
 pym/portage/util/futures/_asyncio/__init__.py | 19 ++-
 pym/portage/util/futures/_asyncio/tasks.py|  7 +--
 pym/portage/util/futures/executor/fork.py |  4 ++--
 pym/portage/util/futures/iter_completed.py|  7 +++
 pym/portage/util/futures/retry.py |  3 +--
 11 files changed, 45 insertions(+), 27 deletions(-)

diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py
index 801b5658a..3e36024ff 100644
--- a/pym/portage/dbapi/porttree.py
+++ b/pym/portage/dbapi/porttree.py
@@ -36,7 +36,7 @@ from portage import _encodings
 from portage import _unicode_encode
 from portage import OrderedDict
 from portage.util._eventloop.EventLoop import EventLoop
-from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures import asyncio
 from portage.util.futures.iter_completed import iter_gather
 from _emerge.EbuildMetadataPhase import EbuildMetadataPhase
 
@@ -325,8 +325,8 @@ class portdbapi(dbapi):
@property
def _event_loop(self):
if portage._internal_caller:
-   # For internal portage usage, the global_event_loop is 
safe.
-   return global_event_loop()
+   # For internal portage usage, asyncio._wrap_loop() is 
safe.
+   return asyncio._wrap_loop()
else:
# For external API consumers, use a local EventLoop, 
since
# we don't want to assume that it's safe to override the
@@ -611,7 +611,7 @@ class portdbapi(dbapi):
# to simultaneous instantiation of multiple event loops here.
# Callers of this method certainly want the same event loop to
# be used for all calls.
-   loop = loop or global_event_loop()
+   loop = asyncio._wrap_loop(loop)
future = loop.create_future()
cache_me = False
if myrepo is not None:
@@ -751,7 +751,7 @@ class portdbapi(dbapi):
a set of alternative URIs.
@rtype: asyncio.Future (or compatible)
"""
-   loop = loop or global_event_loop()
+   loop = asyncio._wrap_loop(loop)
result = loop.create_future()
 
def aux_get_done(aux_get_future):
@@ -1419,7 +1419,7 @@ def _async_manifest_fetchlist(portdb, repo_config, cp, 
cpv_list=None,
@return: a Future resulting in a Mapping compatible with FetchlistDict
@rtype: asyncio.Future (or compatible)
"""
-   loop = loop or global_event_loop()
+   loop = asyncio._wrap_loop(loop)
result = loop.create_future()
cpv_list = (portdb.cp_list(cp, mytree=repo_config.location)
if cpv_list is None else cpv_list)

diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py 
b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
index dca01be56..8ef497544 100644
--- a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
+++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
@@ -28,7 +28,7 @@ class ChildWatcherTestCase(TestCase):
 
args_tuple = ('hello', 'world')
 
-   loop = asyncio.get_event_loop()
+   loop = asyncio._wrap_loop()
future = loop.create_future()
 
def callback(pid, returncode, *args):

diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py 
b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
index 7868d792a..19588bf3a 100644
--- a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
+++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
@@ -11,14 +11,14 @@ from portage.util.futures.unix_events import 
DefaultEventL

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/

2018-04-27 Thread Zac Medico
commit: be61882996099322bb3a1e82e71f475b4141ad40
Author: Zac Medico  gentoo  org>
AuthorDate: Tue Apr 24 23:28:08 2018 +0000
Commit: Zac Medico  gentoo  org>
CommitDate: Fri Apr 27 21:33:00 2018 +0000
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=be618829

Add iter_gather function (bug 653946)

This is similar to asyncio.gather, but takes an iterator of
futures as input, and includes support for max_jobs and max_load
parameters. For bug 653946, this will be used to asynchronously
gather the results of the portdbapi.async_fetch_map calls that
are required to generate a Manifest, while using the max_jobs
parameter to limit the number of concurrent async_aux_get calls.

Bug: https://bugs.gentoo.org/653946

 pym/portage/util/futures/iter_completed.py | 73 ++
 1 file changed, 73 insertions(+)

diff --git a/pym/portage/util/futures/iter_completed.py 
b/pym/portage/util/futures/iter_completed.py
index 5ad075305..1d6a9a4bd 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -112,3 +112,76 @@ def async_iter_completed(futures, max_jobs=None, 
max_load=None, loop=None):
# cleanup in case of interruption by SIGINT, etc
scheduler.cancel()
scheduler.wait()
+
+
+def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
+   """
+   This is similar to asyncio.gather, but takes an iterator of
+   futures as input, and includes support for max_jobs and max_load
+   parameters.
+
+   @param futures: iterator of asyncio.Future (or compatible)
+   @type futures: iterator
+   @param max_jobs: max number of futures to process concurrently (default
+   is multiprocessing.cpu_count())
+   @type max_jobs: int
+   @param max_load: max load allowed when scheduling a new future,
+   otherwise schedule no more than 1 future at a time (default
+   is multiprocessing.cpu_count())
+   @type max_load: int or float
+   @param loop: event loop
+   @type loop: EventLoop
+   @return: a Future resulting in a list of done input futures, in the
+   same order that they were yielded from the input iterator
+   @rtype: asyncio.Future (or compatible)
+   """
+   loop = loop or global_event_loop()
+   loop = getattr(loop, '_asyncio_wrapper', loop)
+   result = loop.create_future()
+   futures_list = []
+
+   def future_generator():
+   for future in futures:
+   futures_list.append(future)
+   yield future
+
+   completed_iter = async_iter_completed(
+   future_generator(),
+   max_jobs=max_jobs,
+   max_load=max_load,
+   loop=loop,
+   )
+
+   def handle_result(future_done_set):
+   if result.cancelled():
+   if not future_done_set.cancelled():
+   # All exceptions must be consumed from future_done_set, in order
+   # to avoid triggering the event loop's exception handler.
+   list(future.exception() for future in 
future_done_set.result()
+   if not future.cancelled())
+   return
+
+   try:
+   handle_result.current_task = next(completed_iter)
+   except StopIteration:
+   result.set_result(futures_list)
+   else:
+   
handle_result.current_task.add_done_callback(handle_result)
+
+   try:
+   handle_result.current_task = next(completed_iter)
+   except StopIteration:
+   handle_result.current_task = None
+   result.set_result(futures_list)
+   else:
+   handle_result.current_task.add_done_callback(handle_result)
+
+   def cancel_callback(result):
+   if (result.cancelled() and
+   handle_result.current_task is not None and
+   not handle_result.current_task.done()):
+   handle_result.current_task.cancel()
+
+   result.add_done_callback(cancel_callback)
+
+   return result



[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/

2018-04-17 Thread Zac Medico
commit: a9e8ebaa6979ccf0bb385e457d695bedc7b65bf5
Author: Zac Medico  gentoo  org>
AuthorDate: Tue Apr 17 09:51:36 2018 +0000
Commit: Zac Medico  gentoo  org>
CommitDate: Tue Apr 17 17:44:58 2018 +0000
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=a9e8ebaa

Add async_iter_completed for asyncio migration (bug 591760)

This serves as a wrapper around portage's internal TaskScheduler
class, allowing TaskScheduler API consumers to be migrated to use
asyncio interfaces.

Bug: https://bugs.gentoo.org/591760

 .../tests/util/futures/test_iter_completed.py  | 37 -
 pym/portage/util/futures/iter_completed.py | 61 +++---
 2 files changed, 91 insertions(+), 7 deletions(-)

diff --git a/pym/portage/tests/util/futures/test_iter_completed.py 
b/pym/portage/tests/util/futures/test_iter_completed.py
index 9c23aefb1..1344523c6 100644
--- a/pym/portage/tests/util/futures/test_iter_completed.py
+++ b/pym/portage/tests/util/futures/test_iter_completed.py
@@ -5,7 +5,11 @@ import time
 from portage.tests import TestCase
 from portage.util._async.ForkProcess import ForkProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
-from portage.util.futures.iter_completed import iter_completed
+from portage.util.futures import asyncio
+from portage.util.futures.iter_completed import (
+   iter_completed,
+   async_iter_completed,
+)
 
 
 class SleepProcess(ForkProcess):
@@ -48,3 +52,34 @@ class IterCompletedTestCase(TestCase):
for seconds, future in zip(expected_order, 
iter_completed(future_generator(),
max_jobs=True, max_load=None, loop=loop)):
self.assertEqual(seconds, future.result())
+
+   def testAsyncCancel(self):
+
+   loop = global_event_loop()._asyncio_wrapper
+   input_futures = set()
+   future_count = 3
+
+   def future_generator():
+   for i in range(future_count):
+   future = loop.create_future()
+   loop.call_soon(lambda future: None if 
future.done()
+   else future.set_result(None), future)
+   input_futures.add(future)
+   yield future
+
+   for future_done_set in async_iter_completed(future_generator(),
+   max_jobs=True, max_load=None, loop=loop):
+   future_done_set.cancel()
+   break
+
+   # With max_jobs=True, async_iter_completed should have executed
+   # the generator until it raised StopIteration.
+   self.assertEqual(future_count, len(input_futures))
+
+   loop.run_until_complete(asyncio.wait(input_futures, loop=loop))
+
+   # The futures may have results or they may have been cancelled
+   # by TaskScheduler, and behavior varies depending on the python
+   # interpreter.
+   for future in input_futures:
+   future.cancelled() or future.result()

diff --git a/pym/portage/util/futures/iter_completed.py 
b/pym/portage/util/futures/iter_completed.py
index 8d324de84..5ad075305 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -1,6 +1,7 @@
 # Copyright 2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
+import functools
 import multiprocessing
 
 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
@@ -31,6 +32,38 @@ def iter_completed(futures, max_jobs=None, max_load=None, 
loop=None):
"""
loop = loop or global_event_loop()
loop = getattr(loop, '_asyncio_wrapper', loop)
+
+   for future_done_set in async_iter_completed(futures,
+   max_jobs=max_jobs, max_load=max_load, loop=loop):
+   for future in loop.run_until_complete(future_done_set):
+   yield future
+
+
+def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
+   """
+   An asynchronous version of iter_completed. This yields futures, which
+   when done, result in a set of input futures that are done. This serves
+   as a wrapper around portage's internal TaskScheduler class, using
+   standard asyncio interfaces.
+
+   @param futures: iterator of asyncio.Future (or compatible)
+   @type futures: iterator
+   @param max_jobs: max number of futures to process concurrently (default
+   is multiprocessing.cpu_count())
+   @type max_jobs: int
+   @param max_load: max load allowed when scheduling a new future,
+   otherwise schedule no more than 1 future at a time (default
+   is multiprocessing.cpu_count())
+   @type max_load: int or float
+   @param loop: event loop
+   @type loop: EventLoop
+   @retur

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/

2018-04-15 Thread Zac Medico
commit: 9a7b0a006e65f8683716d60574e4f19f8ffd603d
Author: Zac Medico  gentoo  org>
AuthorDate: Sat Apr 14 21:29:29 2018 +0000
Commit: Zac Medico  gentoo  org>
CommitDate: Mon Apr 16 00:04:26 2018 +0000
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=9a7b0a00

Implement AbstractEventLoop.connect_read_pipe (bug 649588)

In python versions that support asyncio, this allows API consumers
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdout
and stderr parameters.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   |  30 
 pym/portage/util/futures/unix_events.py| 157 -
 2 files changed, 184 insertions(+), 3 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index d30f48c43..94984fc93 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -2,6 +2,7 @@
 # Distributed under the terms of the GNU General Public License v2
 
 import os
+import subprocess
 
 from portage.process import find_binary
 from portage.tests import TestCase
@@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase):
f.close()
 
self._run_test(test)
+
+   def testReadTransport(self):
+   """
+   Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) 
which
+   requires an AbstractEventLoop.connect_read_pipe implementation
+   (and a ReadTransport implementation for it to return).
+   """
+   if not hasattr(asyncio, 'create_subprocess_exec'):
+   self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+   args_tuple = (b'hello', b'world')
+   echo_binary = find_binary("echo")
+   self.assertNotEqual(echo_binary, None)
+   echo_binary = echo_binary.encode()
+
+   def test(loop):
+   with open(os.devnull, 'rb', 0) as devnull:
+   proc = loop.run_until_complete(
+   asyncio.create_subprocess_exec(
+   echo_binary, *args_tuple,
+   stdin=devnull,
+   stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT))
+
+   self.assertEqual(
+   
tuple(loop.run_until_complete(proc.stdout.read()).split()),
+   args_tuple)
+   self.assertEqual(loop.run_until_complete(proc.wait()), 
os.EX_OK)
+
+   self._run_test(test)

diff --git a/pym/portage/util/futures/unix_events.py 
b/pym/portage/util/futures/unix_events.py
index d788c2bea..9d84ab6aa 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -9,12 +9,18 @@ __all__ = (
 try:
from asyncio.base_subprocess import BaseSubprocessTransport as 
_BaseSubprocessTransport
from asyncio.unix_events import AbstractChildWatcher as 
_AbstractChildWatcher
+   from asyncio.transports import ReadTransport as _ReadTransport
 except ImportError:
_AbstractChildWatcher = object
_BaseSubprocessTransport = object
+   _ReadTransport = object
 
+import errno
+import fcntl
 import functools
+import logging
 import os
+import stat
 import subprocess
 
 from portage.util._eventloop.global_event_loop import (
@@ -82,6 +88,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
"""
return asyncio.Task(coro, loop=self)
 
+   def connect_read_pipe(self, protocol_factory, pipe):
+   """
+   Register read pipe in event loop. Set the pipe to non-blocking 
mode.
+
+   @type protocol_factory: callable
+   @param protocol_factory: must instantiate object with Protocol 
interface
+   @type pipe: file
+   @param pipe: a pipe to read from
+   @rtype: asyncio.Future
+   @return: Return pair (transport, protocol), where transport 
supports the
+   ReadTransport interface.
+   """
+   protocol = protocol_factory()
+   result = self.create_future()
+   waiter = self.create_future()
+   transport = self._make_read_pipe_transport(pipe, protocol, 
waiter=waiter)
+
+   def waiter_callback(waiter):
+   try:
+   waiter.result()
+   except Exception as e:
+   transport.close()
+   result.set_exception(e)
+   else:
+   result.set_result((transport, protocol))
+
+ 

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/util/_eventloop/

2018-04-15 Thread Zac Medico
commit: 608663259a8a6fa78c32205389dfa58d00b6f11c
Author: Zac Medico  gentoo  org>
AuthorDate: Sun Apr 15 18:56:43 2018 +0000
Commit: Zac Medico  gentoo  org>
CommitDate: Sun Apr 15 18:58:00 2018 +0000
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=60866325

Implement AbstractEventLoop.is_running() (bug 649588)

Bug: https://bugs.gentoo.org/649588

 pym/portage/util/_eventloop/EventLoop.py | 14 ++
 pym/portage/util/futures/unix_events.py  |  1 +
 2 files changed, 15 insertions(+)

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index 13ce5478e..a928f3138 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -125,6 +125,10 @@ class EventLoop(object):
self._poll_event_queue = []
self._poll_event_handlers = {}
self._poll_event_handler_ids = {}
+   # Number of current calls to self.iteration(). A number greater
+   # than 1 indicates recursion, which is not supported by 
asyncio's
+   # default event loop.
+   self._iteration_depth = 0
# Increment id for each new handler.
self._event_handler_id = 0
# New call_soon callbacks must have an opportunity to
@@ -262,7 +266,13 @@ class EventLoop(object):
@rtype: bool
@return: True if events were dispatched.
"""
+   self._iteration_depth += 1
+   try:
+   return self._iteration(*args)
+   finally:
+   self._iteration_depth -= 1
 
+   def _iteration(self, *args):
may_block = True
 
if args:
@@ -822,6 +832,10 @@ class EventLoop(object):
self._default_executor = executor
return executor.submit(func, *args)
 
+   def is_running(self):
+   """Return whether the event loop is currently running."""
+   return self._iteration_depth > 0
+
def is_closed(self):
"""Returns True if the event loop was closed."""
return self._poll_obj is None

diff --git a/pym/portage/util/futures/unix_events.py 
b/pym/portage/util/futures/unix_events.py
index 1abc420e1..d788c2bea 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -43,6 +43,7 @@ class _PortageEventLoop(events.AbstractEventLoop):
self.call_soon_threadsafe = loop.call_soon_threadsafe
self.call_later = loop.call_later
self.call_at = loop.call_at
+   self.is_running = loop.is_running
self.is_closed = loop.is_closed
self.close = loop.close
self.create_future = loop.create_future



[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/

2018-04-13 Thread Zac Medico
commit: d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2
Author: Zac Medico  gentoo  org>
AuthorDate: Thu Apr 12 03:56:25 2018 +0000
Commit: Zac Medico  gentoo  org>
CommitDate: Fri Apr 13 07:10:10 2018 +0000
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df

Implement _PortageEventLoop.subprocess_exec (bug 649588)

In python versions that support asyncio, this allows API consumers
to use the asyncio.create_subprocess_exec() function with portage's
internal event loop. Currently, subprocess.PIPE is not implemented
because that would require an implementation of asyncio's private
asyncio.unix_events._UnixReadPipeTransport class. However, it's
possible to use pipes created with os.pipe() for stdin, stdout,
and stderr, as demonstrated in the included unit tests.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   | 163 +
 pym/portage/util/futures/unix_events.py|  98 +
 2 files changed, 261 insertions(+)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
new file mode 100644
index 000000000..d30f48c43
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -0,0 +1,163 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+from _emerge.PipeReader import PipeReader
+
+
+def reader(input_file, loop=None):
+   """
+   Asynchronously read a binary input file.
+
+   @param input_file: binary input file
+   @type input_file: file
+   @param loop: event loop
+   @type loop: EventLoop
+   @return: bytes
+   @rtype: asyncio.Future (or compatible)
+   """
+   loop = loop or asyncio.get_event_loop()
+   loop = getattr(loop, '_asyncio_wrapper', loop)
+   future = loop.create_future()
+   _Reader(future, input_file, loop)
+   return future
+
+
+class _Reader(object):
+   def __init__(self, future, input_file, loop):
+   self._future = future
+   self._pipe_reader = PipeReader(
+   input_files={'input_file':input_file}, 
scheduler=loop._loop)
+
+   self._future.add_done_callback(self._cancel_callback)
+   self._pipe_reader.addExitListener(self._eof)
+   self._pipe_reader.start()
+
+   def _cancel_callback(self, future):
+   if future.cancelled():
+   self._cancel()
+
+   def _eof(self, pipe_reader):
+   self._pipe_reader = None
+   self._future.set_result(pipe_reader.getvalue())
+
+   def _cancel(self):
+   if self._pipe_reader is not None and self._pipe_reader.poll() 
is None:
+   self._pipe_reader.removeExitListener(self._eof)
+   self._pipe_reader.cancel()
+   self._pipe_reader = None
+
+
+class SubprocessExecTestCase(TestCase):
+   def _run_test(self, test):
+   initial_policy = asyncio.get_event_loop_policy()
+   if not isinstance(initial_policy, DefaultEventLoopPolicy):
+   asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+   try:
+   test(asyncio.get_event_loop())
+   finally:
+   asyncio.set_event_loop_policy(initial_policy)
+
+   def testEcho(self):
+   if not hasattr(asyncio, 'create_subprocess_exec'):
+   self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+   args_tuple = (b'hello', b'world')
+   echo_binary = find_binary("echo")
+   self.assertNotEqual(echo_binary, None)
+   echo_binary = echo_binary.encode()
+
+   # Use os.pipe(), since this loop does not implement the
+   # ReadTransport necessary for subprocess.PIPE support.
+   stdout_pr, stdout_pw = os.pipe()
+   stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+   stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+   files = [stdout_pr, stdout_pw]
+
+   def test(loop):
+   output = None
+   try:
+   with open(os.devnull, 'rb', 0) as devnull:
+   proc = loop.run_until_complete(
+   asyncio.create_subprocess_exec(
+   echo_binary, *args_tuple,
+   stdin=devnull, 
stdout=stdout_pw, stderr=stdout_pw))
+
+   

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/util/_eventloop/

2018-04-12 Thread Zac Medico
commit: 449b7b9f30c869781f7012ca53e9cda4efef6f9b
Author: Zac Medico  gentoo  org>
AuthorDate: Thu Apr 12 19:28:47 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Thu Apr 12 19:28:47 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=449b7b9f

Implement AbstractEventLoop.call_exception_handler (bug 649588)

When using asyncio with _PortageEventLoopPolicy, this method is
required in order to avoid a NotImplementedError if a coroutine
raises an unexpected exception.

Bug: https://bugs.gentoo.org/649588

 pym/portage/util/_eventloop/EventLoop.py | 68 
 pym/portage/util/futures/unix_events.py  |  2 +
 2 files changed, 70 insertions(+)

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index 12c199c76..13ce5478e 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -9,6 +9,7 @@ import os
 import select
 import signal
 import sys
+import traceback
 
 try:
import fcntl
@@ -842,6 +843,73 @@ class EventLoop(object):
close()
self._poll_obj = None
 
+   def default_exception_handler(self, context):
+   """
+   Default exception handler.
+
+   This is called when an exception occurs and no exception
+   handler is set, and can be called by a custom exception
+   handler that wants to defer to the default behavior.
+
+   The context parameter has the same meaning as in
+   `call_exception_handler()`.
+
+   @param context: exception context
+   @type context: dict
+   """
+   message = context.get('message')
+   if not message:
+   message = 'Unhandled exception in event loop'
+
+   exception = context.get('exception')
+   if exception is not None:
+   exc_info = (type(exception), exception, 
exception.__traceback__)
+   else:
+   exc_info = False
+
+   log_lines = [message]
+   for key in sorted(context):
+   if key in {'message', 'exception'}:
+   continue
+   value = context[key]
+   if key == 'source_traceback':
+   tb = ''.join(traceback.format_list(value))
+   value = 'Object created at (most recent call 
last):\n'
+   value += tb.rstrip()
+   elif key == 'handle_traceback':
+   tb = ''.join(traceback.format_list(value))
+   value = 'Handle created at (most recent call 
last):\n'
+   value += tb.rstrip()
+   else:
+   value = repr(value)
+   log_lines.append('{}: {}'.format(key, value))
+
+   logging.error('\n'.join(log_lines), exc_info=exc_info)
+   os.kill(os.getpid(), signal.SIGTERM)
+
+   def call_exception_handler(self, context):
+   """
+   Call the current event loop's exception handler.
+
+   The context argument is a dict containing the following keys:
+
+   - 'message': Error message;
+   - 'exception' (optional): Exception object;
+   - 'future' (optional): Future instance;
+   - 'handle' (optional): Handle instance;
+   - 'protocol' (optional): Protocol instance;
+   - 'transport' (optional): Transport instance;
+   - 'socket' (optional): Socket instance;
+   - 'asyncgen' (optional): Asynchronous generator that caused
+   the exception.
+
+   New keys may be introduced in the future.
+
+   @param context: exception context
+   @type context: dict
+   """
+   self.default_exception_handler(context)
+
def get_debug(self):
"""
Get the debug mode (bool) of the event loop.

diff --git a/pym/portage/util/futures/unix_events.py 
b/pym/portage/util/futures/unix_events.py
index 6fcef45fa..5434cd942 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -48,6 +48,8 @@ class _PortageEventLoop(events.AbstractEventLoop):
self.remove_writer = loop.remove_writer
self.run_in_executor = loop.run_in_executor
self.time = loop.time
+   self.default_exception_handler = loop.default_exception_handler
+   self.call_exception_handler = loop.call_exception_handler
self.set_debug = loop.set_debug
self.get_debug = loop.get_debug
 


[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/, ...

2018-04-12 Thread Zac Medico
commit: a78dca7e47f79ad48aee4909ee10688604996b86
Author: Zac Medico  gentoo  org>
AuthorDate: Wed Apr 11 06:44:41 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Thu Apr 12 08:35:05 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=a78dca7e

Implement AbstractEventLoopPolicy.get_child_watcher() (bug 649588)

Use a _PortageChildWatcher class to wrap portage's internal event loop
and implement asyncio's AbstractChildWatcher interface.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_child_watcher.py | 45 +++
 pym/portage/util/_eventloop/EventLoop.py   |  7 +-
 pym/portage/util/futures/_asyncio.py   | 13 
 pym/portage/util/futures/unix_events.py| 90 ++
 4 files changed, 152 insertions(+), 3 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py 
b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
new file mode 100644
index 0..dca01be56
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
@@ -0,0 +1,45 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary, spawn
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+class ChildWatcherTestCase(TestCase):
+   def testChildWatcher(self):
+   true_binary = find_binary("true")
+   self.assertNotEqual(true_binary, None)
+
+   initial_policy = asyncio.get_event_loop_policy()
+   if not isinstance(initial_policy, DefaultEventLoopPolicy):
+   asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+   try:
+   try:
+   asyncio.set_child_watcher(None)
+   except NotImplementedError:
+   pass
+   else:
+   self.assertTrue(False)
+
+   args_tuple = ('hello', 'world')
+
+   loop = asyncio.get_event_loop()
+   future = loop.create_future()
+
+   def callback(pid, returncode, *args):
+   future.set_result((pid, returncode, args))
+
+   with asyncio.get_child_watcher() as watcher:
+   pids = spawn([true_binary], returnpid=True)
+   watcher.add_child_handler(pids[0], callback, 
*args_tuple)
+
+   self.assertEqual(
+   loop.run_until_complete(future),
+   (pids[0], os.EX_OK, args_tuple))
+   finally:
+   asyncio.set_event_loop_policy(initial_policy)

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index d53a76ba1..12c199c76 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -25,7 +25,7 @@ import portage
 portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.futures:Future',
'portage.util.futures.executor.fork:ForkExecutor',
-   'portage.util.futures.unix_events:_PortageEventLoop',
+   
'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher',
 )
 
 from portage import OrderedDict
@@ -190,6 +190,7 @@ class EventLoop(object):
self._sigchld_src_id = None
self._pid = os.getpid()
self._asyncio_wrapper = _PortageEventLoop(loop=self)
+   self._asyncio_child_watcher = _PortageChildWatcher(self)
 
def create_future(self):
"""
@@ -424,8 +425,8 @@ class EventLoop(object):
self._sigchld_read, self.IO_IN, 
self._sigchld_io_cb)
signal.signal(signal.SIGCHLD, 
self._sigchld_sig_cb)
 
-   # poll now, in case the SIGCHLD has already arrived
-   self._poll_child_processes()
+   # poll soon, in case the SIGCHLD has already arrived
+   self.call_soon(self._poll_child_processes)
return source_id
 
def _sigchld_sig_cb(self, signum, frame):

diff --git a/pym/portage/util/futures/_asyncio.py 
b/pym/portage/util/futures/_asyncio.py
index 02ab5..0f84f14b7 100644
--- a/pym/portage/util/futures/_asyncio.py
+++ b/pym/portage/util/futures/_asyncio.py
@@ -3,7 +3,9 @@
 
 __all__ = (
'ensure_future',
+   'get_child_watcher',
'get_event_loop',
+   'set_child_watcher',
'get_event_loop_policy',
'set_event_loop_policy',
'sleep',
@@ -62,6 +64,17 @@ def get_event_loop():
return get_event_loop_policy().get_event_loop()
 
 
+def get

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/executor/

2018-04-12 Thread Zac Medico
commit: ae59758e395393601a389ede4f4b521db6786139
Author: Zac Medico  gentoo  org>
AuthorDate: Thu Apr 12 08:25:17 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Thu Apr 12 08:30:49 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=ae59758e

ForkExecutor: fix shutdown to handle empty self._running_tasks

 pym/portage/util/futures/executor/fork.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pym/portage/util/futures/executor/fork.py 
b/pym/portage/util/futures/executor/fork.py
index 919a72bfd..51367f934 100644
--- a/pym/portage/util/futures/executor/fork.py
+++ b/pym/portage/util/futures/executor/fork.py
@@ -96,6 +96,8 @@ class ForkExecutor(object):
 
def shutdown(self, wait=True):
self._shutdown = True
+   if not self._running_tasks and not self._shutdown_future.done():
+   self._shutdown_future.set_result(None)
if wait:
self._loop.run_until_complete(self._shutdown_future)
 



[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/executor/

2018-04-10 Thread Zac Medico
commit: 52d4689740f5f9fcf1cf7423e3fe4089dbb4c718
Author: Zac Medico  gentoo  org>
AuthorDate: Wed Apr 11 02:01:43 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Wed Apr 11 02:01:43 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=52d46897

ForkExecutor: support asyncio via _PortageEventLoopPolicy (bug 649588)

Support portage's internal EventLoop as well as the _PortageEventLoop
asyncio compatibility wrapper, by using the respective _loop and
_asyncio_wrapper attributes where appropriate.

Bug: https://bugs.gentoo.org/649588

 pym/portage/util/futures/executor/fork.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pym/portage/util/futures/executor/fork.py 
b/pym/portage/util/futures/executor/fork.py
index 496b4e892..919a72bfd 100644
--- a/pym/portage/util/futures/executor/fork.py
+++ b/pym/portage/util/futures/executor/fork.py
@@ -25,7 +25,8 @@ class ForkExecutor(object):
"""
def __init__(self, max_workers=None, loop=None):
self._max_workers = max_workers or multiprocessing.cpu_count()
-   self._loop = loop or global_event_loop()
+   loop = loop or global_event_loop()
+   self._loop = getattr(loop, '_asyncio_wrapper', loop)
self._submit_queue = collections.deque()
self._running_tasks = {}
self._shutdown = False
@@ -53,7 +54,7 @@ class ForkExecutor(object):
future, proc = self._submit_queue.popleft()

future.add_done_callback(functools.partial(self._cancel_cb, proc))
proc.addExitListener(functools.partial(self._proc_exit, 
future))
-   proc.scheduler = self._loop
+   proc.scheduler = self._loop._loop
proc.start()
self._running_tasks[id(proc)] = proc
 



[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/util/_eventloop/, ...

2018-04-10 Thread Zac Medico
commit: 142d08c0636b172fbc00a7f2b10dc07479a57e2d
Author: Zac Medico  gentoo  org>
AuthorDate: Sun Mar  4 20:10:55 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Wed Apr 11 01:44:34 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=142d08c0

Add minimal asyncio.AbstractEventLoop implementation (bug 649588)

This provides minimal interoperability with existing asyncio code,
by adding a portage.util.futures.unix_events.DefaultEventLoopPolicy
class that makes asyncio use portage's internal event loop when an
instance is passed into asyncio.set_event_loop_policy(). The
get_event_loop() method of this policy returns an instance of a
_PortageEventLoop class that wraps portage's internal event loop and
implements asyncio's AbstractEventLoop interface.

The portage.util.futures.asyncio module refers to the real
asyncio module when available, and otherwise falls back to a
minimal implementation that works with python2.7. The included
EventLoopInForkTestCase demonstrates usage, and works with all
supported versions of python, including python2.7.

In python3.4 and later, API consumers can use asyncio coroutines,
since _PortageEventLoop is compatible with asyncio.Task!

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio}/__init__.py  |   0
 .../util/futures/asyncio/__test__.py}  |   0
 .../futures/asyncio/test_event_loop_in_fork.py |  59 +++
 pym/portage/util/_eventloop/EventLoop.py   |  11 +-
 pym/portage/util/futures/__init__.py   |  11 ++
 pym/portage/util/futures/_asyncio.py   | 114 
 pym/portage/util/futures/events.py | 191 +
 pym/portage/util/futures/futures.py|   9 +-
 pym/portage/util/futures/unix_events.py|  91 ++
 9 files changed, 477 insertions(+), 9 deletions(-)

diff --git a/pym/portage/util/futures/__init__.py 
b/pym/portage/tests/util/futures/asyncio/__init__.py
similarity index 100%
copy from pym/portage/util/futures/__init__.py
copy to pym/portage/tests/util/futures/asyncio/__init__.py

diff --git a/pym/portage/util/futures/__init__.py 
b/pym/portage/tests/util/futures/asyncio/__test__.py
similarity index 100%
copy from pym/portage/util/futures/__init__.py
copy to pym/portage/tests/util/futures/asyncio/__test__.py

diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py 
b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
new file mode 100644
index 0..7868d792a
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
@@ -0,0 +1,59 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import multiprocessing
+import os
+
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+def fork_main(parent_conn, child_conn):
+   parent_conn.close()
+   loop = asyncio.get_event_loop()
+   # This fails with python's default event loop policy,
+   # see https://bugs.python.org/issue22087.
+   loop.run_until_complete(asyncio.sleep(0.1))
+
+
+def async_main(fork_exitcode, loop=None):
+   loop = loop or asyncio.get_event_loop()
+
+   # Since python2.7 does not support Process.sentinel, use Pipe to
+   # monitor for process exit.
+   parent_conn, child_conn = multiprocessing.Pipe()
+
+   def eof_callback(proc):
+   loop.remove_reader(parent_conn.fileno())
+   parent_conn.close()
+   proc.join()
+   fork_exitcode.set_result(proc.exitcode)
+
+   proc = multiprocessing.Process(target=fork_main, args=(parent_conn, 
child_conn))
+   loop.add_reader(parent_conn.fileno(), eof_callback, proc)
+   proc.start()
+   child_conn.close()
+
+
+class EventLoopInForkTestCase(TestCase):
+   """
+   The default asyncio event loop policy does not support loops
+   running in forks, see https://bugs.python.org/issue22087.
+   Portage's DefaultEventLoopPolicy supports forks.
+   """
+
+   def testEventLoopInForkTestCase(self):
+   initial_policy = asyncio.get_event_loop_policy()
+   if not isinstance(initial_policy, DefaultEventLoopPolicy):
+   asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+   try:
+   loop = asyncio.get_event_loop()
+   fork_exitcode = loop.create_future()
+   # Make async_main fork while the loop is running, which 
would
+   # trigger https://bugs.python.org/issue22087 with 
asyncio's
+   # default event loop policy.
+   loop.call_soon(async_main, fork_exitcode)
+   assert loop.run_until_complete(fork_exitcode) == 
os.EX_OK
+   finally:
+

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/executor/, pym/portage/util/_eventloop/

2018-04-02 Thread Zac Medico
commit: 4095be74985c5c2eead5fb480cf37baa11308d62
Author: Zac Medico  gentoo  org>
AuthorDate: Wed Mar 14 08:01:26 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Mon Apr  2 16:53:23 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=4095be74

Add ForkExecutor (bug 649588)

This is useful for asynchronous operations that we might
need to cancel if they take too long (since
concurrent.futures.ProcessPoolExecutor tasks are not cancellable).
The ability to cancel tasks makes this executor useful
as an alternative to portage.exception.AlarmSignal.

Also add an asyncio-compatible EventLoop.run_in_executor
method that uses ForkExecutor as the default executor,
which will later be used to implement the corresponding
asyncio.AbstractEventLoop run_in_executor method.

Bug: https://bugs.gentoo.org/649588
Reviewed-by: Alec Warner  gentoo.org>

 pym/portage/util/_eventloop/EventLoop.py  |  45 -
 pym/portage/util/futures/executor/__init__.py |   0
 pym/portage/util/futures/executor/fork.py | 134 ++
 3 files changed, 178 insertions(+), 1 deletion(-)

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index f472a3dae..1574a6837 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -24,6 +24,7 @@ except ImportError:
 import portage
 portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.futures:_EventLoopFuture',
+   'portage.util.futures.executor.fork:ForkExecutor',
 )
 
 from portage import OrderedDict
@@ -122,6 +123,7 @@ class EventLoop(object):
self._idle_callbacks = OrderedDict()
self._timeout_handlers = {}
self._timeout_interval = None
+   self._default_executor = None
 
self._poll_obj = None
try:
@@ -721,6 +723,46 @@ class EventLoop(object):
return self._handle(self.timeout_add(
delay * 1000, self._call_soon_callback(callback, 
args)), self)
 
+   def run_in_executor(self, executor, func, *args):
+   """
+   Arrange for a func to be called in the specified executor.
+
+   The executor argument should be an Executor instance. The 
default
+   executor is used if executor is None.
+
+   Use functools.partial to pass keywords to the *func*.
+
+   @param executor: executor
+   @type executor: concurrent.futures.Executor or None
+   @param func: a function to call
+   @type func: callable
+   @return: a Future
+   @rtype: asyncio.Future (or compatible)
+   """
+   if executor is None:
+   executor = self._default_executor
+   if executor is None:
+   executor = ForkExecutor(loop=self)
+   self._default_executor = executor
+   return executor.submit(func, *args)
+
+   def close(self):
+   """Close the event loop.
+
+   This clears the queues and shuts down the executor,
+   and waits for it to finish.
+   """
+   executor = self._default_executor
+   if executor is not None:
+   self._default_executor = None
+   executor.shutdown(wait=True)
+
+   if self._poll_obj is not None:
+   close = getattr(self._poll_obj, 'close')
+   if close is not None:
+   close()
+   self._poll_obj = None
+
 
 _can_poll_device = None
 
@@ -782,10 +824,11 @@ class _epoll_adapter(object):
that is associated with an epoll instance will close automatically when
it is garbage collected, so it's not necessary to close it explicitly.
"""
-   __slots__ = ('_epoll_obj',)
+   __slots__ = ('_epoll_obj', 'close')
 
def __init__(self, epoll_obj):
self._epoll_obj = epoll_obj
+   self.close = epoll_obj.close
 
def register(self, fd, *args):
self._epoll_obj.register(fd, *args)

diff --git a/pym/portage/util/futures/executor/__init__.py 
b/pym/portage/util/futures/executor/__init__.py
new file mode 100644
index 0..e69de29bb

diff --git a/pym/portage/util/futures/executor/fork.py 
b/pym/portage/util/futures/executor/fork.py
new file mode 100644
index 0..496b4e892
--- /dev/null
+++ b/pym/portage/util/futures/executor/fork.py
@@ -0,0 +1,134 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+   'ForkExecutor',
+)
+
+import collections
+import functools
+import multiprocessing
+import os
+import sys
+import traceback
+
+from portage.util._async.AsyncFunction import AsyncFun

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/

2018-03-14 Thread Zac Medico
commit: 71c59145e0c7b631ec3f41e0d711445786d16f8f
Author: Zac Medico  gentoo  org>
AuthorDate: Thu Mar 15 02:45:31 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Thu Mar 15 02:49:02 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=71c59145

portage.util.futures.wait: fix arguments for asyncio compat

The bare "*" is not supported in python2.7, and in python3
the bare "*" means that keyword arguments must be used for
the arguments that follow.

Fixes: e43f6c583ed9 ("Add iter_completed convenience function (bug 648790)")

 pym/portage/util/futures/iter_completed.py |  2 +-
 pym/portage/util/futures/wait.py   | 16 +++-
 2 files changed, 4 insertions(+), 14 deletions(-)

diff --git a/pym/portage/util/futures/iter_completed.py 
b/pym/portage/util/futures/iter_completed.py
index 1050b6fa7..ad6275b49 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -52,7 +52,7 @@ def iter_completed(futures, max_jobs=None, max_load=None, 
loop=None):
# task_generator is exhausted
while future_map:
done, pending = loop.run_until_complete(
-   wait(*list(future_map.values()), 
return_when=FIRST_COMPLETED))
+   wait(list(future_map.values()), 
return_when=FIRST_COMPLETED))
for future in done:
del future_map[id(future)]
yield future

diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py
index 3f0bdbff5..bd85bb053 100644
--- a/pym/portage/util/futures/wait.py
+++ b/pym/portage/util/futures/wait.py
@@ -11,15 +11,13 @@ except ImportError:
 from portage.util._eventloop.global_event_loop import global_event_loop
 
 
-# Use **kwargs since python2.7 does not allow arguments with defaults
-# to follow *futures.
-def wait(*futures, **kwargs):
+def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED):
"""
Use portage's internal EventLoop to emulate asyncio.wait:
https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
 
-   @param future: future to wait for
-   @type future: asyncio.Future (or compatible)
+   @param futures: futures to wait for
+   @type futures: asyncio.Future (or compatible)
@param timeout: number of seconds to wait (wait indefinitely if
not specified)
@type timeout: int or float
@@ -32,14 +30,6 @@ def wait(*futures, **kwargs):
@return: tuple of (done, pending).
@rtype: asyncio.Future (or compatible)
"""
-   if not futures:
-   raise TypeError("wait() missing 1 required positional argument: 
'future'")
-   loop = kwargs.pop('loop', None)
-   timeout = kwargs.pop('timeout', None)
-   return_when = kwargs.pop('return_when', ALL_COMPLETED)
-   if kwargs:
-   raise TypeError("wait() got an unexpected keyword argument 
'{}'".\
-   format(next(iter(kwargs
loop = loop or global_event_loop()
result_future = loop.create_future()
_Waiter(futures, timeout, return_when, result_future, loop)



[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/

2017-03-26 Thread Zac Medico
commit: d90ff7c046b39de82558655375ea104cfa19b176
Author: Zac Medico  gentoo  org>
AuthorDate: Sun Mar 26 20:08:54 2017 +
Commit: Zac Medico  gentoo  org>
CommitDate: Sun Mar 26 20:12:04 2017 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=d90ff7c0

_EventLoopFuture: reduce indent of class body (whitespace only)

 pym/portage/util/futures/futures.py | 278 ++--
 1 file changed, 139 insertions(+), 139 deletions(-)

diff --git a/pym/portage/util/futures/futures.py 
b/pym/portage/util/futures/futures.py
index dd913a1e3..dcf593c01 100644
--- a/pym/portage/util/futures/futures.py
+++ b/pym/portage/util/futures/futures.py
@@ -42,148 +42,148 @@ _CANCELLED = 'CANCELLED'
 _FINISHED = 'FINISHED'
 
 class _EventLoopFuture(object):
+   """
+   This class provides (a subset of) the asyncio.Future interface, for
+   use with the EventLoop class, because EventLoop is currently
+   missing some of the asyncio.AbstractEventLoop methods that
+   asyncio.Future requires.
+   """
+
+   # Class variables serving as defaults for instance variables.
+   _state = _PENDING
+   _result = None
+   _exception = None
+   _loop = None
+
+   def __init__(self, loop=None):
+   """Initialize the future.
+
+   The optional loop argument allows explicitly setting the event
+   loop object used by the future. If it's not provided, the 
future uses
+   the default event loop.
"""
-   This class provides (a subset of) the asyncio.Future interface, 
for
-   use with the EventLoop class, because EventLoop is currently
-   missing some of the asyncio.AbstractEventLoop methods that
-   asyncio.Future requires.
+   if loop is None:
+   self._loop = global_event_loop()
+   else:
+   self._loop = loop
+   self._callbacks = []
+
+   def cancel(self):
+   """Cancel the future and schedule callbacks.
+
+   If the future is already done or cancelled, return False.  
Otherwise,
+   change the future's state to cancelled, schedule the callbacks 
and
+   return True.
"""
+   if self._state != _PENDING:
+   return False
+   self._state = _CANCELLED
+   self._schedule_callbacks()
+   return True
 
-   # Class variables serving as defaults for instance variables.
-   _state = _PENDING
-   _result = None
-   _exception = None
-   _loop = None
-
-   def __init__(self, loop=None):
-   """Initialize the future.
-
-   The optional loop argument allows explicitly setting 
the event
-   loop object used by the future. If it's not provided, 
the future uses
-   the default event loop.
-   """
-   if loop is None:
-   self._loop = global_event_loop()
-   else:
-   self._loop = loop
-   self._callbacks = []
-
-   def cancel(self):
-   """Cancel the future and schedule callbacks.
-
-   If the future is already done or cancelled, return 
False.  Otherwise,
-   change the future's state to cancelled, schedule the 
callbacks and
-   return True.
-   """
-   if self._state != _PENDING:
-   return False
-   self._state = _CANCELLED
-   self._schedule_callbacks()
-   return True
-
-   def _schedule_callbacks(self):
-   """Internal: Ask the event loop to call all callbacks.
-
-   The callbacks are scheduled to be called as soon as 
possible. Also
-   clears the callback list.
-   """
-   callbacks = self._callbacks[:]
-   if not callbacks:
-   return
-
-   self._callbacks[:] = []
-   for callback in callbacks:
-   self._loop.call_soon(callback, self)
-
-   def cancelled(self):
-   """Return True if the future was cancelled."""
-   return self._state == _CANCELLED
-
-   def done(self):
-   """Return True if the future is done.
-
-   Done means either that a result / exception are 
available, or that the
-   future was cancelled.
-   """
-   return self._state !=