[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/

2018-04-15 Thread Zac Medico
commit: 9a7b0a006e65f8683716d60574e4f19f8ffd603d
Author: Zac Medico  gentoo  org>
AuthorDate: Sat Apr 14 21:29:29 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Mon Apr 16 00:04:26 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=9a7b0a00

Implement AbstractEventLoop.connect_read_pipe (bug 649588)

In python versions that support asyncio, this allows API consumers
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdout
and stderr parameters.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   |  30 
 pym/portage/util/futures/unix_events.py| 157 -
 2 files changed, 184 insertions(+), 3 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index d30f48c43..94984fc93 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -2,6 +2,7 @@
 # Distributed under the terms of the GNU General Public License v2
 
 import os
+import subprocess
 
 from portage.process import find_binary
 from portage.tests import TestCase
@@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase):
f.close()
 
self._run_test(test)
+
+   def testReadTransport(self):
+   """
+   Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) 
which
+   requires an AbstractEventLoop.connect_read_pipe implementation
+   (and a ReadTransport implementation for it to return).
+   """
+   if not hasattr(asyncio, 'create_subprocess_exec'):
+   self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+   args_tuple = (b'hello', b'world')
+   echo_binary = find_binary("echo")
+   self.assertNotEqual(echo_binary, None)
+   echo_binary = echo_binary.encode()
+
+   def test(loop):
+   with open(os.devnull, 'rb', 0) as devnull:
+   proc = loop.run_until_complete(
+   asyncio.create_subprocess_exec(
+   echo_binary, *args_tuple,
+   stdin=devnull,
+   stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT))
+
+   self.assertEqual(
+   
tuple(loop.run_until_complete(proc.stdout.read()).split()),
+   args_tuple)
+   self.assertEqual(loop.run_until_complete(proc.wait()), 
os.EX_OK)
+
+   self._run_test(test)

