[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
commit: 9a7b0a006e65f8683716d60574e4f19f8ffd603d Author: Zac Medico gentoo org> AuthorDate: Sat Apr 14 21:29:29 2018 +0000 Commit: Zac Medico gentoo org> CommitDate: Mon Apr 16 00:04:26 2018 +0000 URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=9a7b0a00 Implement AbstractEventLoop.connect_read_pipe (bug 649588) In python versions that support asyncio, this allows API consumers to use subprocess.PIPE for asyncio.create_subprocess_exec() stdout and stderr parameters. Bug: https://bugs.gentoo.org/649588 .../util/futures/asyncio/test_subprocess_exec.py | 30 pym/portage/util/futures/unix_events.py| 157 - 2 files changed, 184 insertions(+), 3 deletions(-) diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py index d30f48c43..94984fc93 100644 --- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -2,6 +2,7 @@ # Distributed under the terms of the GNU General Public License v2 import os +import subprocess from portage.process import find_binary from portage.tests import TestCase @@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase): f.close() self._run_test(test) + + def testReadTransport(self): + """ + Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) which + requires an AbstractEventLoop.connect_read_pipe implementation + (and a ReadTransport implementation for it to return). 
+ """ + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + args_tuple = (b'hello', b'world') + echo_binary = find_binary("echo") + self.assertNotEqual(echo_binary, None) + echo_binary = echo_binary.encode() + + def test(loop): + with open(os.devnull, 'rb', 0) as devnull: + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + echo_binary, *args_tuple, + stdin=devnull, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) + + self.assertEqual( + tuple(loop.run_until_complete(proc.stdout.read()).split()), + args_tuple) + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) + + self._run_test(test) diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index d788c2bea..9d84ab6aa 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -9,12 +9,18 @@ __all__ = ( try: from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher + from asyncio.transports import ReadTransport as _ReadTransport except ImportError: _AbstractChildWatcher = object _BaseSubprocessTransport = object + _ReadTransport = object +import errno +import fcntl import functools +import logging import os +import stat import subprocess from portage.util._eventloop.global_event_loop import ( @@ -82,6 +88,35 @@ class _PortageEventLoop(events.AbstractEventLoop): """ return asyncio.Task(coro, loop=self) + def connect_read_pipe(self, protocol_factory, pipe): + """ + Register read pipe in event loop. Set the pipe to non-blocking mode. + + @type protocol_factory: callable + @param protocol_factory: must instantiate object with Protocol interface + @type pipe: file + @param pipe: a pipe to read from + @rtype: asyncio.Future + @return: Return pair (transport, protocol), where transport supports the + ReadTransport interface. 
+ """ + protocol = protocol_factory() + result = self.create_future() + waiter = self.create_future() + transport = self._make_read_pipe_transport(pipe, protocol, waiter=waiter) + + def waiter_callback(waiter): + try: + waiter.result() + except Exception as e: + transport.close() + result.set_exception(e) + else: + result.set_result((transport, protocol)) + +
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
commit: d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2 Author: Zac Medico gentoo org> AuthorDate: Thu Apr 12 03:56:25 2018 +0000 Commit: Zac Medico gentoo org> CommitDate: Fri Apr 13 07:10:10 2018 +0000 URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df Implement _PortageEventLoop.subprocess_exec (bug 649588) In python versions that support asyncio, this allows API consumers to use the asyncio.create_subprocess_exec() function with portage's internal event loop. Currently, subprocess.PIPE is not implemented because that would require an implementation of asyncio's private asyncio.unix_events._UnixReadPipeTransport class. However, it's possible to use pipes created with os.pipe() for stdin, stdout, and stderr, as demonstrated in the included unit tests. Bug: https://bugs.gentoo.org/649588 .../util/futures/asyncio/test_subprocess_exec.py | 163 + pym/portage/util/futures/unix_events.py| 98 + 2 files changed, 261 insertions(+) diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py new file mode 100644 index 000000000..d30f48c43 --- /dev/null +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -0,0 +1,163 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import os + +from portage.process import find_binary +from portage.tests import TestCase +from portage.util.futures import asyncio +from portage.util.futures.executor.fork import ForkExecutor +from portage.util.futures.unix_events import DefaultEventLoopPolicy +from _emerge.PipeReader import PipeReader + + +def reader(input_file, loop=None): + """ + Asynchronously read a binary input file. 
+ + @param input_file: binary input file + @type input_file: file + @param loop: event loop + @type loop: EventLoop + @return: bytes + @rtype: asyncio.Future (or compatible) + """ + loop = loop or asyncio.get_event_loop() + loop = getattr(loop, '_asyncio_wrapper', loop) + future = loop.create_future() + _Reader(future, input_file, loop) + return future + + +class _Reader(object): + def __init__(self, future, input_file, loop): + self._future = future + self._pipe_reader = PipeReader( + input_files={'input_file':input_file}, scheduler=loop._loop) + + self._future.add_done_callback(self._cancel_callback) + self._pipe_reader.addExitListener(self._eof) + self._pipe_reader.start() + + def _cancel_callback(self, future): + if future.cancelled(): + self._cancel() + + def _eof(self, pipe_reader): + self._pipe_reader = None + self._future.set_result(pipe_reader.getvalue()) + + def _cancel(self): + if self._pipe_reader is not None and self._pipe_reader.poll() is None: + self._pipe_reader.removeExitListener(self._eof) + self._pipe_reader.cancel() + self._pipe_reader = None + + +class SubprocessExecTestCase(TestCase): + def _run_test(self, test): + initial_policy = asyncio.get_event_loop_policy() + if not isinstance(initial_policy, DefaultEventLoopPolicy): + asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) + + try: + test(asyncio.get_event_loop()) + finally: + asyncio.set_event_loop_policy(initial_policy) + + def testEcho(self): + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + args_tuple = (b'hello', b'world') + echo_binary = find_binary("echo") + self.assertNotEqual(echo_binary, None) + echo_binary = echo_binary.encode() + + # Use os.pipe(), since this loop does not implement the + # ReadTransport necessary for subprocess.PIPE support. 
+ stdout_pr, stdout_pw = os.pipe() + stdout_pr = os.fdopen(stdout_pr, 'rb', 0) + stdout_pw = os.fdopen(stdout_pw, 'wb', 0) + files = [stdout_pr, stdout_pw] + + def test(loop): + output = None + try: + with open(os.devnull, 'rb', 0) as devnull: + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + echo_binary, *args_tuple, + stdin=devnull, stdout=stdout_pw, stderr=stdout_pw)) + +
[gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/, ...
commit: a78dca7e47f79ad48aee4909ee10688604996b86 Author: Zac Medico gentoo org> AuthorDate: Wed Apr 11 06:44:41 2018 +0000 Commit: Zac Medico gentoo org> CommitDate: Thu Apr 12 08:35:05 2018 +0000 URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=a78dca7e Implement AbstractEventLoopPolicy.get_child_watcher() (bug 649588) Use a _PortageChildWatcher class to wrap portage's internal event loop and implement asyncio's AbstractChildWatcher interface. Bug: https://bugs.gentoo.org/649588 .../util/futures/asyncio/test_child_watcher.py | 45 +++ pym/portage/util/_eventloop/EventLoop.py | 7 +- pym/portage/util/futures/_asyncio.py | 13 pym/portage/util/futures/unix_events.py| 90 ++ 4 files changed, 152 insertions(+), 3 deletions(-) diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py new file mode 100644 index 000000000..dca01be56 --- /dev/null +++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py @@ -0,0 +1,45 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import os + +from portage.process import find_binary, spawn +from portage.tests import TestCase +from portage.util.futures import asyncio +from portage.util.futures.unix_events import DefaultEventLoopPolicy + + +class ChildWatcherTestCase(TestCase): + def testChildWatcher(self): + true_binary = find_binary("true") + self.assertNotEqual(true_binary, None) + + initial_policy = asyncio.get_event_loop_policy() + if not isinstance(initial_policy, DefaultEventLoopPolicy): + asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) + + try: + try: + asyncio.set_child_watcher(None) + except NotImplementedError: + pass + else: + self.assertTrue(False) + + args_tuple = ('hello', 'world') + + loop = asyncio.get_event_loop() + future = loop.create_future() + + def callback(pid, returncode, *args): + future.set_result((pid, returncode, args)) + + with asyncio.get_child_watcher() as 
watcher: + pids = spawn([true_binary], returnpid=True) + watcher.add_child_handler(pids[0], callback, *args_tuple) + + self.assertEqual( + loop.run_until_complete(future), + (pids[0], os.EX_OK, args_tuple)) + finally: + asyncio.set_event_loop_policy(initial_policy) diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index d53a76ba1..12c199c76 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -25,7 +25,7 @@ import portage portage.proxy.lazyimport.lazyimport(globals(), 'portage.util.futures.futures:Future', 'portage.util.futures.executor.fork:ForkExecutor', - 'portage.util.futures.unix_events:_PortageEventLoop', + 'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher', ) from portage import OrderedDict @@ -190,6 +190,7 @@ class EventLoop(object): self._sigchld_src_id = None self._pid = os.getpid() self._asyncio_wrapper = _PortageEventLoop(loop=self) + self._asyncio_child_watcher = _PortageChildWatcher(self) def create_future(self): """ @@ -424,8 +425,8 @@ class EventLoop(object): self._sigchld_read, self.IO_IN, self._sigchld_io_cb) signal.signal(signal.SIGCHLD, self._sigchld_sig_cb) - # poll now, in case the SIGCHLD has already arrived - self._poll_child_processes() + # poll soon, in case the SIGCHLD has already arrived + self.call_soon(self._poll_child_processes) return source_id def _sigchld_sig_cb(self, signum, frame): diff --git a/pym/portage/util/futures/_asyncio.py b/pym/portage/util/futures/_asyncio.py index 02ab5..0f84f14b7 100644 --- a/pym/portage/util/futures/_asyncio.py +++ b/pym/portage/util/futures/_asyncio.py @@ -3,7 +3,9 @@ __all__ = ( 'ensure_future', + 'get_child_watcher', 'get_event_loop', + 'set_child_watcher', 'get_event_loop_policy', 'set_event_loop_policy', 'sleep', @@ -62,6 +64,17 @@ def get_event_loop(): return get_event_loop_policy().get_event_loop() +def get