[gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/, pym/portage/util/_eventloop/

2018-05-24 Thread Zac Medico
commit: adee194534f0b3d9762efd1e8e8713c316b93f5a
Author: Zac Medico  gentoo  org>
AuthorDate: Thu May 24 22:36:29 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Fri May 25 02:01:27 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=adee1945

AsyncioEventLoop: suppress BlockingIOError warning (bug 655656)

Override AbstractEventLoop.run_until_complete() to prevent
BlockingIOError from occurring when the event loop is not running,
by using signal.set_wakeup_fd(-1) to temporarily disable the wakeup
fd. In order to avoid potential interference with API consumers,
only modify wakeup fd when portage._interal_caller is True.

Bug: https://bugs.gentoo.org/655656

 .../util/futures/asyncio/test_wakeup_fd_sigchld.py | 76 ++
 pym/portage/util/_eventloop/asyncio_event_loop.py  | 37 +--
 2 files changed, 106 insertions(+), 7 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_wakeup_fd_sigchld.py 
b/pym/portage/tests/util/futures/asyncio/test_wakeup_fd_sigchld.py
new file mode 100644
index 0..abc67c241
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_wakeup_fd_sigchld.py
@@ -0,0 +1,76 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+import subprocess
+
+import portage
+from portage.const import PORTAGE_PYM_PATH
+from portage.tests import TestCase
+from portage.util._eventloop.global_event_loop import _asyncio_enabled
+
+
+class WakeupFdSigchldTestCase(TestCase):
+   def testWakeupFdSigchld(self):
+   """
+   This is expected to trigger a bunch of messages like the 
following
+   unless the fix for bug 655656 works as intended:
+
+   Exception ignored when trying to write to the signal wakeup fd:
+   BlockingIOError: [Errno 11] Resource temporarily unavailable
+   """
+   if not _asyncio_enabled:
+   self.skipTest('asyncio not enabled')
+
+   script = """
+import asyncio as _real_asyncio
+import os
+import signal
+import sys
+
+import portage
+
+# In order to avoid potential interference with API consumers, wakeup
+# fd handling is enabled only when portage._interal_caller is True.
+portage._internal_caller = True
+
+from portage.util.futures import asyncio
+
+loop = asyncio._wrap_loop()
+
+# Cause the loop to register a child watcher.
+proc = loop.run_until_complete(_real_asyncio.create_subprocess_exec('sleep', 
'0'))
+loop.run_until_complete(proc.wait())
+
+for i in range(8192):
+   os.kill(os.getpid(), signal.SIGCHLD)
+
+# Verify that the child watcher still works correctly
+# (this will hang if it doesn't).
+proc = loop.run_until_complete(_real_asyncio.create_subprocess_exec('sleep', 
'0'))
+loop.run_until_complete(proc.wait())
+loop.close()
+sys.stdout.write('success')
+sys.exit(os.EX_OK)
+"""
+
+   pythonpath = os.environ.get('PYTHONPATH', '').strip().split(':')
+   if not pythonpath or pythonpath[0] != PORTAGE_PYM_PATH:
+   pythonpath = [PORTAGE_PYM_PATH] + pythonpath
+   pythonpath = ':'.join(filter(None, pythonpath))
+
+   proc = subprocess.Popen(
+   [portage._python_interpreter, '-c', script],
+   stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+   env=dict(os.environ, PYTHONPATH=pythonpath))
+
+   out, err = proc.communicate()
+   try:
+   self.assertEqual(out[:100], b'success')
+   except Exception:
+   portage.writemsg(''.join('{}\n'.format(line)
+   for line in 
out.decode(errors='replace').splitlines()[:50]),
+   noiselevel=-1)
+   raise
+
+   self.assertEqual(proc.wait(), os.EX_OK)

diff --git a/pym/portage/util/_eventloop/asyncio_event_loop.py 
b/pym/portage/util/_eventloop/asyncio_event_loop.py
index bf5937de8..65b354544 100644
--- a/pym/portage/util/_eventloop/asyncio_event_loop.py
+++ b/pym/portage/util/_eventloop/asyncio_event_loop.py
@@ -1,6 +1,7 @@
 # Copyright 2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
+import os
 import signal
 
 try:
@@ -11,6 +12,8 @@ except ImportError:
_real_asyncio = None
_AbstractEventLoop = object
 
+import portage
+
 
 class AsyncioEventLoop(_AbstractEventLoop):
"""
@@ -26,13 +29,15 @@ class AsyncioEventLoop(_AbstractEventLoop):
def __init__(self, loop=None):
loop = loop or _real_asyncio.get_event_loop()
self._loop = loop
-   self.run_until_complete = loop.run_until_complete
+   self.run_until_complete = (self._run_until_complete
+   if portage._internal_caller else 
loop.run_until_complete)

[gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/

2018-05-09 Thread Zac Medico
commit: 3a55ecd1f79c31f477d7bdd0b9f0e97d8a15eb9e
Author: Zac Medico  gentoo  org>
AuthorDate: Wed May  9 14:19:11 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Wed May  9 14:19:11 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=3a55ecd1

DefaultEventLoopPolicy: test NotImplementedError due to recursion

 .../asyncio/test_policy_wrapper_recursion.py   | 29 ++
 1 file changed, 29 insertions(+)

diff --git 
a/pym/portage/tests/util/futures/asyncio/test_policy_wrapper_recursion.py 
b/pym/portage/tests/util/futures/asyncio/test_policy_wrapper_recursion.py
new file mode 100644
index 0..d3cd94b35
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_policy_wrapper_recursion.py
@@ -0,0 +1,29 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+try:
+   import asyncio
+except ImportError:
+   asyncio = None
+
+from portage.tests import TestCase
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+class PolicyWrapperRecursionTestCase(TestCase):
+   def testPolicyWrapperRecursion(self):
+   if asyncio is None:
+   self.skipTest('asyncio is not available')
+
+   initial_policy = asyncio.get_event_loop_policy()
+   if not isinstance(initial_policy, DefaultEventLoopPolicy):
+   asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+   try:
+   with self.assertRaises(NotImplementedError):
+   asyncio.get_event_loop()
+
+   with self.assertRaises(NotImplementedError):
+   asyncio.get_child_watcher()
+   finally:
+   asyncio.set_event_loop_policy(initial_policy)



[gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/

2018-05-09 Thread Zac Medico
commit: 0c96d2d0b18036cb94d68c42783441d32af2d9d3
Author: Zac Medico  gentoo  org>
AuthorDate: Wed May  9 07:54:26 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Wed May  9 07:59:56 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=0c96d2d0

SubprocessExecTestCase: fix unintended skipTest

The internal asyncio shim does not provide a
create_subprocess_exec attribute, so access it
directly from the real asyncio module.

Fixes 82a3cda6f1ff ("Minimize _asyncio_wrapper usage (bug 654390)")

 .../util/futures/asyncio/test_subprocess_exec.py| 21 +
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index 534d79c53..5a812ba6a 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -4,6 +4,11 @@
 import os
 import subprocess
 
+try:
+   from asyncio import create_subprocess_exec
+except ImportError:
+   create_subprocess_exec = None
+
 from portage.process import find_binary
 from portage.tests import TestCase
 from portage.util._eventloop.global_event_loop import global_event_loop
@@ -71,7 +76,7 @@ class SubprocessExecTestCase(TestCase):

self.assertFalse(global_event_loop().is_closed())
 
def testEcho(self):
-   if not hasattr(asyncio, 'create_subprocess_exec'):
+   if create_subprocess_exec is None:
self.skipTest('create_subprocess_exec not implemented 
for python2')
 
args_tuple = (b'hello', b'world')
@@ -91,7 +96,7 @@ class SubprocessExecTestCase(TestCase):
try:
with open(os.devnull, 'rb', 0) as devnull:
proc = loop.run_until_complete(
-   asyncio.create_subprocess_exec(
+   create_subprocess_exec(
echo_binary, *args_tuple,
stdin=devnull, 
stdout=stdout_pw, stderr=stdout_pw))
 
@@ -114,7 +119,7 @@ class SubprocessExecTestCase(TestCase):
self._run_test(test)
 
def testCat(self):
-   if not hasattr(asyncio, 'create_subprocess_exec'):
+   if create_subprocess_exec is None:
self.skipTest('create_subprocess_exec not implemented 
for python2')
 
stdin_data = b'hello world'
@@ -138,7 +143,7 @@ class SubprocessExecTestCase(TestCase):
output = None
try:
proc = loop.run_until_complete(
-   asyncio.create_subprocess_exec(
+   create_subprocess_exec(
cat_binary,
stdin=stdin_pr, stdout=stdout_pw, 
stderr=stdout_pw))
 
@@ -173,7 +178,7 @@ class SubprocessExecTestCase(TestCase):
requires an AbstractEventLoop.connect_read_pipe implementation
(and a ReadTransport implementation for it to return).
"""
-   if not hasattr(asyncio, 'create_subprocess_exec'):
+   if create_subprocess_exec is None:
self.skipTest('create_subprocess_exec not implemented 
for python2')
 
args_tuple = (b'hello', b'world')
@@ -184,7 +189,7 @@ class SubprocessExecTestCase(TestCase):
def test(loop):
with open(os.devnull, 'rb', 0) as devnull:
proc = loop.run_until_complete(
-   asyncio.create_subprocess_exec(
+   create_subprocess_exec(
echo_binary, *args_tuple,
stdin=devnull,
stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT))
@@ -202,7 +207,7 @@ class SubprocessExecTestCase(TestCase):
requires an AbstractEventLoop.connect_write_pipe implementation
(and a WriteTransport implementation for it to return).
"""
-   if not hasattr(asyncio, 'create_subprocess_exec'):
+   if create_subprocess_exec is None:
self.skipTest('create_subprocess_exec not implemented 
for python2')
 
stdin_data = b'hello world'
@@ -212,7 +217,7 @@ class SubprocessExecTestCase(TestCase):
 
def test(loop):
proc = loop.run_until_complete(
-   asyncio.create_subprocess_exec(
+   create_subprocess_exec(
   

[gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/

2018-05-05 Thread Zac Medico
commit: 87aeab1a62cc6fa1d48354a42ec4fa787dbe9603
Author: Zac Medico  gentoo  org>
AuthorDate: Sun May  6 01:19:08 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Sun May  6 01:26:50 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=87aeab1a

WriterPipeClosedTestCase: retry filling pipe

This should suppress spurious writer callback observed
twice for pypy in travis.

See: https://travis-ci.org/gentoo/portage/jobs/375411936
See: https://travis-ci.org/gentoo/portage/jobs/373734825

 .../tests/util/futures/asyncio/test_pipe_closed.py | 39 +-
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py 
b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
index 5398ca35c..e63829888 100644
--- a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
+++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
@@ -105,25 +105,32 @@ class WriterPipeClosedTestCase(_PipeClosedTestCase, 
TestCase):
 
writer_callback.called = loop.create_future()
_set_nonblocking(write_end.fileno())
+   loop.add_writer(write_end.fileno(), writer_callback)
 
-   # Fill up the pipe, so that no writer callbacks should 
be
-   # received until the state has changed.
-   while True:
-   try:
-   os.write(write_end.fileno(), 512 * b'0')
-   except EnvironmentError as e:
-   if e.errno != errno.EAGAIN:
-   raise
+   # With pypy we've seen intermittent spurious writer 
callbacks
+   # here, so retry until the correct state is achieved.
+   tries = 10
+   while tries:
+   tries -= 1
+
+   # Fill up the pipe, so that no writer callbacks 
should be
+   # received until the state has changed.
+   while True:
+   try:
+   os.write(write_end.fileno(), 
512 * b'0')
+   except EnvironmentError as e:
+   if e.errno != errno.EAGAIN:
+   raise
+   break
+
+   # Allow the loop to check for IO events, and 
assert
+   # that our future is still not done.
+   loop.run_until_complete(asyncio.sleep(0, 
loop=loop))
+   if writer_callback.called.done():
+   writer_callback.called = 
loop.create_future()
+   else:
break
 
-   # We've seen at least one spurious writer callback when
-   # this was registered before the pipe was filled, so
-   # register it afterwards.
-   loop.add_writer(write_end.fileno(), writer_callback)
-
-   # Allow the loop to check for IO events, and assert
-   # that our future is still not done.
-   loop.run_until_complete(asyncio.sleep(0, loop=loop))
self.assertFalse(writer_callback.called.done())
 
# Demonstrate that the callback is called afer the



[gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/

2018-05-02 Thread Zac Medico
commit: 10253819aae4cefee80f377e167ad521516d66a2
Author: Zac Medico  gentoo  org>
AuthorDate: Thu May  3 01:18:39 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Thu May  3 01:18:39 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=10253819

WriterPipeClosedTestCase: add_writer after pipe is filled

Hopefully this supresses a spurious writer callback observed
once for pypy in travis.

See: https://travis-ci.org/gentoo/portage/jobs/373734825

 pym/portage/tests/util/futures/asyncio/test_pipe_closed.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py 
b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
index 1ecddab78..5398ca35c 100644
--- a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
+++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
@@ -105,7 +105,6 @@ class WriterPipeClosedTestCase(_PipeClosedTestCase, 
TestCase):
 
writer_callback.called = loop.create_future()
_set_nonblocking(write_end.fileno())
-   loop.add_writer(write_end.fileno(), writer_callback)
 
# Fill up the pipe, so that no writer callbacks should 
be
# received until the state has changed.
@@ -117,6 +116,11 @@ class WriterPipeClosedTestCase(_PipeClosedTestCase, 
TestCase):
raise
break
 
+   # We've seen at least one spurious writer callback when
+   # this was registered before the pipe was filled, so
+   # register it afterwards.
+   loop.add_writer(write_end.fileno(), writer_callback)
+
# Allow the loop to check for IO events, and assert
# that our future is still not done.
loop.run_until_complete(asyncio.sleep(0, loop=loop))



[gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/, pym/portage/util/_eventloop/

2018-04-29 Thread Zac Medico
commit: c77afbc31fa687cc612a6f946b324bf4d74d8175
Author: Zac Medico  gentoo  org>
AuthorDate: Mon Apr 30 01:49:18 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Mon Apr 30 02:14:41 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=c77afbc3

EventLoop: call add_reader/writer callbacks after pipe is closed (bug 654382)

Callbacks registered via add_reader/writer methods need to be called
when the other end of a pipe is closed, which does not result in a
normal read or write event. Therefore, respond to other event types
as well, for compatibility with the asyncio event loop implementation.

The included unit tests demonstrate asyncio compatible behavior for
both reader and writer callbacks.

Bug: https://bugs.gentoo.org/654382

 .../tests/util/futures/asyncio/test_pipe_closed.py | 133 +
 pym/portage/util/_eventloop/EventLoop.py   |   7 +-
 2 files changed, 138 insertions(+), 2 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py 
b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
new file mode 100644
index 0..1ecddab78
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
@@ -0,0 +1,133 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import os
+import pty
+import shutil
+import socket
+import sys
+import tempfile
+
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import (
+   DefaultEventLoopPolicy,
+   _set_nonblocking,
+)
+
+
+class _PipeClosedTestCase(object):
+
+   def test_pipe(self):
+   read_end, write_end = os.pipe()
+   self._do_test(read_end, write_end)
+
+   def test_pty_device(self):
+   try:
+   read_end, write_end = pty.openpty()
+   except EnvironmentError:
+   self.skipTest('pty not available')
+   self._do_test(read_end, write_end)
+
+   def test_domain_socket(self):
+   if sys.version_info >= (3, 2):
+   read_end, write_end = socket.socketpair()
+   else:
+   self.skipTest('socket detach not supported')
+   self._do_test(read_end.detach(), write_end.detach())
+
+   def test_named_pipe(self):
+   tempdir = tempfile.mkdtemp()
+   try:
+   fifo_path = os.path.join(tempdir, 'fifo')
+   os.mkfifo(fifo_path)
+   self._do_test(os.open(fifo_path, 
os.O_NONBLOCK|os.O_RDONLY),
+   os.open(fifo_path, os.O_NONBLOCK|os.O_WRONLY))
+   finally:
+   shutil.rmtree(tempdir)
+
+
+class ReaderPipeClosedTestCase(_PipeClosedTestCase, TestCase):
+   """
+   Test that a reader callback is called after the other end of
+   the pipe has been closed.
+   """
+   def _do_test(self, read_end, write_end):
+   initial_policy = asyncio.get_event_loop_policy()
+   if not isinstance(initial_policy, DefaultEventLoopPolicy):
+   asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+   loop = asyncio.get_event_loop()
+   read_end = os.fdopen(read_end, 'rb', 0)
+   write_end = os.fdopen(write_end, 'wb', 0)
+   try:
+   def reader_callback():
+   if not reader_callback.called.done():
+   reader_callback.called.set_result(None)
+
+   reader_callback.called = loop.create_future()
+   loop.add_reader(read_end.fileno(), reader_callback)
+
+   # Allow the loop to check for IO events, and assert
+   # that our future is still not done.
+   loop.run_until_complete(asyncio.sleep(0, loop=loop))
+   self.assertFalse(reader_callback.called.done())
+
+   # Demonstrate that the callback is called afer the
+   # other end of the pipe has been closed.
+   write_end.close()
+   loop.run_until_complete(reader_callback.called)
+   finally:
+   loop.remove_reader(read_end.fileno())
+   write_end.close()
+   read_end.close()
+   asyncio.set_event_loop_policy(initial_policy)
+
+
+class WriterPipeClosedTestCase(_PipeClosedTestCase, TestCase):
+   """
+   Test that a writer callback is called after the other end of
+   the pipe has been closed.
+   """
+   def _do_test(self, read_end, write_end):
+   initial_policy = asyncio.get_event_loop_policy()
+   if not isinstance(initial_policy, 

[gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/

2018-04-23 Thread Zac Medico
commit: c23d093d0330a9318534a1c521fb00d6360fabc0
Author: Zac Medico  gentoo  org>
AuthorDate: Mon Apr 23 18:20:25 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Mon Apr 23 18:24:11 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=c23d093d

tests: prove run_until_complete executes done callbacks

Prove that done callbacks are executed before run_until_complete
returns, which is how asyncio's default event loop behaves. This
behavior was implemented in portage's internal event loop in commit
25245d7eb86ed197b7d7cfead0dbe4ce8ad4bc5b.

Bug: https://bugs.gentoo.org/591760

 .../futures/asyncio/test_run_until_complete.py | 29 ++
 1 file changed, 29 insertions(+)

diff --git a/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py 
b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py
new file mode 100644
index 0..fc8f198ca
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py
@@ -0,0 +1,29 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+class RunUntilCompleteTestCase(TestCase):
+   def test_add_done_callback(self):
+   initial_policy = asyncio.get_event_loop_policy()
+   if not isinstance(initial_policy, DefaultEventLoopPolicy):
+   asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+   try:
+   loop = asyncio.get_event_loop()
+   f1 = loop.create_future()
+   f2 = loop.create_future()
+   f1.add_done_callback(f2.set_result)
+   loop.call_soon(lambda: f1.set_result(None))
+   loop.run_until_complete(f1)
+   self.assertEqual(f1.done(), True)
+
+   # This proves that done callbacks of f1 are executed 
before
+   # loop.run_until_complete(f1) returns, which is how 
asyncio's
+   # default event loop behaves.
+   self.assertEqual(f2.done(), True)
+   finally:
+   asyncio.set_event_loop_policy(initial_policy)



[gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/, pym/portage/util/futures/

2018-04-16 Thread Zac Medico
commit: 72914c0dbae1dfab7565a627451b616e330b8889
Author: Zac Medico  gentoo  org>
AuthorDate: Sun Apr 15 10:11:21 2018 +
Commit: Zac Medico  gentoo  org>
CommitDate: Tue Apr 17 02:19:57 2018 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=72914c0d

Implement AbstractEventLoop.connect_write_pipe (bug 649588)

In python versions that support asyncio, this allows API consumers
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdin
parameters.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   |  34 +++
 pym/portage/util/futures/transports.py |  90 +++
 pym/portage/util/futures/unix_events.py| 259 -
 3 files changed, 373 insertions(+), 10 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index 94984fc93..8c8c395ca 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -191,3 +191,37 @@ class SubprocessExecTestCase(TestCase):
self.assertEqual(loop.run_until_complete(proc.wait()), 
os.EX_OK)
 
self._run_test(test)
+
+   def testWriteTransport(self):
+   """
+   Test asyncio.create_subprocess_exec(stdin=subprocess.PIPE) which
+   requires an AbstractEventLoop.connect_write_pipe implementation
+   (and a WriteTransport implementation for it to return).
+   """
+   if not hasattr(asyncio, 'create_subprocess_exec'):
+   self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+   stdin_data = b'hello world'
+   cat_binary = find_binary("cat")
+   self.assertNotEqual(cat_binary, None)
+   cat_binary = cat_binary.encode()
+
+   def test(loop):
+   proc = loop.run_until_complete(
+   asyncio.create_subprocess_exec(
+   cat_binary,
+   stdin=subprocess.PIPE,
+   stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT))
+
+   # This buffers data when necessary to avoid blocking.
+   proc.stdin.write(stdin_data)
+   # Any buffered data is written asynchronously after the
+   # close method is called.
+   proc.stdin.close()
+
+   self.assertEqual(
+   loop.run_until_complete(proc.stdout.read()),
+   stdin_data)
+   self.assertEqual(loop.run_until_complete(proc.wait()), 
os.EX_OK)
+
+   self._run_test(test)

diff --git a/pym/portage/util/futures/transports.py 
b/pym/portage/util/futures/transports.py
new file mode 100644
index 0..60ea93073
--- /dev/null
+++ b/pym/portage/util/futures/transports.py
@@ -0,0 +1,90 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+try:
+   from asyncio.transports import Transport as _Transport
+except ImportError:
+   _Transport = object
+
+
+class _FlowControlMixin(_Transport):
+   """
+   This is identical to the standard library's private
+   asyncio.transports._FlowControlMixin class.
+
+   All the logic for (write) flow control in a mix-in base class.
+
+   The subclass must implement get_write_buffer_size().  It must call
+   _maybe_pause_protocol() whenever the write buffer size increases,
+   and _maybe_resume_protocol() whenever it decreases.  It may also
+   override set_write_buffer_limits() (e.g. to specify different
+   defaults).
+
+   The subclass constructor must call super().__init__(extra).  This
+   will call set_write_buffer_limits().
+
+   The user may call set_write_buffer_limits() and
+   get_write_buffer_size(), and their protocol's pause_writing() and
+   resume_writing() may be called.
+   """
+
+   def __init__(self, extra=None, loop=None):
+   super().__init__(extra)
+   assert loop is not None
+   self._loop = loop
+   self._protocol_paused = False
+   self._set_write_buffer_limits()
+
+   def _maybe_pause_protocol(self):
+   size = self.get_write_buffer_size()
+   if size <= self._high_water:
+   return
+   if not self._protocol_paused:
+   self._protocol_paused = True
+   try:
+   self._protocol.pause_writing()
+   except Exception as exc:
+   self._loop.call_exception_handler({
+   'message':