diff --git a/pym/portage/util/futures/unix_events.py 
b/pym/portage/util/futures/unix_events.py
index d788c2bea..9d84ab6aa 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -9,12 +9,18 @@ __all__ = (
 try:
from asyncio.base_subprocess import BaseSubprocessTransport as 
_BaseSubprocessTransport
from asyncio.unix_events import AbstractChildWatcher as 
_AbstractChildWatcher
+   from asyncio.transports import ReadTransport as _ReadTransport
 except ImportError:
_AbstractChildWatcher = object
_BaseSubprocessTransport = object
+   _ReadTransport = object
 
+import errno
+import fcntl
 import functools
+import logging
 import os
+import stat
 import subprocess
 
 from portage.util._eventloop.global_event_loop import (
@@ -82,6 +88,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
"""
return asyncio.Task(coro, loop=self)
 
+   def connect_read_pipe(self, protocol_factory, pipe):
+   """
+   Register read pipe in event loop. Set the pipe to non-blocking 
mode.
+
+   @type protocol_factory: callable
+   @param protocol_factory: must instantiate object with Protocol 
interface
+   @type pipe: file
+   @param pipe: a pipe to read from
+   @rtype: asyncio.Future
+   @return: Return pair (transport, protocol), where transport 
supports the
+   ReadTransport interface.
+   """
+   protocol = protocol_factory()
+   result = self.create_future()
+   waiter = self.create_future()
+   transport = self._make_read_pipe_transport(pipe, protocol, 
waiter=waiter)
+
+   def waiter_callback(waiter):
+   try:
+   waiter.result()
+   except Exception as e:
+   transport.close()
+   result.set_exception(e)
+   else:
+   result.set_result((transport, protocol))
+
+ 

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/

2018-04-13 Thread Zac Medico
commit: d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2
Author: Zac Medico  gentoo  org>
AuthorDate: Thu Apr 12 03:56:25 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Fri Apr 13 07:10:10 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df

Implement _PortageEventLoop.subprocess_exec (bug 649588)

In python versions that support asyncio, this allows API consumers
to use the asyncio.create_subprocess_exec() function with portage's
internal event loop. Currently, subprocess.PIPE is not implemented
because that would require an implementation of asyncio's private
asyncio.unix_events._UnixReadPipeTransport class. However, it's
possible to use pipes created with os.pipe() for stdin, stdout,
and stderr, as demonstrated in the included unit tests.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   | 163 +
 pym/portage/util/futures/unix_events.py|  98 +
 2 files changed, 261 insertions(+)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
new file mode 100644
index 0..d30f48c43
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -0,0 +1,163 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+from _emerge.PipeReader import PipeReader
+
+
+def reader(input_file, loop=None):
+   """
+   Asynchronously read a binary input file.
+
+   @param input_file: binary input file
+   @type input_file: file
+   @param loop: event loop
+   @type loop: EventLoop
+   @return: bytes
+   @rtype: asyncio.Future (or compatible)
+   """
+   loop = loop or asyncio.get_event_loop()
+   loop = getattr(loop, '_asyncio_wrapper', loop)
+   future = loop.create_future()
+   _Reader(future, input_file, loop)
+   return future
+
+
+class _Reader(object):
+   def __init__(self, future, input_file, loop):
+   self._future = future
+   self._pipe_reader = PipeReader(
+   input_files={'input_file':input_file}, 
scheduler=loop._loop)
+
+   self._future.add_done_callback(self._cancel_callback)
+   self._pipe_reader.addExitListener(self._eof)
+   self._pipe_reader.start()
+
+   def _cancel_callback(self, future):
+   if future.cancelled():
+   self._cancel()
+
+   def _eof(self, pipe_reader):
+   self._pipe_reader = None
+   self._future.set_result(pipe_reader.getvalue())
+
+   def _cancel(self):
+   if self._pipe_reader is not None and self._pipe_reader.poll() 
is None:
+   self._pipe_reader.removeExitListener(self._eof)
+   self._pipe_reader.cancel()
+   self._pipe_reader = None
+
+
+class SubprocessExecTestCase(TestCase):
+   def _run_test(self, test):
+   initial_policy = asyncio.get_event_loop_policy()
+   if not isinstance(initial_policy, DefaultEventLoopPolicy):
+   asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+   try:
+   test(asyncio.get_event_loop())
+   finally:
+   asyncio.set_event_loop_policy(initial_policy)
+
+   def testEcho(self):
+   if not hasattr(asyncio, 'create_subprocess_exec'):
+   self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+   args_tuple = (b'hello', b'world')
+   echo_binary = find_binary("echo")
+   self.assertNotEqual(echo_binary, None)
+   echo_binary = echo_binary.encode()
+
+   # Use os.pipe(), since this loop does not implement the
+   # ReadTransport necessary for subprocess.PIPE support.
+   stdout_pr, stdout_pw = os.pipe()
+   stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+   stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+   files = [stdout_pr, stdout_pw]
+
+   def test(loop):
+   output = None
+   try:
+   with open(os.devnull, 'rb', 0) as devnull:
+   proc = loop.run_until_complete(
+   asyncio.create_subprocess_exec(
+   echo_binary, *args_tuple,
+   stdin=devnull, 
stdout=stdout_pw, stderr=stdout_pw))
+
+   

[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/, ...

2018-04-12 Thread Zac Medico
commit: a78dca7e47f79ad48aee4909ee10688604996b86
Author: Zac Medico  gentoo  org>
AuthorDate: Wed Apr 11 06:44:41 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Thu Apr 12 08:35:05 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=a78dca7e

Implement AbstractEventLoopPolicy.get_child_watcher() (bug 649588)

Use a _PortageChildWatcher class to wrap portage's internal event loop
and implement asyncio's AbstractChildWatcher interface.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_child_watcher.py | 45 +++
 pym/portage/util/_eventloop/EventLoop.py   |  7 +-
 pym/portage/util/futures/_asyncio.py   | 13 
 pym/portage/util/futures/unix_events.py| 90 ++
 4 files changed, 152 insertions(+), 3 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py 
b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
new file mode 100644
index 0..dca01be56
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
@@ -0,0 +1,45 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary, spawn
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+class ChildWatcherTestCase(TestCase):
+   def testChildWatcher(self):
+   true_binary = find_binary("true")
+   self.assertNotEqual(true_binary, None)
+
+   initial_policy = asyncio.get_event_loop_policy()
+   if not isinstance(initial_policy, DefaultEventLoopPolicy):
+   asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+   try:
+   try:
+   asyncio.set_child_watcher(None)
+   except NotImplementedError:
+   pass
+   else:
+   self.assertTrue(False)
+
+   args_tuple = ('hello', 'world')
+
+   loop = asyncio.get_event_loop()
+   future = loop.create_future()
+
+   def callback(pid, returncode, *args):
+   future.set_result((pid, returncode, args))
+
+   with asyncio.get_child_watcher() as watcher:
+   pids = spawn([true_binary], returnpid=True)
+   watcher.add_child_handler(pids[0], callback, 
*args_tuple)
+
+   self.assertEqual(
+   loop.run_until_complete(future),
+   (pids[0], os.EX_OK, args_tuple))
+   finally:
+   asyncio.set_event_loop_policy(initial_policy)

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index d53a76ba1..12c199c76 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -25,7 +25,7 @@ import portage
 portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.futures:Future',
'portage.util.futures.executor.fork:ForkExecutor',
-   'portage.util.futures.unix_events:_PortageEventLoop',
+   
'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher',
 )
 
 from portage import OrderedDict
@@ -190,6 +190,7 @@ class EventLoop(object):
self._sigchld_src_id = None
self._pid = os.getpid()
self._asyncio_wrapper = _PortageEventLoop(loop=self)
+   self._asyncio_child_watcher = _PortageChildWatcher(self)
 
def create_future(self):
"""
@@ -424,8 +425,8 @@ class EventLoop(object):
self._sigchld_read, self.IO_IN, 
self._sigchld_io_cb)
signal.signal(signal.SIGCHLD, 
self._sigchld_sig_cb)
 
-   # poll now, in case the SIGCHLD has already arrived
-   self._poll_child_processes()
+   # poll soon, in case the SIGCHLD has already arrived
+   self.call_soon(self._poll_child_processes)
return source_id
 
def _sigchld_sig_cb(self, signum, frame):

diff --git a/pym/portage/util/futures/_asyncio.py 
b/pym/portage/util/futures/_asyncio.py
index 02ab5..0f84f14b7 100644
--- a/pym/portage/util/futures/_asyncio.py
+++ b/pym/portage/util/futures/_asyncio.py
@@ -3,7 +3,9 @@
 
 __all__ = (
'ensure_future',
+   'get_child_watcher',
'get_event_loop',
+   'set_child_watcher',
'get_event_loop_policy',
'set_event_loop_policy',
'sleep',
@@ -62,6 +64,17 @@ def get_event_loop():
return get_event_loop_policy().get_event_loop()
 
 
+def get